Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions fireflyframework_agentic/pipeline/state_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import logging
import time
import uuid
import warnings
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from datetime import UTC, datetime
Expand Down Expand Up @@ -97,6 +98,30 @@ class StatePipelineResult:
class StatePipeline:
"""Compiled state-based pipeline. Returned by ``PipelineBuilder.build()``
when a ``state=`` schema is configured.

.. deprecated::
:class:`StatePipeline` is being subsumed by
:class:`fireflyframework_agentic.pipeline.engine.PipelineEngine`
configured with ``state_schema=``. The unified engine supports
the same features (state overlay, reducers, Pause, Send, cycles,
recursion_limit, checkpointing, audit, resume, start_at) and adds
true parallelism for state-aware pipelines via the topological
scheduler. New code should prefer ``PipelineEngine`` directly:

.. code-block:: python

engine = PipelineEngine(
dag,
state_schema=MyState,
checkpointer=cp,
audit_log=al,
recursion_limit=10,
)
result = await engine.run(state=MyState(...))

See issue #245 for the full migration plan. The next layer of
unification removes :class:`StatePipeline` after a deprecation
cycle.
"""

def __init__(
Expand All @@ -112,6 +137,13 @@ def __init__(
event_handler: StatePipelineEventHandler | None = None,
audit_log: AuditLog | None = None,
) -> None:
warnings.warn(
"StatePipeline is deprecated; use PipelineEngine(state_schema=...) "
"for the unified API. The unified engine supports the same features "
"and adds parallel state-aware execution. See issue #245.",
DeprecationWarning,
stacklevel=2,
)
self._name = name
self._dag = dag
self._state_schema = state_schema
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/pipeline/test_state_pipeline_deprecation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Layer 7 of the unification (#245): StatePipeline deprecation.

Constructing :class:`StatePipeline` now emits a :class:`DeprecationWarning`
pointing at :class:`PipelineEngine` configured with ``state_schema=`` as
the supported replacement.
"""

from __future__ import annotations

import warnings

from pydantic import BaseModel

from fireflyframework_agentic.pipeline.builder import PipelineBuilder


class _S(BaseModel):
x: int = 0


async def _noop(state):
return None


def test_state_pipeline_emits_deprecation_warning():
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
PipelineBuilder("p", state=_S).add_node(_noop).build()
deprec = [w for w in caught if issubclass(w.category, DeprecationWarning)]
assert deprec, "expected a DeprecationWarning when constructing StatePipeline"
assert "PipelineEngine" in str(deprec[0].message)
assert "#245" in str(deprec[0].message)