Skip to content

feat(pipeline): cycles, Send fan-out, Mermaid/JSON export (#147 phase 2)#233

Merged
miguelgfierro merged 2 commits into
issue-147-pipeline-evolutionfrom
issue-147-phase-2
May 27, 2026
Merged

feat(pipeline): cycles, Send fan-out, Mermaid/JSON export (#147 phase 2)#233
miguelgfierro merged 2 commits into
issue-147-pipeline-evolutionfrom
issue-147-phase-2

Conversation

@miguelgfierro
Copy link
Copy Markdown
Contributor

Stacked on #232. Closes the remaining items from #147 left out of phase 1.

What's new

Cycles (ReAct / retry-with-critique). In state mode, the underlying DAG is now built with allow_cycles=True, so a node can route back to itself via the existing .branch(...) mechanism. A recursion_limit kwarg on PipelineBuilder (default 25) acts as the safety net — a runaway loop surfaces as result.success=False with a clear message, not an infinite hang.

async def step(state):
    return {"counter": state.counter + 1}

def route(state):
    return "done" if state.counter >= 3 else "step"

PipelineBuilder("loop", state=LoopState, recursion_limit=25)
    .add_node(step).add_node(done).branch(step, route).build()

Send(target, payload) runtime fan-out. Routers can return list[Send] to dispatch concurrent work over the same worker (or different workers). Each Send gets its own state copy with the payload merged in; the worker's return reduces back into shared state. Replaces the legacy FanOutStep pattern.

def dispatch(state):
    return [Send(\"worker\", {\"item\": x}) for x in state.items]

PipelineBuilder(\"mapreduce\", state=FanOutState)
    .add_node(planner).add_node(worker).add_node(collect)
    .add_edge(worker, collect)
    .branch(planner, dispatch)
    .build()

When all workers share a common successor (typical map-reduce shape), execution continues at that successor automatically. The aggregator runs once with all fan-out results already merged into state.

Mermaid + JSON export. DAG.to_mermaid() / DAG.to_json() for any DAG. StatePipeline.to_mermaid() adds branch-edge labels from the registered mapping (`a -->|label| b`).

Soft-deprecation of BranchStep / FanOutStep. Construction emits a DeprecationWarning pointing at .branch(...) and Send(...). Existing pipelines keep working — removal is a separate decision, gated on internal callers migrating.

API additions

Exported from fireflyframework_agentic.pipeline:

  • Send
  • RecursionLimitError

PipelineBuilder(...) gains recursion_limit: int = 25.

Implementation notes

  • The executor remains sequential at the source level. Fan-out runs the per-Send tasks via asyncio.gather, so concurrency is intra-fan-out only. That matches the typical agentic use case (one branch at a time, fan-out for map-reduce) without committing to full parallel-DAG semantics.
  • Per-Send state isolation uses the same apply_update reducer machinery that ordinary node updates use — no special-casing.
  • After fan-out, _common_successor looks for a single shared successor across all worker targets and continues from there. If workers have divergent successors, the run ends (an aggregator pattern with a shared successor is the supported shape; richer fan-out topologies land in a future phase).
  • Resume across a fan-out boundary is explicitly rejected for now with a clear error — the checkpoint format would need to track in-flight Send tasks, which is phase-3 territory.

Tests

tests/unit/pipeline/test_state_pipeline_phase2.py (9 tests, all green):

  • Cycle with exit router runs the loop 3× then terminates.
  • recursion_limit=5 triggers a clean failure on an infinite-loop router.
  • 3-way fan-out runs all workers concurrently and the reducer aggregates results before the collector node runs.
  • Send to an unknown target surfaces as a failed result (not an exception).
  • DAG.to_mermaid() renders nodes and edges.
  • DAG.to_json() produces a round-trippable JSON document.
  • StatePipeline.to_mermaid() labels branch edges with the mapping labels.
  • BranchStep(...) emits DeprecationWarning pointing to .branch(...).
  • FanOutStep(...) emits DeprecationWarning pointing to Send.

Suite-wide:

  • pytest tests/unit/pipeline/ — 97 passed (88 phase 1 + 9 phase 2)
  • pytest tests/unit/ — 1405 passed (no regressions)
  • ruff check + ruff format --check clean
  • pyright clean on touched modules

Stacking

Targets issue-147-pipeline-evolution (the phase-1 branch). After #232 merges to main, this PR's base will retarget to main automatically.

Out of scope (phase 3 / follow-up)

  • Resume across a fan-out boundary (in-flight Send checkpointing).
  • Streaming partial state (pipeline.stream()).
  • Typed-edge Literal[...] validation on node return annotations.
  • Redis / Postgres checkpointer implementations.
  • Removal (not just soft-deprecation) of BranchStep / FanOutStep.

Adds the agentic-loop and fan-out features deferred from phase 1, plus
visualization on the DAG, plus soft-deprecation of the legacy branching/
fan-out steps. Phase 1 surface is preserved; everything here is additive.

What's new:
- Send(target, payload) dataclass. Routers can return list[Send] for
  runtime fan-out: workers run concurrently with their own payload-merged
  state copy; results reduce back into shared state.
- Cycles supported in state mode. PipelineBuilder(state=...) constructs
  the underlying DAG with allow_cycles=True so a node can route back to
  itself for ReAct loops / retry-with-critique.
- recursion_limit kwarg on PipelineBuilder (default 25). Per-node visit
  counter aborts runaway cycles with a clean failure result.
- DAG.to_mermaid() / DAG.to_json() for any DAG.
- StatePipeline.to_mermaid() that adds branch-edge labels from the
  registered mapping.
- BranchStep and FanOutStep emit DeprecationWarning pointing to
  .branch(...) and Send(...). Existing pipelines continue to work.

API additions exported from pipeline.__init__:
    Send, RecursionLimitError

Tests: 9 new in test_state_pipeline_phase2.py covering loop-with-exit,
recursion_limit, map-reduce-style fan-out, unknown-target error,
Mermaid/JSON output, and deprecation warning emission. Pipeline suite
now 97 passed (88 phase-1 + 9 phase-2); full unit suite 1405 passed.

Ruff check + format clean. Pyright clean on touched modules.
@miguelgfierro miguelgfierro marked this pull request as ready for review May 27, 2026 14:07
…eline

- examples/pipeline_state.py: runnable demo covering branching, software-factory
  with checkpoint/resume, and Send-based map-reduce. No API key needed.
- docs/pipeline.md: new "State-Based Pipelines" section documenting state schema,
  reducers, .branch, checkpoint/resume, recursion_limit, Send fan-out, Mermaid
  export, and a "when to use which mode" comparison. Mark BranchStep / FanOutStep
  as deprecated with pointers to the new API.
- state_pipeline.py: small simplifications from the simplifier pass —
  dead 'rendered' set removed from to_mermaid, _common_successor uses direct
  equality instead of set intersection, dropped unused last_node_id tracker,
  _NodeFailureError as a plain Exception subclass instead of @DataClass.

97 pipeline tests still green; ruff check + format clean.
@miguelgfierro miguelgfierro merged commit 9bc4d39 into issue-147-pipeline-evolution May 27, 2026
@miguelgfierro miguelgfierro deleted the issue-147-phase-2 branch May 27, 2026 14:23
ancongui pushed a commit that referenced this pull request May 31, 2026
feat(pipeline): cycles, Send fan-out, Mermaid/JSON export (#147 phase 2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant