From 56d52ecdec217d711b06e185909cd5aeaa8c687c Mon Sep 17 00:00:00 2001 From: miguelgfierro Date: Thu, 28 May 2026 15:24:41 +0200 Subject: [PATCH] feat(pipeline): deprecate StatePipeline (#245 layer 7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Seventh layer of the unification. StatePipeline now emits a DeprecationWarning on construction pointing users at PipelineEngine configured with state_schema= as the unified replacement. PipelineEngine has feature parity with StatePipeline after layers 1-6: state overlay, reducers, Pause, Send, cycles, recursion_limit, checkpointing, audit log, resume, start_at — plus parallel state-aware execution that StatePipeline could not provide. Changes: - StatePipeline.__init__ raises DeprecationWarning with migration text. - import warnings added at module level. - Class docstring includes a deprecation notice and migration example. StatePipeline still works — all 37 existing state-pipeline tests pass unchanged (they emit the new DeprecationWarning but continue to operate). Tests: 1 new in tests/unit/pipeline/test_state_pipeline_deprecation.py verifying the warning is raised and references PipelineEngine + #245. Full suite: 1595 passed. Layer 8 (full deletion of state_pipeline.py and dual-mode logic in builder.py) is tracked as follow-up — it requires migrating the 37 state-pipeline tests to the unified PipelineEngine surface and translating PipelineBuilder.branch(source, router, mapping) into conditional DAGEdges, which is more invasive than this deprecation layer warrants. Refs: #245 --- .../pipeline/state_pipeline.py | 32 +++++++++++++++++++ .../test_state_pipeline_deprecation.py | 32 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 tests/unit/pipeline/test_state_pipeline_deprecation.py diff --git a/fireflyframework_agentic/pipeline/state_pipeline.py b/fireflyframework_agentic/pipeline/state_pipeline.py index aae0c0c4..07a3b1ea 100644 --- a/fireflyframework_agentic/pipeline/state_pipeline.py +++ b/fireflyframework_agentic/pipeline/state_pipeline.py @@ -29,6 +29,7 @@ import logging import time import uuid +import warnings from collections.abc import Awaitable, Callable from dataclasses import dataclass from datetime import UTC, datetime @@ -97,6 +98,30 @@ class StatePipelineResult: class StatePipeline: """Compiled state-based pipeline. Returned by ``PipelineBuilder.build()`` when a ``state=`` schema is configured. + + .. deprecated:: + :class:`StatePipeline` is being subsumed by + :class:`fireflyframework_agentic.pipeline.engine.PipelineEngine` + configured with ``state_schema=``. The unified engine supports + the same features (state overlay, reducers, Pause, Send, cycles, + recursion_limit, checkpointing, audit, resume, start_at) and adds + true parallelism for state-aware pipelines via the topological + scheduler. New code should prefer ``PipelineEngine`` directly: + + .. code-block:: python + + engine = PipelineEngine( + dag, + state_schema=MyState, + checkpointer=cp, + audit_log=al, + recursion_limit=10, + ) + result = await engine.run(state=MyState(...)) + + See issue #245 for the full migration plan. The next layer of + unification removes :class:`StatePipeline` after a deprecation + cycle. """ def __init__( @@ -112,6 +137,13 @@ def __init__( event_handler: StatePipelineEventHandler | None = None, audit_log: AuditLog | None = None, ) -> None: + warnings.warn( + "StatePipeline is deprecated; use PipelineEngine(state_schema=...) " + "for the unified API. The unified engine supports the same features " + "and adds parallel state-aware execution. See issue #245.", + DeprecationWarning, + stacklevel=2, + ) self._name = name self._dag = dag self._state_schema = state_schema diff --git a/tests/unit/pipeline/test_state_pipeline_deprecation.py b/tests/unit/pipeline/test_state_pipeline_deprecation.py new file mode 100644 index 00000000..ec65da49 --- /dev/null +++ b/tests/unit/pipeline/test_state_pipeline_deprecation.py @@ -0,0 +1,32 @@ +"""Layer 7 of the unification (#245): StatePipeline deprecation. + +Constructing :class:`StatePipeline` now emits a :class:`DeprecationWarning` +pointing at :class:`PipelineEngine` configured with ``state_schema=`` as +the supported replacement. +""" + +from __future__ import annotations + +import warnings + +from pydantic import BaseModel + +from fireflyframework_agentic.pipeline.builder import PipelineBuilder + + +class _S(BaseModel): + x: int = 0 + + +async def _noop(state): + return None + + +def test_state_pipeline_emits_deprecation_warning(): + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + PipelineBuilder("p", state=_S).add_node(_noop).build() + deprec = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert deprec, "expected a DeprecationWarning when constructing StatePipeline" + assert "PipelineEngine" in str(deprec[0].message) + assert "#245" in str(deprec[0].message)