Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/software_factory/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

"""Console progress handler.

Implements (structurally) the framework's :class:`StatePipelineEventHandler`
Implements (structurally) the framework's :class:`EventHandler`
Protocol. Prints one line per pipeline / node event so the QA loop and
checkpoint+resume flow are visible when running the example by hand.
"""
Expand Down
14 changes: 2 additions & 12 deletions fireflyframework_agentic/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,13 @@
from fireflyframework_agentic.pipeline.dag import DAG, DAGEdge, DAGNode, FailureStrategy
from fireflyframework_agentic.pipeline.engine import (
EventHandler,
Pause,
PipelineEngine,
PipelineEventHandler,
StatePipelineEventHandler,
Send,
)
from fireflyframework_agentic.pipeline.reducers import append, extend, merge_dict, replace
from fireflyframework_agentic.pipeline.result import ExecutionTraceEntry, NodeResult, PipelineResult
from fireflyframework_agentic.pipeline.state_pipeline import (
Pause,
RecursionLimitError,
Send,
StatePipeline,
StatePipelineResult,
)
from fireflyframework_agentic.pipeline.steps import (
AgentStep,
BatchLLMStep,
Expand Down Expand Up @@ -103,12 +97,8 @@
"PipelineResult",
"QueryableAuditLog",
"ReasoningStep",
"RecursionLimitError",
"RetrievalStep",
"Send",
"StatePipeline",
"StatePipelineEventHandler",
"StatePipelineResult",
"StepExecutor",
"append",
"extend",
Expand Down
174 changes: 102 additions & 72 deletions fireflyframework_agentic/pipeline/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

"""Fluent builder API for constructing pipeline DAGs.

Two modes:
Two modes, both backed by the same :class:`PipelineEngine`:

1. **Port-based** (legacy, parallel-friendly): nodes are added by string id,
data flows over edge ports, executed by :class:`PipelineEngine`. Use this
for ETL-shaped DAGs with independent parallel steps::
1. **Port-based** (parallel-friendly): nodes are added by string id, data
flows over edge ports::

pipeline = (
PipelineBuilder("idp")
Expand All @@ -29,10 +28,10 @@
)

2. **State-based**: configure ``state=SomeModel`` and nodes become
``async (state) -> dict`` functions over a typed shared state. Branching
is one ``.branch(source, router)`` call. Function references can be used
as node ids. Optional checkpointing supports resume after failure and
mid-pipeline start. Produces a :class:`StatePipeline`::
``async (state) -> dict | None | Pause | Send | list[Send]``. Branching
is one ``.branch(source, router)`` call; function references work as
node ids. Optional checkpointing supports resume after failure and
mid-pipeline start::

pipeline = (
PipelineBuilder("agent", state=AgentState, checkpointer=FileCheckpointer("./ckpt"))
Expand All @@ -46,38 +45,94 @@

from __future__ import annotations

import asyncio
import inspect
from collections.abc import Callable
from collections.abc import Awaitable, Callable
from typing import Any

from pydantic import BaseModel

from fireflyframework_agentic.exceptions import PipelineError
from fireflyframework_agentic.pipeline.audit import AuditLog
from fireflyframework_agentic.pipeline.checkpoint import Checkpointer
from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.pipeline.dag import DAG, DAGEdge, DAGNode, FailureStrategy
from fireflyframework_agentic.pipeline.engine import PipelineEngine, StatePipelineEventHandler
from fireflyframework_agentic.pipeline.state_pipeline import (
BranchSpec,
from fireflyframework_agentic.pipeline.engine import (
EventHandler,
PipelineEngine,
PipelineEventHandler,
RouterFn,
Send, # noqa: F401 re-exported via pipeline/__init__.py
StateNodeFn,
StatePipeline,
coerce_state_node_fn,
)
from fireflyframework_agentic.pipeline.steps import AgentStep, CallableStep, StepExecutor

StateNodeFn = Callable[[Any], Awaitable[Any]]
"""Signature for a state-mode node: ``async (state) -> dict | None | Pause | Send | list[Send]``."""


class _StateStepAdapter:
    """Present a state-mode node function through the ``execute(context, inputs)`` step shape.

    A state-mode node only wants the shared state, whereas a step is invoked
    as ``step.execute(context, inputs)``. This adapter bridges the two: it
    hands ``context.state`` to the wrapped function and passes the function's
    return value back untouched (state update or control sentinel), so the
    caller sees exactly what the node produced.
    """

    def __init__(self, fn: Callable[..., Any]) -> None:
        # Normalise the user-supplied callable once, at construction time,
        # so ``execute`` can always simply await it.
        coerced = _coerce_state_node_fn(fn)
        self._fn = coerced

    async def execute(self, context: PipelineContext, inputs: dict[str, Any]) -> Any:  # noqa: ARG002
        """Run the wrapped node against ``context.state``; ``inputs`` is not used."""
        current_state = context.state
        outcome = await self._fn(current_state)
        return outcome


def _coerce_state_node_fn(fn: Callable[..., Any]) -> StateNodeFn:
    """Turn user-supplied state-mode callables into the standard ``async (state) -> Any`` shape.

    Accepts:
    * ``async def f(state)`` — used as-is.
    * ``def f(state)`` — wrapped to run on a worker thread.
    * Callable object whose ``__call__`` is ``async def`` — awaited on the
      event loop (it is *not* pushed to a worker thread).
    * Non-callable object with a ``run(state)`` method (e.g. an agent) — the
      adapter calls ``.run(state)``; an async ``run`` is awaited, a sync
      ``run`` is executed on a worker thread.

    If a synchronous callable hands back an awaitable (for example
    ``lambda s: some_coroutine(s)``), that awaitable is awaited so the node
    result is the resolved value rather than a dangling coroutine object.

    Raises:
        PipelineError: ``fn`` is neither callable nor exposes a callable ``run``.
    """
    if inspect.iscoroutinefunction(fn):
        return fn  # type: ignore[return-value]

    if callable(fn):
        target: Any = fn
    else:
        # Agent-like objects are only adapted through ``run`` when they are
        # not themselves callable.
        target = getattr(fn, "run", None)
        if not callable(target):
            raise PipelineError(f"Cannot adapt {fn!r} as a state node function")

    # ``inspect.iscoroutinefunction`` is False for an *instance* whose
    # ``__call__`` is ``async def``; look at the type's ``__call__`` as well so
    # such objects are awaited on the loop instead of being sent to a thread
    # (where calling them would merely create an un-awaited coroutine).
    runs_on_loop = inspect.iscoroutinefunction(target) or inspect.iscoroutinefunction(
        getattr(type(target), "__call__", None)
    )

    async def _state_node(state: Any) -> Any:
        if runs_on_loop:
            return await target(state)
        result = await asyncio.get_running_loop().run_in_executor(None, target, state)
        # A sync callable may still return a coroutine/future; resolve it
        # rather than leaking it to the caller as the node's result.
        if inspect.isawaitable(result):
            result = await result
        return result

    return _state_node


class PipelineBuilder:
"""Fluent builder for pipelines.

Parameters:
name: Human-readable name for the pipeline.
state: Optional Pydantic model class for typed shared state.
When set, the builder produces a :class:`StatePipeline` and nodes
are expected to be ``async (state) -> dict | None``.
checkpointer: Optional :class:`Checkpointer` for state-based pipelines.
Ignored when ``state`` is not set.
When set, the builder produces a state-aware
:class:`PipelineEngine` and nodes are expected to be
``async (state) -> dict | None | Pause | Send | list[Send]``.
checkpointer: Optional :class:`Checkpointer` for resume.
recursion_limit: Max visits per node in cycle-aware runs.
event_handler: Optional :class:`EventHandler` (or legacy
:class:`PipelineEventHandler`).
audit_log: Optional :class:`AuditLog`.
"""

def __init__(
Expand All @@ -87,11 +142,10 @@ def __init__(
state: type[BaseModel] | None = None,
checkpointer: Checkpointer | None = None,
recursion_limit: int = 25,
event_handler: StatePipelineEventHandler | None = None,
event_handler: EventHandler | PipelineEventHandler | None = None,
audit_log: AuditLog | None = None,
) -> None:
# State pipelines may use cyclic graphs (ReAct loops, retry-with-critique).
# The legacy port-based path keeps acyclicity as an invariant.
# State-aware pipelines may have cycles (ReAct loops, retry-with-critique).
self._dag = DAG(name=name, allow_cycles=state is not None)
self._name = name
self._state_schema = state
Expand All @@ -101,9 +155,9 @@ def __init__(
self._audit_log = audit_log
self._pending_nodes: list[DAGNode] = []
self._pending_edges: list[DAGEdge] = []
# State-based mode bookkeeping. Keyed by node id.
self._state_node_fns: dict[str, StateNodeFn] = {}
self._branches: dict[str, BranchSpec] = {}
# Routers + mappings drive the cyclic scheduler's next-step pick.
self._routers: dict[str, RouterFn] = {}
self._router_mappings: dict[str, dict[str, str]] = {}

def add_node(
self,
Expand All @@ -122,23 +176,21 @@ def add_node(
* ``add_node(fn)`` — state-based mode. ``fn`` is a callable; the node
id is taken from ``fn.__name__``. Requires the builder was constructed
with ``state=...``.
* ``add_node(node_id, step)`` — legacy port-based mode. ``step`` is a
* ``add_node(node_id, step)`` — port-based mode. ``step`` is a
:class:`StepExecutor`, an agent-like, or an async callable.
"""
if step is None and callable(node_id_or_fn) and not isinstance(node_id_or_fn, str):
# State-based: derive id from function name.
if self._state_schema is None:
raise PipelineError(
"Function-reference add_node(fn) requires PipelineBuilder(state=...). "
"Use add_node('id', step) for port-based pipelines."
)
fn = node_id_or_fn
node_id = getattr(fn, "__name__", None) or repr(fn)
self._state_node_fns[node_id] = coerce_state_node_fn(fn)
self._pending_nodes.append(
DAGNode(
node_id=node_id,
step=_StateNodePlaceholder(), # never executed; engine path is unused for state pipelines
step=_StateStepAdapter(fn),
condition=condition,
retry_max=retry_max,
timeout_seconds=timeout_seconds,
Expand All @@ -152,19 +204,16 @@ def add_node(
node_id = node_id_or_fn

if self._state_schema is not None and step is not None:
# State-based pipeline: accept a callable, or an agent-like object
# exposing async ``run(state)``. ``coerce_state_node_fn`` handles both.
run_method = getattr(step, "run", None)
if not callable(step) and not callable(run_method):
raise PipelineError(
f"State pipeline node '{node_id}' must be a callable or expose async run(state); "
f"got {type(step).__name__}"
)
self._state_node_fns[node_id] = coerce_state_node_fn(step)
self._pending_nodes.append(
DAGNode(
node_id=node_id,
step=_StateNodePlaceholder(),
step=_StateStepAdapter(step),
condition=condition,
retry_max=retry_max,
timeout_seconds=timeout_seconds,
Expand Down Expand Up @@ -225,61 +274,50 @@ def branch(
router: RouterFn,
mapping: dict[str, str | Callable[..., Any]] | None = None,
) -> PipelineBuilder:
"""Register a router on ``source``.
"""Register a runtime router on ``source``.

``router`` is a synchronous ``(state) -> str`` callable. Behaviour:
``router`` is a synchronous ``(state) -> str | Send | list[Send]``
callable. Behaviour:

* If ``mapping`` is None, the router must return the **id of an
existing node** that will run next.
* If ``mapping`` is provided, the router returns an abstract label
that is looked up in ``mapping`` to find the target node id.

State-based pipelines only.
State-aware pipelines only.
"""
if self._state_schema is None:
raise PipelineError(".branch(...) requires PipelineBuilder(state=...)")
source_id = _id(source)
resolved_mapping: dict[str, str] | None = None
if mapping is not None:
resolved_mapping = {label: _id(target) for label, target in mapping.items()}
# Materialize each label's edge into the DAG so topology is inspectable.
self._router_mappings[source_id] = resolved_mapping
# Materialize each label's edge so topology stays inspectable.
for target_id in resolved_mapping.values():
self._pending_edges.append(DAGEdge(source=source_id, target=target_id))
else:
# No mapping: we don't know targets at build time; edges will
# be missing from the DAG. That's fine for the StatePipeline
# executor (it consults the router), but visualisation will be
# incomplete. Materialize edges lazily when the router fires.
pass
self._branches[source_id] = BranchSpec(source=source_id, router=router, mapping=resolved_mapping)
self._routers[source_id] = router
return self

def build(self) -> PipelineEngine | StatePipeline:
"""Build the DAG and return either a :class:`PipelineEngine`
(legacy port-based) or :class:`StatePipeline` (when ``state=`` is set).
"""
def build(self) -> PipelineEngine:
"""Build the DAG and return a :class:`PipelineEngine`."""
for node in self._pending_nodes:
self._dag.add_node(node)
for edge in self._pending_edges:
self._dag.add_edge(edge)

if self._state_schema is not None:
return StatePipeline(
name=self._name,
dag=self._dag,
state_schema=self._state_schema,
node_fns=self._state_node_fns,
branches=self._branches,
checkpointer=self._checkpointer,
recursion_limit=self._recursion_limit,
event_handler=self._event_handler,
audit_log=self._audit_log,
)

return PipelineEngine(self._dag)
return PipelineEngine(
self._dag,
event_handler=self._event_handler,
checkpointer=self._checkpointer,
audit_log=self._audit_log,
state_schema=self._state_schema,
recursion_limit=self._recursion_limit,
routers=self._routers,
router_mappings=self._router_mappings,
)

def build_dag(self) -> DAG:
"""Build and return just the :class:`DAG` (for inspection or custom engines)."""
"""Build and return just the :class:`DAG` (for inspection)."""
for node in self._pending_nodes:
self._dag.add_node(node)
for edge in self._pending_edges:
Expand Down Expand Up @@ -309,11 +347,3 @@ def _id(ref: str | Callable[..., Any]) -> str:
if not name:
raise PipelineError(f"Cannot derive node id from {ref!r}")
return name


class _StateNodePlaceholder:
    """Stand-in step stored on DAG nodes so the graph topology stays complete.

    It is not meant to run: calling :meth:`execute` always raises
    :class:`PipelineError`.
    """

    async def execute(self, *_args: Any, **_kwargs: Any) -> Any:
        """Always raise; arguments are accepted and ignored."""
        message = "_StateNodePlaceholder.execute called — state pipelines should not use PipelineEngine."
        raise PipelineError(message)
2 changes: 1 addition & 1 deletion fireflyframework_agentic/pipeline/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class CheckpointRecord(BaseModel):
"""One saved checkpoint.

``paused`` and ``pause_reason`` are set when a node returns
:class:`fireflyframework_agentic.pipeline.state_pipeline.Pause`. Default
:class:`fireflyframework_agentic.pipeline.engine.Pause`. Default
to ``False`` / ``None`` so existing records from earlier phases load
cleanly under the new schema.
"""
Expand Down
Loading