feat(pipeline): cycles, Send fan-out, Mermaid/JSON export (#147 phase 2)#233
Merged
Merged
Conversation
Adds the agentic-loop and fan-out features deferred from phase 1, plus
visualization on the DAG, plus soft-deprecation of the legacy branching/
fan-out steps. Phase 1 surface is preserved; everything here is additive.
What's new:
- Send(target, payload) dataclass. Routers can return list[Send] for
runtime fan-out: workers run concurrently with their own payload-merged
state copy; results reduce back into shared state.
- Cycles supported in state mode. PipelineBuilder(state=...) constructs
the underlying DAG with allow_cycles=True so a node can route back to
itself for ReAct loops / retry-with-critique.
- recursion_limit kwarg on PipelineBuilder (default 25). Per-node visit
counter aborts runaway cycles with a clean failure result.
- DAG.to_mermaid() / DAG.to_json() for any DAG.
- StatePipeline.to_mermaid() that adds branch-edge labels from the
registered mapping.
- BranchStep and FanOutStep emit DeprecationWarning pointing to
.branch(...) and Send(...). Existing pipelines continue to work.
API additions exported from pipeline.__init__:
Send, RecursionLimitError
Tests: 9 new in test_state_pipeline_phase2.py covering loop-with-exit,
recursion_limit, map-reduce-style fan-out, unknown-target error,
Mermaid/JSON output, and deprecation warning emission. Pipeline suite
now 97 passed (88 phase-1 + 9 phase-2); full unit suite 1405 passed.
Ruff check + format clean. Pyright clean on touched modules.
…eline - examples/pipeline_state.py: runnable demo covering branching, software-factory with checkpoint/resume, and Send-based map-reduce. No API key needed. - docs/pipeline.md: new "State-Based Pipelines" section documenting state schema, reducers, .branch, checkpoint/resume, recursion_limit, Send fan-out, Mermaid export, and a "when to use which mode" comparison. Mark BranchStep / FanOutStep as deprecated with pointers to the new API. - state_pipeline.py: small simplifications from the simplifier pass — dead 'rendered' set removed from to_mermaid, _common_successor uses direct equality instead of set intersection, dropped unused last_node_id tracker, _NodeFailureError as a plain Exception subclass instead of @DataClass. 97 pipeline tests still green; ruff check + format clean.
ancongui
pushed a commit
that referenced
this pull request
May 31, 2026
feat(pipeline): cycles, Send fan-out, Mermaid/JSON export (#147 phase 2)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Stacked on #232. Closes the remaining items from #147 left out of phase 1.
What's new
Cycles (ReAct / retry-with-critique). In state mode, the underlying DAG is now built with
allow_cycles=True, so a node can route back to itself via the existing.branch(...)mechanism. Arecursion_limitkwarg onPipelineBuilder(default 25) acts as the safety net — a runaway loop surfaces asresult.success=Falsewith a clear message, not an infinite hang.Send(target, payload)runtime fan-out. Routers can returnlist[Send]to dispatch concurrent work over the same worker (or different workers). Each Send gets its own state copy with the payload merged in; the worker's return reduces back into shared state. Replaces the legacyFanOutSteppattern.When all workers share a common successor (typical map-reduce shape), execution continues at that successor automatically. The aggregator runs once with all fan-out results already merged into state.
Mermaid + JSON export.
DAG.to_mermaid()/DAG.to_json()for any DAG.StatePipeline.to_mermaid()adds branch-edge labels from the registered mapping (`a -->|label| b`).Soft-deprecation of
BranchStep/FanOutStep. Construction emits aDeprecationWarningpointing at.branch(...)andSend(...). Existing pipelines keep working — removal is a separate decision, gated on internal callers migrating.API additions
Exported from
fireflyframework_agentic.pipeline:SendRecursionLimitErrorPipelineBuilder(...)gainsrecursion_limit: int = 25.Implementation notes
asyncio.gather, so concurrency is intra-fan-out only. That matches the typical agentic use case (one branch at a time, fan-out for map-reduce) without committing to full parallel-DAG semantics.apply_updatereducer machinery that ordinary node updates use — no special-casing._common_successorlooks for a single shared successor across all worker targets and continues from there. If workers have divergent successors, the run ends (an aggregator pattern with a shared successor is the supported shape; richer fan-out topologies land in a future phase).Tests
tests/unit/pipeline/test_state_pipeline_phase2.py(9 tests, all green):recursion_limit=5triggers a clean failure on an infinite-loop router.DAG.to_mermaid()renders nodes and edges.DAG.to_json()produces a round-trippable JSON document.StatePipeline.to_mermaid()labels branch edges with the mapping labels.BranchStep(...)emitsDeprecationWarningpointing to.branch(...).FanOutStep(...)emitsDeprecationWarningpointing toSend.Suite-wide:
pytest tests/unit/pipeline/— 97 passed (88 phase 1 + 9 phase 2)pytest tests/unit/— 1405 passed (no regressions)ruff check+ruff format --checkcleanpyrightclean on touched modulesStacking
Targets
issue-147-pipeline-evolution(the phase-1 branch). After #232 merges to main, this PR's base will retarget to main automatically.Out of scope (phase 3 / follow-up)
pipeline.stream()).Literal[...]validation on node return annotations.BranchStep/FanOutStep.