Skip to content

feat(pipeline): delete StatePipeline, unify on PipelineEngine (#245 layer 8)#255

Merged
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-8-delete-state-pipeline
May 28, 2026
Merged

feat(pipeline): delete StatePipeline, unify on PipelineEngine (#245 layer 8)#255
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-8-delete-state-pipeline

Conversation

@miguelgfierro
Copy link
Copy Markdown
Contributor

Final layer of the unification (#245). StatePipeline + state_pipeline.py + the dual-mode logic in builder.py are gone; PipelineEngine is the only executor.

Per the discussion: no deprecation cycle needed — StatePipeline never landed on main (introduced in #232, the umbrella PR). All consumers move at the same time.

Stacked on layer 7 (#253) which is already merged into issue-147-pipeline-evolution. Layer 7's DeprecationWarning is removed by this PR — the class it pointed at is gone.

What lands

PipelineBuilder(state=...) now returns a PipelineEngine configured with state_schema, routers, router_mappings, etc. Same fluent API; one executor.

Engine

  • PipelineEngine accepts routers: dict[node_id, RouterFn] and router_mappings: dict[node_id, dict[label, target]]. When non-empty (or the DAG is cyclic), the engine routes through the cyclic frontier scheduler.
  • New _cyclic_next consults routers first, then falls back to edge conditions.
  • _resolve_router_decision lifts the router-return → next-step translation from the deleted state_pipeline.
  • _run_cyclic handles Pause, Send, and list[Send] returns alongside the dict/state-update path.
  • _run_sends accepts visit_counts; per-Send visit numbers are tracked per target so observability shows the right counter across fan-out.
  • PipelineEngine.to_mermaid() renders branch labels from router_mappings.
  • PipelineEngine.invoke(state, ...) shorthand mirrors the legacy StatePipeline.invoke signature so test migration stays mechanical.
  • _record_audit accepts status_override + pause_reason so a Pause-returning node is audited as "paused" rather than "success".
  • Span names use "pipeline.state.*" when state_schema is set (matches the legacy taxonomy observability dashboards key on).
  • on_node_start is now emitted by the schedulers (acyclic main loop, cyclic loop, _run_sends) — removed from _execute_node so the visit number is correct in every scheduler.
  • _load_for_resume returns list[str] (ordered) instead of set[str] so PipelineResult.completed_nodes after resume reflects the original execution order.
  • Resume seeds trace_entries with the pre-completed nodes so PipelineResult.completed_nodes (now derived from execution_trace) includes the full history.

Builder

  • PipelineBuilder.build() always returns PipelineEngine. The state-mode branch wires state_schema/routers/router_mappings into the engine.
  • New _StateStepAdapter wraps a state-mode fn into a StepExecutor so the unified engine's _execute_node can run it.
  • _coerce_state_node_fn (moved from state_pipeline.py) keeps the "async def, def, or .run(state) object" forms working.
  • _StateNodePlaceholder is gone — state-mode nodes now run real code.

Result

PipelineResult grows state-mode convenience properties:

  • state — alias of final_state
  • completed_nodes — derived from execution_trace so cyclic visits appear individually (matches StatePipelineResult semantics)
  • failed_node, error — first-failed-node convenience accessors

Deletions

  • fireflyframework_agentic/pipeline/state_pipeline.py (~750 LOC)
  • StatePipeline, StatePipelineResult, StatePipelineEventHandler, RecursionLimitError, BranchSpec, StateNodeFn (moved to builder.py as a type alias)
  • _StateNodePlaceholder (in builder)
  • tests/unit/pipeline/test_state_pipeline_deprecation.py (Layer 7's warning is gone too)
  • StatePipeline, imports and isinstance(pipeline, StatePipeline) assertions across the state-pipeline test files.

Backward compatibility

  • Pause and Send already live in engine.py since Layer 5; this PR just removes the state_pipeline re-export. Public from fireflyframework_agentic.pipeline import Pause, Send still works.
  • PipelineEventHandler (the legacy port-based protocol) is kept and still works.
  • PipelineBuilder(state=...) continues to accept all the same kwargs; the build result is just a PipelineEngine instead of a StatePipeline wrapper.

Tests

Full unit suite: 1594 passed. Migration patches across the state-pipeline tests:

  • result.state keeps working via the new PipelineResult.state property.
  • result.completed_nodes derives from execution_trace so cyclic visits are preserved.
  • pipeline.invoke(state) works via the engine's invoke shorthand.
  • pipeline.to_mermaid() works via the engine's to_mermaid method.
  • Single-node state-mode pipelines now go through the level-based scheduler (no cycles, no routers); behavior identical.

Net diff

-507 LOC (-955 deleted, +448 added). The unification leaves the codebase smaller than #232 left it, with every feature preserved.

Refs

…ayer 8)

Final layer of the unification. StatePipeline + state_pipeline.py + the
dual-mode logic in builder.py are gone; PipelineEngine is the only
executor. PipelineBuilder(state=...) now constructs a PipelineEngine
configured with state_schema, recursion_limit, audit_log, checkpointer,
event_handler, and any routers registered via .branch(...).

No deprecation cycle needed: StatePipeline never landed on main
(introduced in #232 which is the umbrella PR). All consumers move at the
same time.

Engine changes:

- PipelineEngine accepts routers: dict[str, RouterFn] and
  router_mappings: dict[str, dict[str, str]]. When non-empty (or the DAG
  is cyclic), the engine routes through the cyclic frontier scheduler.
- New _cyclic_next consults routers first, then falls back to edge
  conditions.
- _resolve_router_decision lifts the router-return → next-step
  translation from state_pipeline.
- _run_cyclic now handles Pause, Send, and list[Send] returns alongside
  the dict/state-update path.
- _run_sends accepts visit_counts; per-Send visit numbers are tracked
  per target so observability shows the right counter across fan-out.
- PipelineEngine.to_mermaid renders branch labels from router_mappings.
- PipelineEngine.invoke(state, ...) shorthand mirrors the
  StatePipeline.invoke signature so test migration stays mechanical.
- _record_audit accepts status_override + pause_reason so a Pause-returning
  node is audited as "paused" rather than "success".
- Span names use "pipeline.state.*" when state_schema is set (matches the
  legacy taxonomy observability dashboards already key on).
- on_node_start is now emitted by the schedulers (acyclic main loop,
  cyclic loop, _run_sends) — removed from _execute_node so the visit
  number is correct in every scheduler.
- _load_for_resume returns list[str] (ordered) instead of set[str] so
  PipelineResult.completed_nodes after resume reflects the original
  execution order.
- Resume seeds trace_entries with the pre_completed nodes so
  PipelineResult.completed_nodes (now derived from execution_trace)
  includes the full history.

Builder changes:

- PipelineBuilder.build() always returns PipelineEngine. The state-mode
  branch wires state_schema/routers/router_mappings into the engine.
- New _StateStepAdapter wraps a state-mode fn into a StepExecutor so the
  unified engine's _execute_node can run it.
- _coerce_state_node_fn (moved from state_pipeline.py) keeps the
  "async def, def, or .run(state) object" forms working.
- _StateNodePlaceholder is gone — state-mode nodes now run real code.

Result changes:

- PipelineResult grows state-mode convenience properties: state (alias
  of final_state), completed_nodes (derived from execution_trace so cyclic
  visits appear individually), failed_node, error.

Deletions:

- fireflyframework_agentic/pipeline/state_pipeline.py (~750 LOC)
- StatePipeline, StatePipelineResult, StatePipelineEventHandler,
  RecursionLimitError, BranchSpec, StateNodeFn (in state_pipeline)
- _StateNodePlaceholder (in builder)
- tests/unit/pipeline/test_state_pipeline_deprecation.py (Layer 7's
  warning is gone too)
- "    StatePipeline," imports and isinstance assertions across the
  state-pipeline test files.

Migration notes (internal to this PR):

- Pause and Send already live in engine.py since Layer 5; this PR just
  removes the state_pipeline re-export.
- StatePipelineEventHandler removed; existing usages either implement
  EventHandler (the unified protocol) or the legacy PipelineEventHandler.

Full unit suite: 1594 passed.

Net diff: -507 LOC (-955 deleted, +448 added).

Refs: #245
@miguelgfierro miguelgfierro merged commit 15ac615 into issue-147-pipeline-evolution May 28, 2026
@miguelgfierro miguelgfierro deleted the unify-pipeline-8-delete-state-pipeline branch May 28, 2026 14:29
miguelgfierro added a commit that referenced this pull request May 28, 2026
…-example

fix(example): software-factory pipeline imports PipelineEngine (post-#255)
ancongui pushed a commit that referenced this pull request May 31, 2026
…-state-pipeline

feat(pipeline): delete StatePipeline, unify on PipelineEngine (#245 layer 8)
ancongui pushed a commit that referenced this pull request May 31, 2026
Layer 8 (#255) deleted StatePipeline but I missed the import + return type
in examples/software_factory/pipeline.py. CI on #232 fails to collect
tests/examples/software_factory/test_pipeline.py with:
    ImportError: cannot import name 'StatePipeline' from
    'fireflyframework_agentic.pipeline'

Fix: import PipelineEngine instead and drop the now-unnecessary cast.
PipelineBuilder.build() returns PipelineEngine directly after Layer 8.

Test passes locally.
ancongui pushed a commit that referenced this pull request May 31, 2026
…-example

fix(example): software-factory pipeline imports PipelineEngine (post-#255)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant