From edaf9f7bcdd099330c01f2fb2f1d52b7b2a95639 Mon Sep 17 00:00:00 2001 From: miguelgfierro Date: Thu, 28 May 2026 15:11:03 +0200 Subject: [PATCH] feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5) Fifth layer of the unification. PipelineEngine now recognizes the same control sentinels that StatePipeline uses today: - A node returning Pause(reason=...) halts the pipeline cleanly. The run resumes with engine.run(run_id=..., approve_pause=True). - A node returning Send or list[Send] triggers parallel fan-out where each Send's target runs concurrently with the payload merged into a per-worker state copy. Reducers merge worker outputs back into shared state. Refactor (no behavior change): - Pause and Send dataclasses moved from state_pipeline.py to engine.py so PipelineEngine can recognize them without a circular import. state_pipeline.py now imports them from engine. Public re-exports from fireflyframework_agentic.pipeline are unchanged. Engine changes: - run() accepts approve_pause: bool = False kwarg. Resuming a paused checkpoint without approve_pause=True raises PipelineError with the pause reason. - _save_checkpoint accepts paused=, pause_reason= kwargs, persisted on CheckpointRecord (the fields landed in Layer 1A). - _load_for_resume enforces the approve_pause gate. - Main loop branches on Pause: emits on_node_pause event, checkpoints with paused=True, sets pending_pause and aborts. - Main loop branches on Send: dispatches workers via the new _run_sends helper, marks workers as completed so the scheduler does not re-run them. - _run_sends: validates target IDs up front; per-worker PipelineContext with its own state copy (payload applied via reducers); asyncio.gather across workers; results merge back into shared state via reducers; any worker failure aborts the pipeline. - _is_send_payload helper at module level. Result changes: - PipelineResult gains paused: bool, paused_node: str | None, pause_reason: str | None. Mirrors StatePipelineResult. Tests: 7 new in tests/unit/pipeline/test_pipeline_engine_pause_send.py covering: - Pause halts the pipeline and records paused state in result + checkpoint - Resume without approve_pause=True raises - Resume with approve_pause=True continues from the paused node's successor - list[Send] dispatches workers concurrently with per-worker state copies - Single Send is treated as list[Send] of one - Unknown Send target marks the pipeline as failed - Pause and Send remain re-exported from the pipeline package Full suite: 1587 passed. Refs: #245 --- fireflyframework_agentic/pipeline/engine.py | 207 +++++++++++++++++- fireflyframework_agentic/pipeline/result.py | 5 + .../pipeline/state_pipeline.py | 43 +--- .../test_pipeline_engine_pause_send.py | 203 +++++++++++++++++ 4 files changed, 413 insertions(+), 45 deletions(-) create mode 100644 tests/unit/pipeline/test_pipeline_engine_pause_send.py diff --git a/fireflyframework_agentic/pipeline/engine.py b/fireflyframework_agentic/pipeline/engine.py index 556fc071..cebee341 100644 --- a/fireflyframework_agentic/pipeline/engine.py +++ b/fireflyframework_agentic/pipeline/engine.py @@ -23,6 +23,7 @@ import random import time import uuid +from dataclasses import dataclass from datetime import UTC, datetime from typing import Any, Protocol, runtime_checkable @@ -130,6 +131,59 @@ async def on_pipeline_complete( ) -> None: ... +@dataclass +class Pause: + """Human-in-the-loop sentinel returned by a node to halt the pipeline. + + A node returns ``Pause(reason="...")`` when external approval is required + before the pipeline may continue. The engine then: + + 1. Writes a checkpoint with ``paused=True`` and the reason set. + 2. Emits ``on_node_pause`` on the configured event handler. + 3. Returns a :class:`PipelineResult` with ``paused=True`` and + ``success=False`` — the run is not finished, but it did not fail + either. + + Resume after approval:: + + result = await engine.run(run_id=paused_run_id, approve_pause=True) + + The successor of the paused node runs next — the pause node itself is + not re-executed. Without ``approve_pause=True``, resuming a paused run + raises :class:`PipelineError`. + """ + + reason: str + + +@dataclass +class Send: + """Runtime fan-out dispatch: run ``target`` with ``payload`` merged into state. + + A node may return a single ``Send`` or ``list[Send]`` to dispatch one or + more targets concurrently. Each Send's payload is applied to a *copy* of + the current state before its target runs; the target's return is then + merged back into shared state via reducers. + + Replaces the legacy ``FanOutStep`` pattern with a first-class primitive. + """ + + target: str + payload: dict[str, Any] + + +def _is_send_payload(value: Any) -> bool: + """True when a node's return value is a single :class:`Send` or a + non-empty ``list[Send]``. Drives the runtime fan-out branch in + :meth:`PipelineEngine.run`. + """ + if isinstance(value, Send): + return True + if isinstance(value, list) and value and all(isinstance(s, Send) for s in value): + return True + return False + + def _serialize_value(value: Any) -> Any: """Best-effort conversion of arbitrary values into JSON-safe form. @@ -245,6 +299,7 @@ async def run( inputs: Any = None, state: BaseModel | None = None, run_id: str | None = None, + approve_pause: bool = False, ) -> PipelineResult: """Execute the pipeline. @@ -265,7 +320,7 @@ async def run( """ if run_id is not None and context is None and inputs is None and state is None: resume_run_id: str = run_id - context, pre_completed, sequence_start = self._load_for_resume(resume_run_id) + context, pre_completed, sequence_start = self._load_for_resume(resume_run_id, approve_pause=approve_pause) all_results: dict[str, NodeResult] = { nid: nr for nid in pre_completed @@ -338,6 +393,7 @@ async def run( inputs_by_node: dict[str, dict[str, Any]] = {} sequence = sequence_start abort = False + pending_pause: tuple[str, str] | None = None # (node_id, reason) if Pause def _edge_alive(edge: Any) -> bool: """An edge is alive if it has no condition, or its condition returns True. @@ -465,6 +521,48 @@ async def _record_skip(nid: str) -> None: inputs_snapshot=inputs_by_node.get(node_id, {}), trace_entries=trace_entries, ) + # HITL: a node returned Pause(reason=...). Halt cleanly, save + # a paused checkpoint, and surface the pause in the result. + if nr.success and isinstance(nr.output, Pause): + pause_reason = nr.output.reason + await self._dispatch( + "on_node_pause", + pipeline_name=self._dag.name, + run_id=run_id, + node_id=node_id, + reason=pause_reason, + ) + self._save_checkpoint( + run_id=run_id, + node_id=node_id, + sequence=sequence, + context=context, + all_results=all_results, + paused=True, + pause_reason=pause_reason, + ) + pending_pause = (node_id, pause_reason) + abort = True + continue + + # Runtime fan-out: a node returned Send / list[Send]. + if nr.success and _is_send_payload(nr.output): + sends = nr.output if isinstance(nr.output, list) else [nr.output] + ok = await self._run_sends( + sends=sends, + context=context, + run_id=run_id, + all_results=all_results, + trace_entries=trace_entries, + completed=completed, + pending=pending, + ) + if not ok: + abort = True + # Successors of the worker targets are picked up by the + # normal readiness sweep on the next loop iteration. + continue + if nr.success and not nr.skipped: # State overlay: a dict return from the node is a state # update; non-dict returns flow through edges as ports. @@ -529,6 +627,12 @@ async def _record_skip(nid: str) -> None: if _pipeline_span is not None: _pipeline_span.end() + paused_node = pending_pause[0] if pending_pause else None + pause_reason_final = pending_pause[1] if pending_pause else None + if pending_pause is not None: + # A paused run is not "successful" — it didn't finish. + success = False + return PipelineResult( pipeline_name=self._dag.name, outputs=all_results, @@ -539,8 +643,91 @@ async def _record_skip(nid: str) -> None: usage=usage_summary, run_id=run_id, final_state=context.state, + paused=pending_pause is not None, + paused_node=paused_node, + pause_reason=pause_reason_final, ) + async def _run_sends( + self, + *, + sends: list[Send], + context: PipelineContext, + run_id: str, + all_results: dict[str, NodeResult], + trace_entries: list[ExecutionTraceEntry], + completed: set[str], + pending: set[str], + ) -> bool: + """Dispatch a list of :class:`Send` workers concurrently. + + Each Send's payload is applied to a copy of the current state before + its target runs. Results merge back into shared state via reducers. + Targets are added to ``completed`` and removed from ``pending`` so + the main scheduler does not re-execute them. + + Returns ``True`` on success, ``False`` if any worker failed (the + caller treats this as an abort signal). + """ + # Validate targets up front so unknown ones fail loud, not after gather(). + for send in sends: + if send.target not in self._dag.nodes: + nr = NodeResult( + node_id=send.target, + success=False, + error=f"Send dispatches to unknown target '{send.target}'", + ) + all_results[send.target] = nr + return False + + async def _run_one(send: Send) -> tuple[Send, NodeResult]: + await self._dispatch( + "on_node_start", + pipeline_name=self._dag.name, + run_id=run_id, + node_id=send.target, + visit=1, + ) + # Per-worker context: own state copy with payload applied so + # workers don't race on the shared state object. + worker_context = PipelineContext(inputs=context.inputs) + if self._state_schema is not None and context.state is not None: + worker_context.state = apply_update(context.state, send.payload, self._reducers) + for nid, prev in context.results.items(): + worker_context.set_node_result(nid, prev) + nr = await self._execute_node( + send.target, + worker_context, + trace_entries, + None, + inputs={"input": send.payload}, + run_id=run_id, + ) + return send, nr + + try: + results = await asyncio.gather(*(_run_one(s) for s in sends)) + except Exception as exc: + logger.exception("Fan-out worker crashed") + for send in sends: + if send.target not in all_results: + all_results[send.target] = NodeResult(node_id=send.target, success=False, error=str(exc)) + return False + + all_ok = True + for send, nr in results: + all_results[send.target] = nr + context.set_node_result(send.target, nr) + completed.add(send.target) + pending.discard(send.target) + await self._emit_node_result(nr, run_id) + if not nr.success: + all_ok = False + continue + if self._state_schema is not None and context.state is not None and isinstance(nr.output, dict): + context.state = apply_update(context.state, nr.output, self._reducers) + return all_ok + async def _run_cyclic( self, *, @@ -858,13 +1045,23 @@ async def _emit_node_result(self, nr: NodeResult, run_id: str) -> None: else: await self._dispatch("on_node_error", error=nr.error or "unknown", **common) - def _load_for_resume(self, run_id: str) -> tuple[PipelineContext, set[str], int]: - """Rebuild context + completed-set from the latest checkpoint.""" + def _load_for_resume(self, run_id: str, *, approve_pause: bool = False) -> tuple[PipelineContext, set[str], int]: + """Rebuild context + completed-set from the latest checkpoint. + + Resuming a paused run (checkpoint.paused=True) requires + ``approve_pause=True``; otherwise a :class:`PipelineError` halts the + attempt and surfaces the pause reason. + """ if self._checkpointer is None: raise PipelineError("Cannot resume: pipeline has no checkpointer configured") record = self._checkpointer.load_latest(self._dag.name, run_id) if record is None: raise PipelineError(f"No checkpoint found for run_id='{run_id}'") + if record.paused and not approve_pause: + raise PipelineError( + f"Run '{run_id}' is paused at node '{record.node_id}' " + f"(reason: {record.pause_reason!r}). Pass approve_pause=True to resume." + ) context = PipelineContext(inputs=record.state.get("inputs")) for nid, nr_dict in record.state.get("results", {}).items(): try: @@ -888,6 +1085,8 @@ def _save_checkpoint( sequence: int, context: PipelineContext, all_results: dict[str, NodeResult], + paused: bool = False, + pause_reason: str | None = None, ) -> None: """Persist state after a successful node. No-op if no checkpointer. @@ -911,6 +1110,8 @@ def _save_checkpoint( sequence=sequence, state=state, completed_nodes=completed_successful, + paused=paused, + pause_reason=pause_reason, ) ) except Exception: diff --git a/fireflyframework_agentic/pipeline/result.py b/fireflyframework_agentic/pipeline/result.py index 1b3e26ee..1f6bc260 100644 --- a/fireflyframework_agentic/pipeline/result.py +++ b/fireflyframework_agentic/pipeline/result.py @@ -82,6 +82,11 @@ class PipelineResult(BaseModel): # Final shared state for pipelines configured with state_schema. None # when the engine had no state overlay. final_state: Any = None + # HITL: a node returned :class:`Pause` and the run halted cleanly. + # Resume via ``engine.run(run_id=..., approve_pause=True)``. + paused: bool = False + paused_node: str | None = None + pause_reason: str | None = None @property def failed_nodes(self) -> list[str]: diff --git a/fireflyframework_agentic/pipeline/state_pipeline.py b/fireflyframework_agentic/pipeline/state_pipeline.py index c8c9247b..aae0c0c4 100644 --- a/fireflyframework_agentic/pipeline/state_pipeline.py +++ b/fireflyframework_agentic/pipeline/state_pipeline.py @@ -40,7 +40,7 @@ from fireflyframework_agentic.pipeline.audit import AuditEntry, AuditLog, AuditStatus from fireflyframework_agentic.pipeline.checkpoint import Checkpointer, CheckpointRecord from fireflyframework_agentic.pipeline.dag import DAG, _mermaid_id -from fireflyframework_agentic.pipeline.engine import start_otel_span +from fireflyframework_agentic.pipeline.engine import Pause, Send, start_otel_span from fireflyframework_agentic.pipeline.reducers import apply_update, discover_reducers if TYPE_CHECKING: @@ -53,51 +53,10 @@ RouterFn = Callable[[Any], "str | Send | list[Send]"] -@dataclass -class Send: - """Runtime fan-out dispatch: run ``target`` with ``payload`` merged into state. - - Routers can return a single ``Send`` or a list of ``Send`` to dispatch multiple - target invocations concurrently. Each Send's payload is applied to a *copy* - of the current state before its target runs; the target's return is then - merged back into shared state via reducers. - - Replaces the legacy ``FanOutStep`` pattern with a first-class primitive. - """ - - target: str - payload: dict[str, Any] - - class RecursionLimitError(Exception): """Raised when a node is visited more times than ``recursion_limit`` permits.""" -@dataclass -class Pause: - """Human-in-the-loop sentinel returned by a node to halt the pipeline. - - A node returns ``Pause(reason="...")`` when external approval (a human, - another system, a wall-clock event) is required before the pipeline may - continue. The pipeline then: - - 1. Writes a checkpoint with ``paused=True`` and the reason set. - 2. Emits ``on_node_pause`` on the configured event handler. - 3. Returns a :class:`StatePipelineResult` with ``paused=True`` and - ``success=False`` — the run is not finished, but it did not fail either. - - To resume after approval:: - - result = await pipeline.invoke(run_id=paused_run_id, approve_pause=True) - - Without ``approve_pause=True``, resuming a paused run raises - :class:`PipelineError`. The successor of the paused node runs next — - the pause node itself is not re-executed. - """ - - reason: str - - @dataclass class BranchSpec: """Internal: registered branch from one source node.""" diff --git a/tests/unit/pipeline/test_pipeline_engine_pause_send.py b/tests/unit/pipeline/test_pipeline_engine_pause_send.py new file mode 100644 index 00000000..f4526e15 --- /dev/null +++ b/tests/unit/pipeline/test_pipeline_engine_pause_send.py @@ -0,0 +1,203 @@ +"""Layer 5 of the unification (#245): Pause and Send in PipelineEngine. + +PipelineEngine recognizes the same control sentinels that StatePipeline +uses today: + +- A node returning :class:`Pause` halts the pipeline cleanly; the run + resumes with ``engine.run(run_id=..., approve_pause=True)``. +- A node returning :class:`Send` or ``list[Send]`` triggers a parallel + fan-out where each Send's target runs concurrently with the supplied + payload merged into a per-worker state copy. Reducers merge worker + outputs back into shared state. + +Both sentinels work in the acyclic and cyclic schedulers. +""" + +from __future__ import annotations + +from typing import Annotated + +import pytest +from pydantic import BaseModel + +from fireflyframework_agentic.exceptions import PipelineError +from fireflyframework_agentic.pipeline.checkpoint import FileCheckpointer +from fireflyframework_agentic.pipeline.dag import DAG, DAGEdge, DAGNode + +# Pause and Send live in pipeline.engine now (moved from state_pipeline in +# this layer); the public re-export from pipeline/__init__.py is unchanged. +from fireflyframework_agentic.pipeline.engine import Pause, PipelineEngine, Send +from fireflyframework_agentic.pipeline.reducers import extend + +# ---- shared state --------------------------------------------------------- + + +class _LoopState(BaseModel): + items: Annotated[list[str], extend] = [] + approved: bool = False + deployed_to: str = "" + + +# ---- Pause ---------------------------------------------------------------- + + +def _step_pause(reason: str = "human gate"): + class _Step: + async def execute(self, ctx, inputs): + return Pause(reason=reason) + + return _Step() + + +def _step_record(label: str): + class _Step: + async def execute(self, ctx, inputs): + return {"items": [label]} + + return _Step() + + +async def test_pause_halts_pipeline_and_records_state(tmp_path): + """A node returning Pause halts: result.paused=True, success=False, checkpoint with paused=True.""" + build = _step_record("build") + gate = _step_pause("awaiting approval") + deploy = _step_record("deploy") + dag = DAG("hitl") + dag.add_node(DAGNode(node_id="build", step=build)) + dag.add_node(DAGNode(node_id="gate", step=gate)) + dag.add_node(DAGNode(node_id="deploy", step=deploy)) + dag.add_edge(DAGEdge(source="build", target="gate")) + dag.add_edge(DAGEdge(source="gate", target="deploy")) + cp = FileCheckpointer(tmp_path) + engine = PipelineEngine(dag, state_schema=_LoopState, checkpointer=cp) + result = await engine.run(inputs="") + assert result.paused is True + assert result.paused_node == "gate" + assert result.pause_reason == "awaiting approval" + assert not result.success + # State so far: only 'build' contributed. + assert result.final_state.items == ["build"] + # Checkpoint reflects the paused state. + record = cp.load_latest("hitl", result.run_id) + assert record is not None + assert record.paused is True + assert record.pause_reason == "awaiting approval" + + +async def test_resume_paused_run_requires_approve_pause(tmp_path): + build = _step_record("build") + gate = _step_pause("awaiting approval") + dag = DAG("needs-approve") + dag.add_node(DAGNode(node_id="build", step=build)) + dag.add_node(DAGNode(node_id="gate", step=gate)) + dag.add_edge(DAGEdge(source="build", target="gate")) + cp = FileCheckpointer(tmp_path) + engine = PipelineEngine(dag, state_schema=_LoopState, checkpointer=cp) + paused = await engine.run(inputs="") + assert paused.paused is True + # Without approve_pause: error. + with pytest.raises(PipelineError, match="paused"): + await engine.run(run_id=paused.run_id) + + +async def test_resume_with_approve_pause_continues_from_successor(tmp_path): + build = _step_record("build") + gate = _step_pause("awaiting approval") + deploy = _step_record("deploy") + dag = DAG("approved") + dag.add_node(DAGNode(node_id="build", step=build)) + dag.add_node(DAGNode(node_id="gate", step=gate)) + dag.add_node(DAGNode(node_id="deploy", step=deploy)) + dag.add_edge(DAGEdge(source="build", target="gate")) + dag.add_edge(DAGEdge(source="gate", target="deploy")) + cp = FileCheckpointer(tmp_path) + engine = PipelineEngine(dag, state_schema=_LoopState, checkpointer=cp) + paused = await engine.run(inputs="") + assert paused.paused is True + resumed = await engine.run(run_id=paused.run_id, approve_pause=True) + assert resumed.success + # 'gate' is NOT re-executed; only 'deploy' adds to items. + assert resumed.final_state.items == ["build", "deploy"] + + +# ---- Send ------------------------------------------------------------------ + + +def _step_emit_sends(targets: list[str]): + class _Step: + async def execute(self, ctx, inputs): + return [Send(target=t, payload={"items": [f"sent-{t}"]}) for t in targets] + + return _Step() + + +def _step_consume_payload(suffix: str): + """A worker that turns its inbound items into a state update.""" + + class _Step: + async def execute(self, ctx, inputs): + seen = list(ctx.state.items) + return {"items": [f"{s}+{suffix}" for s in seen]} + + return _Step() + + +async def test_send_dispatches_workers_concurrently(): + """One node returns list[Send]; targets run concurrently and their outputs merge.""" + planner = _step_emit_sends(["a", "b"]) + worker_a = _step_consume_payload("A") + worker_b = _step_consume_payload("B") + dag = DAG("fanout") + dag.add_node(DAGNode(node_id="planner", step=planner)) + dag.add_node(DAGNode(node_id="a", step=worker_a)) + dag.add_node(DAGNode(node_id="b", step=worker_b)) + dag.add_edge(DAGEdge(source="planner", target="a")) + dag.add_edge(DAGEdge(source="planner", target="b")) + engine = PipelineEngine(dag, state_schema=_LoopState) + result = await engine.run(inputs="") + assert result.success + # Each worker sees its own payload (a sees "sent-a", b sees "sent-b"). + assert sorted(result.final_state.items) == sorted(["sent-a+A", "sent-b+B"]) + + +async def test_single_send_is_treated_as_list_of_one(): + planner = _step_emit_sends(["a"]) + # Override to emit a single Send (not a list). + + class _Solo: + async def execute(self, ctx, inputs): + return Send(target="a", payload={"items": ["just-a"]}) + + worker_a = _step_consume_payload("X") + dag = DAG("solo") + dag.add_node(DAGNode(node_id="planner", step=_Solo())) + dag.add_node(DAGNode(node_id="a", step=worker_a)) + dag.add_edge(DAGEdge(source="planner", target="a")) + engine = PipelineEngine(dag, state_schema=_LoopState) + result = await engine.run(inputs="") + assert result.success + assert result.final_state.items == ["just-a+X"] + + +async def test_send_to_unknown_target_raises(): + class _Bad: + async def execute(self, ctx, inputs): + return [Send(target="ghost", payload={})] + + dag = DAG("unknown-send") + dag.add_node(DAGNode(node_id="planner", step=_Bad())) + engine = PipelineEngine(dag, state_schema=_LoopState) + result = await engine.run(inputs="") + # The fan-out fails; the pipeline reports failure. + assert not result.success + + +# ---- Pause exports --------------------------------------------------------- + + +def test_pause_and_send_reexported_from_pipeline_package(): + from fireflyframework_agentic.pipeline import Pause as P + from fireflyframework_agentic.pipeline import Send as S + + assert P is Pause + assert S is Send