feat(pipeline): delete StatePipeline, unify on PipelineEngine (#245 layer 8)#255
Merged
miguelgfierro merged 1 commit intoMay 28, 2026
Conversation
…ayer 8) Final layer of the unification. StatePipeline + state_pipeline.py + the dual-mode logic in builder.py are gone; PipelineEngine is the only executor. PipelineBuilder(state=...) now constructs a PipelineEngine configured with state_schema, recursion_limit, audit_log, checkpointer, event_handler, and any routers registered via .branch(...). No deprecation cycle needed: StatePipeline never landed on main (introduced in #232 which is the umbrella PR). All consumers move at the same time. Engine changes: - PipelineEngine accepts routers: dict[str, RouterFn] and router_mappings: dict[str, dict[str, str]]. When non-empty (or the DAG is cyclic), the engine routes through the cyclic frontier scheduler. - New _cyclic_next consults routers first, then falls back to edge conditions. - _resolve_router_decision lifts the router-return → next-step translation from state_pipeline. - _run_cyclic now handles Pause, Send, and list[Send] returns alongside the dict/state-update path. - _run_sends accepts visit_counts; per-Send visit numbers are tracked per target so observability shows the right counter across fan-out. - PipelineEngine.to_mermaid renders branch labels from router_mappings. - PipelineEngine.invoke(state, ...) shorthand mirrors the StatePipeline.invoke signature so test migration stays mechanical. - _record_audit accepts status_override + pause_reason so a Pause-returning node is audited as "paused" rather than "success". - Span names use "pipeline.state.*" when state_schema is set (matches the legacy taxonomy observability dashboards already key on). - on_node_start is now emitted by the schedulers (acyclic main loop, cyclic loop, _run_sends) — removed from _execute_node so the visit number is correct in every scheduler. - _load_for_resume returns list[str] (ordered) instead of set[str] so PipelineResult.completed_nodes after resume reflects the original execution order. - Resume seeds trace_entries with the pre_completed nodes so PipelineResult.completed_nodes (now derived from execution_trace) includes the full history. Builder changes: - PipelineBuilder.build() always returns PipelineEngine. The state-mode branch wires state_schema/routers/router_mappings into the engine. - New _StateStepAdapter wraps a state-mode fn into a StepExecutor so the unified engine's _execute_node can run it. - _coerce_state_node_fn (moved from state_pipeline.py) keeps the "async def, def, or .run(state) object" forms working. - _StateNodePlaceholder is gone — state-mode nodes now run real code. Result changes: - PipelineResult grows state-mode convenience properties: state (alias of final_state), completed_nodes (derived from execution_trace so cyclic visits appear individually), failed_node, error. Deletions: - fireflyframework_agentic/pipeline/state_pipeline.py (~750 LOC) - StatePipeline, StatePipelineResult, StatePipelineEventHandler, RecursionLimitError, BranchSpec, StateNodeFn (in state_pipeline) - _StateNodePlaceholder (in builder) - tests/unit/pipeline/test_state_pipeline_deprecation.py (Layer 7's warning is gone too) - " StatePipeline," imports and isinstance assertions across the state-pipeline test files. Migration notes (internal to this PR): - Pause and Send already live in engine.py since Layer 5; this PR just removes the state_pipeline re-export. - StatePipelineEventHandler removed; existing usages either implement EventHandler (the unified protocol) or the legacy PipelineEventHandler. Full unit suite: 1594 passed. Net diff: -507 LOC (-955 deleted, +448 added). Refs: #245
miguelgfierro
added a commit
that referenced
this pull request
May 28, 2026
…-example fix(example): software-factory pipeline imports PipelineEngine (post-#255)
ancongui
pushed a commit
that referenced
this pull request
May 31, 2026
…-state-pipeline feat(pipeline): delete StatePipeline, unify on PipelineEngine (#245 layer 8)
ancongui
pushed a commit
that referenced
this pull request
May 31, 2026
Layer 8 (#255) deleted StatePipeline but I missed the import + return type in examples/software_factory/pipeline.py. CI on #232 fails to collect tests/examples/software_factory/test_pipeline.py with: ImportError: cannot import name 'StatePipeline' from 'fireflyframework_agentic.pipeline' Fix: import PipelineEngine instead and drop the now-unnecessary cast. PipelineBuilder.build() returns PipelineEngine directly after Layer 8. Test passes locally.
ancongui
pushed a commit
that referenced
this pull request
May 31, 2026
…-example fix(example): software-factory pipeline imports PipelineEngine (post-#255)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Final layer of the unification (#245).
StatePipeline+state_pipeline.py+ the dual-mode logic inbuilder.pyare gone;PipelineEngineis the only executor.Per the discussion: no deprecation cycle needed —
StatePipelinenever landed onmain(introduced in #232, the umbrella PR). All consumers move at the same time.Stacked on layer 7 (#253) which is already merged into
issue-147-pipeline-evolution. Layer 7'sDeprecationWarningis removed by this PR — the class it pointed at is gone.What lands
PipelineBuilder(state=...)now returns aPipelineEngineconfigured withstate_schema,routers,router_mappings, etc. Same fluent API; one executor.Engine
PipelineEngineacceptsrouters: dict[node_id, RouterFn]androuter_mappings: dict[node_id, dict[label, target]]. When non-empty (or the DAG is cyclic), the engine routes through the cyclic frontier scheduler._cyclic_nextconsults routers first, then falls back to edge conditions._resolve_router_decisionlifts the router-return → next-step translation from the deletedstate_pipeline._run_cyclichandlesPause,Send, andlist[Send]returns alongside the dict/state-update path._run_sendsacceptsvisit_counts; per-Send visit numbers are tracked per target so observability shows the right counter across fan-out.PipelineEngine.to_mermaid()renders branch labels fromrouter_mappings.PipelineEngine.invoke(state, ...)shorthand mirrors the legacyStatePipeline.invokesignature so test migration stays mechanical._record_auditacceptsstatus_override+pause_reasonso a Pause-returning node is audited as"paused"rather than"success"."pipeline.state.*"whenstate_schemais set (matches the legacy taxonomy observability dashboards key on).on_node_startis now emitted by the schedulers (acyclic main loop, cyclic loop,_run_sends) — removed from_execute_nodeso the visit number is correct in every scheduler._load_for_resumereturnslist[str](ordered) instead ofset[str]soPipelineResult.completed_nodesafter resume reflects the original execution order.trace_entrieswith the pre-completed nodes soPipelineResult.completed_nodes(now derived fromexecution_trace) includes the full history.Builder
PipelineBuilder.build()always returnsPipelineEngine. The state-mode branch wiresstate_schema/routers/router_mappingsinto the engine._StateStepAdapterwraps a state-mode fn into aStepExecutorso the unified engine's_execute_nodecan run it._coerce_state_node_fn(moved from state_pipeline.py) keeps the "async def, def, or .run(state) object" forms working._StateNodePlaceholderis gone — state-mode nodes now run real code.Result
PipelineResultgrows state-mode convenience properties:state— alias offinal_statecompleted_nodes— derived fromexecution_traceso cyclic visits appear individually (matches StatePipelineResult semantics)failed_node,error— first-failed-node convenience accessorsDeletions
fireflyframework_agentic/pipeline/state_pipeline.py(~750 LOC)StatePipeline,StatePipelineResult,StatePipelineEventHandler,RecursionLimitError,BranchSpec,StateNodeFn(moved tobuilder.pyas a type alias)_StateNodePlaceholder(in builder)tests/unit/pipeline/test_state_pipeline_deprecation.py(Layer 7's warning is gone too)StatePipeline,imports andisinstance(pipeline, StatePipeline)assertions across the state-pipeline test files.Backward compatibility
PauseandSendalready live inengine.pysince Layer 5; this PR just removes thestate_pipelinere-export. Publicfrom fireflyframework_agentic.pipeline import Pause, Sendstill works.PipelineEventHandler(the legacy port-based protocol) is kept and still works.PipelineBuilder(state=...)continues to accept all the same kwargs; the build result is just aPipelineEngineinstead of aStatePipelinewrapper.Tests
Full unit suite: 1594 passed. Migration patches across the state-pipeline tests:
result.statekeeps working via the newPipelineResult.stateproperty.result.completed_nodesderives fromexecution_traceso cyclic visits are preserved.pipeline.invoke(state)works via the engine'sinvokeshorthand.pipeline.to_mermaid()works via the engine'sto_mermaidmethod.Net diff
-507 LOC (-955 deleted, +448 added). The unification leaves the codebase smaller than #232 left it, with every feature preserved.
Refs
issue-147-pipeline-evolution