Skip to content

feat(pipeline): cycle-aware scheduler and topo-sort safety (#245 layer 4)#250

Merged
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-4-cycle-scheduler
May 28, 2026
Merged

feat(pipeline): cycle-aware scheduler and topo-sort safety (#245 layer 4)#250
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-4-cycle-scheduler

Conversation

@miguelgfierro
Copy link
Copy Markdown
Contributor

Fourth layer of the unification (#245). `PipelineEngine` gains the ability to run cyclic DAGs (ReAct loops, retry-with-critique) and the long-standing silent corruption in `topological_sort`/`execution_levels` on cyclic graphs is fixed.

Stacked on layer 3 (#249) which is already merged into `issue-147-pipeline-evolution`.

New surface

```python
engine = PipelineEngine(
dag, # may contain cycles (allow_cycles=True)
state_schema=MyState,
recursion_limit=10, # NEW; default 25
)
```

How cycles work

`PipelineEngine.run()` checks `dag.is_cyclic()` and routes to a new `_run_cyclic` helper for cyclic graphs. Acyclic graphs continue to use the existing parallel topological scheduler — unchanged.

The cyclic scheduler:

  1. Picks an entry node (insertion order, matching `StatePipeline`).
  2. Executes; audits with the current visit count; checkpoints on success.
  3. Walks each completed node's outgoing edges; picks the unique alive one as the next node.
  4. Increments a per-node visit counter; if any node's visit count exceeds `recursion_limit`, halts with a clear failure message.
  5. Stops when no outgoing edge is alive.

Cycle-mode is sequential within the cycle — multi-target alive fan-out (true parallel branches inside a cycle) is the job of `Send` in layer 5; for now multiple alive outgoing edges raise a clean `PipelineError`.

ReAct-style example

```python
class State(BaseModel):
counter: int = 0

dag = DAG("loop", allow_cycles=True)
dag.add_node(DAGNode(node_id="agent", step=ReActStep()))
dag.add_node(DAGNode(node_id="judge", step=JudgeStep()))
dag.add_edge(DAGEdge(source="agent", target="judge"))
dag.add_edge(
DAGEdge(source="judge", target="agent",
condition=lambda ctx: ctx.state.counter < 3)
)
engine = PipelineEngine(dag, state_schema=State, recursion_limit=10)
```

DAG silent-corruption fix

Both `topological_sort()` and `execution_levels()` previously misbehaved on cyclic DAGs:

  • `topological_sort()` raised `PipelineError("DAG contains a cycle (should not reach here)")` — the message was wrong because the engine's new `allow_cycles=True` mode could indeed reach there.
  • `execution_levels()` silently produced an incomplete list of levels (dropping nodes inside the cycle) with no error at all.

Both now raise `PipelineError` immediately, pointing the caller at `is_cyclic()` as the right precheck. This was the latent bug I flagged in the #245 review.

Audit visit numbers

`_record_audit` now accepts `visit=`. Acyclic scheduler passes `visit=1`; cyclic scheduler passes the actual iteration. `FileAuditLog.list_entries()` produces one entry per visit, distinguishable by the `visit` field. ReAct loops are debuggable.

Backward compatibility

  • `recursion_limit` default of 25 matches `StatePipeline`.
  • Acyclic engines are completely unchanged — no behavior diff, no perf cost.
  • `PipelineEventHandler` legacy handlers continue to work; `visit` is only delivered to handlers that declare the parameter (the layer 1B dispatch handles this).

Tests

7 new tests in `tests/unit/pipeline/test_pipeline_engine_cycles.py`:

  • `topological_sort` raises on cyclic DAGs
  • `execution_levels` raises on cyclic DAGs
  • ReAct-style finite loop terminates correctly
  • Recursion limit halts runaway cycle
  • Default `recursion_limit` is 25
  • Audit visit number increments per iteration
  • Acyclic DAG with `allow_cycles=True` still uses parallel scheduler

Full suite: 1580 passed.

What's NOT in this PR

  • Parallel fan-out inside cycles — multiple alive outgoing edges from a cyclic node raise a clean error today; `Send` in layer 5 lifts that restriction.
  • Mid-cycle resume — same caveat `StatePipeline` already has (resume across fan-out is not supported).
  • Concurrent in-cycle execution — within a cycle, scheduling is sequential. Acyclic execution remains fully parallel.

Refs

…r 4)

Fourth layer of the unification. PipelineEngine gains the ability to run
cyclic DAGs (ReAct loops, retry-with-critique) and the long-standing silent
corruption in topological_sort/execution_levels on cyclic graphs is fixed.

Engine changes:

- PipelineEngine(__init__) accepts recursion_limit: int = 25 (matches
  StatePipeline). Bounds visit count per node in cyclic mode.
- run() detects cyclic DAGs via dag.is_cyclic() and routes to a new
  _run_cyclic helper. Acyclic graphs use the existing topological scheduler
  unchanged.
- _run_cyclic: sequential frontier-following. Picks the unique alive
  outgoing edge from each completed node, increments per-node visit count,
  enforces recursion_limit. Fan-out to multiple alive edges raises a clear
  PipelineError (multi-target cyclic fan-out arrives with Send in layer 5).
- _record_audit accepts visit=, defaulting to 1 for the acyclic scheduler.
  The cyclic scheduler passes the actual visit number so audit entries
  distinguish iterations.

DAG changes (silent-corruption fix from #245's review):

- topological_sort() now raises PipelineError on cyclic graphs instead of
  returning a wrong-length list with a misleading "should not reach here"
  message.
- execution_levels() now raises PipelineError on cyclic graphs instead of
  silently producing incomplete levels.

Both methods document is_cyclic() as the right pre-check.

Tests: 7 new in tests/unit/pipeline/test_pipeline_engine_cycles.py
covering:
- topological_sort raises on cyclic DAGs
- execution_levels raises on cyclic DAGs
- ReAct-style finite loop terminates correctly
- Recursion limit halts runaway cycle
- Default recursion_limit is 25
- Audit visit number increments per iteration
- Acyclic DAG with allow_cycles=True still uses the parallel scheduler

Full suite: 1580 passed.

Refs: #245
@miguelgfierro miguelgfierro merged commit aab6b3c into issue-147-pipeline-evolution May 28, 2026
@miguelgfierro miguelgfierro deleted the unify-pipeline-4-cycle-scheduler branch May 28, 2026 12:59
ancongui pushed a commit that referenced this pull request May 31, 2026
…scheduler

feat(pipeline): cycle-aware scheduler and topo-sort safety (#245 layer 4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant