~/About~/Foundry~/Blueprint~/Journal~/Projects
Book a Call
Journal

Building a DAG Orchestrator That Rewrites Its Own Execution Plan

From the Workflow Automation Engine system

·8 min read·Kingsley Onoh·View on GitHub

Step 7 is a condition: if the HTTP response came back 200, continue to step 8 and step 9. If not, skip them and jump to step 10. Five steps downstream of a single boolean, two possible paths, and a directed acyclic graph that only understands one thing: which step depends on which.

DAGs don't branch. That was the problem I had to solve.

The linear model

The linear step types worked without issues. HTTP calls, data transforms, delays, sub-workflows. The orchestrator in orchestrator.py called topological_sort() from graph.py, got back a flat list of steps in dependency order, and executed them one by one. After each step completed, its output merged into a shared JSONB context accessible to every downstream step via Jinja2 expressions like {{ steps.fetch_data.output.body }}.

The execution model was simple: walk the sorted list, execute each step, persist the state to PostgreSQL, move to the next. No branching. No decisions. A conveyor belt that worked perfectly for linear workflows.

Then I needed conditions.

Why the graph stays flat

The workflow definition needed to stay simple: a flat list of steps with plain depends_on references. Conditions should be a step type like any other. The branching logic should live at runtime, not in the graph structure.

Most workflow engines encode branching directly into the graph. BPMN uses gateway nodes that split the flow into separate paths. Airflow uses BranchPythonOperator to select downstream tasks. You define the branches as part of the DAG, each path continues independently, and eventually they converge at a join node.

I sketched this out and hit two problems. A 12-step workflow with three conditions and two paths each produces a graph where users need to reason about up to eight distinct execution paths when designing the workflow. The depends_on array in each step definition becomes a tangled web of conditional references. Step 10 depends on step 7, but only if step 7's condition was true. The dependency model needs to become richer than "this step waits for that step."

The second problem is structural. The DAG parser in parser.py already handles cycle detection and depth validation on the flat dependency graph. Introducing branch paths would require a second validation pass for branch connectivity, dead-end detection, and convergence verification. Every new validation rule is a new failure mode for users to debug.

The skip set

The solution was to keep the topological sort and add a mutable skipped_steps set that grows during execution. When the orchestrator reaches a condition step, it evaluates the Jinja2 expression and calls _resolve_condition_branches(). That function reads the condition's true_branch and false_branch config (each a list of step IDs) and adds the non-taken branch's steps to the skip set.

async def _resolve_condition_branches(
    self,
    step_def: StepDefinition,
    output: dict,
) -> set[str]:
    skipped = set()
    result = output.get("result", False)
    if result:
        skipped.update(step_def.config.get("false_branch", []))
    else:
        skipped.update(step_def.config.get("true_branch", []))
    return skipped

Before executing each step in the sorted order, the orchestrator checks: is this step ID in skipped_steps? If yes, transition its state to skipped and move on. The skipped state is terminal in the state machine (empty transition set, no further changes allowed). The topological order never changes. The graph structure never changes. Steps just get removed from the plan as it runs.

A 12-step workflow with three conditions still has 12 entries in the topological sort. The orchestrator always walks the same list. It just skips some entries based on runtime evaluation. No path explosion. No conditional dependency syntax. No gateway nodes.

Where the truthiness got messy

The condition step evaluates a Jinja2 expression and gets back a string. Jinja2 doesn't return Python booleans from template rendering. Everything comes back as text. So the string "False" needs to be treated as falsy, and "1" as truthy.

I defined a _FALSY_VALUES frozenset in condition.py:

_FALSY_VALUES: frozenset[str] = frozenset({
    "", "false", "0", "none", "False", "None",
})

This handles the common cases: empty strings, Python-style false values, string representations of zero. But the orchestrator also needs to resolve which branch was taken, and it has its own truthiness check for the condition output. I ended up with the same logic in two places: the condition executor computes the boolean and writes it to the output, and the orchestrator reads that output to populate the skip set. Both places need to agree on what "false" means.

The duplication is a code smell I haven't fixed. The orchestrator should trust the result boolean from the condition's output (which the executor already computed) instead of re-evaluating truthiness on the raw expression result. It works today, but there's a subtle bug waiting if someone changes the falsy list in one place and not the other.

The state machine underneath

Making skip sets work required a state machine with six distinct step states: pending, queued, running, completed, failed, and skipped. The transitions are defined as an adjacency dictionary in state.py:

ALLOWED_STEP_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"queued", "skipped"},
    "queued": {"running"},
    "running": {"completed", "failed"},
    "failed": {"queued"},
    "completed": set(),
    "skipped": set(),
}

Two details matter here. First, skipped has an empty transition set. Once a step is skipped, it stays skipped. You can't un-skip a step during execution. The condition evaluated, the branch was chosen, the decision is final.

Second, failed can transition back to queued. That's the retry mechanism. When a step fails and has retry attempts remaining (default 3, configurable up to 10 via RetryConfig), the orchestrator transitions it back to queued and re-executes. Each attempt updates step_exec.attempt in PostgreSQL before the retry runs. If the process crashes between retries, the attempt count survives in the database.

Every state transition writes to PostgreSQL before anything else happens. The orchestrator doesn't execute step N+1 until step N's final state is persisted. This is deliberately slow. An in-memory state machine would be faster, but if the worker dies between steps, every execution in flight would lose its state. Persist-before-execute means a crash leaves every execution in a known, recoverable position.

Cycle detection on the reverse graph

The skip set approach only works if the graph is actually a DAG. Cycles would cause the topological sort to loop forever. The detect_cycles() function in graph.py uses DFS with three-color marking: white (unvisited), gray (currently being explored), black (fully explored). A back edge from any node to a gray node means a cycle.

The non-obvious detail: the function builds a reverse graph where each entry maps a dependency to its dependents, not the other way around. If step C depends on step B, the reverse graph has B: [C]. DFS then explores "who depends on me?" paths. Cycles in that direction are what would break execution order. When a cycle is found, _reconstruct_cycle() walks backward through parent pointers to produce the full path: ["step_a", "step_b", "step_c", "step_a"]. The error message shows exactly which steps form the loop.

Maximum DAG depth is capped at 20 via a hardcoded MAX_DEPTH constant. The depth calculation uses Kahn's algorithm with a twist: instead of just sorting, it tracks the longest path to each node. If step D depends on both B and C, and B is at depth 2 while C is at depth 4, then D gets depth 5. The maximum across all nodes determines the workflow's depth.

What surprised me

I expected the hard part to be the topological sort or the cycle detection. Both were textbook implementations of well-known algorithms. The hard part was getting the skip set to interact correctly with depends_on.

Consider: step 9 depends on step 8, and step 8 is in the false branch of step 7's condition. If step 7 evaluates to true, step 8 gets skipped. But step 9 is still in the topological order. Its dependency was skipped, not completed. Should step 9 execute?

The answer I landed on: if all dependencies of a step are either completed or skipped, the step can execute. A skipped dependency counts as resolved for scheduling purposes. This lets you build workflows where step 9 runs regardless of which branch step 7 took, as long as its other dependencies are met. The orchestrator checks this when deciding which steps to queue next.

I could have made skipped dependencies block execution. But that would mean a single condition deep in the DAG could freeze all downstream steps, even ones that don't care about the branch result. The current behavior is more useful: skip sets remove steps from the plan, but they don't create invisible walls in the dependency graph.

The numbers

The engine handles workflows up to 50 steps with a maximum DAG depth of 20 in the longest dependency chain. Condition branching adds zero overhead to the execution plan because the topological sort runs once, before any step executes. The skip set is a Python set() with O(1) lookups. State persistence adds one database write per step transition, which at 15 connections in the pool (5 steady plus 10 overflow) handles the concurrency comfortably.

544 tests cover the engine, including edge cases for nested conditions (a condition inside another condition's true branch), skipped steps with downstream dependents, retry of a failed step after a prior execution was replayed, and sub-workflows that inherit the parent's depth counter. The condition branching logic alone accounts for 12 dedicated test cases covering truthiness evaluation, branch resolution, and the interaction between skip sets and the dependency resolver.

#dag-orchestration#state-machines#conditional-branching#python

The architecture behind this essay for Workflow Automation Engine

Get Notified

New system breakdown? You'll know first.