Skip to content

feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5)#251

Merged
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-5-pause-send
May 28, 2026
Merged

feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5)#251
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-5-pause-send

Conversation

@miguelgfierro
Copy link
Copy Markdown
Contributor

Fifth layer of the unification (#245). `PipelineEngine` now recognizes the same control sentinels that `StatePipeline` uses today — `Pause` (HITL) and `Send` (runtime fan-out).

Stacked on layer 4 (#250) which is already merged into `issue-147-pipeline-evolution`.

Pause: HITL halt + resume

```python
async def await_approval(ctx, inputs):
return Pause(reason="awaiting human approval to deploy")

engine = PipelineEngine(dag, state_schema=State, checkpointer=cp)
paused = await engine.run(inputs="...")
assert paused.paused is True
assert paused.paused_node == "await_approval"

Later, after approval:

resumed = await engine.run(run_id=paused.run_id, approve_pause=True)
```

Behavior:

  • The engine writes a checkpoint with `paused=True` and the reason.
  • `PipelineResult` carries `paused`, `paused_node`, `pause_reason`.
  • Resuming a paused checkpoint without `approve_pause=True` raises `PipelineError` with the reason surfaced.
  • The pause node is not re-executed on resume; the successor runs next.

Send: parallel fan-out

```python
async def planner(ctx, inputs):
return [Send(target="worker", payload={"items": [i]}) for i in items]
```

  • A node may return a single `Send` or `list[Send]`.
  • Each Send's `payload` is applied to a per-worker state copy via reducers — workers do not race on shared state.
  • Workers run concurrently via `asyncio.gather`.
  • Worker outputs merge back into shared state via reducers.
  • Workers are marked completed so the topological scheduler doesn't re-run them.

Refactor

`Pause` and `Send` dataclasses moved from `state_pipeline.py` to `engine.py`. This breaks what would otherwise be a circular import; `state_pipeline.py` now imports them from `engine`. Public re-exports from `fireflyframework_agentic.pipeline` are unchanged — `from fireflyframework_agentic.pipeline import Pause, Send` still works.

Engine changes

  • `run()` accepts `approve_pause: bool = False`.
  • `_save_checkpoint` accepts `paused=`, `pause_reason=` kwargs (persisted on `CheckpointRecord`).
  • `_load_for_resume` enforces the `approve_pause` gate.
  • New `_run_sends` helper drives the fan-out.
  • New `_is_send_payload` module-level discriminator.

Result changes

`PipelineResult` gains: `paused: bool`, `paused_node: str | None`, `pause_reason: str | None`. Mirrors `StatePipelineResult`.

Tests

7 new in `tests/unit/pipeline/test_pipeline_engine_pause_send.py`:

  • Pause halts pipeline; result + checkpoint reflect paused state
  • Resume without `approve_pause=True` raises with reason
  • Resume with `approve_pause=True` continues from successor
  • `list[Send]` dispatches workers concurrently with per-worker state
  • Single `Send` is treated as list of one
  • Unknown Send target marks the pipeline as failed
  • `Pause` / `Send` re-exports stable

Full suite: 1587 passed.

What's NOT in this PR

  • Pause inside cyclic mode — `_run_cyclic` doesn't yet handle Pause; that's a follow-up. The acyclic scheduler covers most HITL use cases.
  • Cross-fan-out resume — same caveat `StatePipeline` already has. A run paused mid-fan-out can't be cleanly resumed across worker boundaries.

Refs

Fifth layer of the unification. PipelineEngine now recognizes the same
control sentinels that StatePipeline uses today:

- A node returning Pause(reason=...) halts the pipeline cleanly. The run
  resumes with engine.run(run_id=..., approve_pause=True).
- A node returning Send or list[Send] triggers parallel fan-out where
  each Send's target runs concurrently with the payload merged into a
  per-worker state copy. Reducers merge worker outputs back into shared
  state.

Refactor (no behavior change):

- Pause and Send dataclasses moved from state_pipeline.py to engine.py
  so PipelineEngine can recognize them without a circular import.
  state_pipeline.py now imports them from engine. Public re-exports from
  fireflyframework_agentic.pipeline are unchanged.

Engine changes:

- run() accepts approve_pause: bool = False kwarg. Resuming a paused
  checkpoint without approve_pause=True raises PipelineError with the
  pause reason.
- _save_checkpoint accepts paused=, pause_reason= kwargs, persisted on
  CheckpointRecord (the fields landed in Layer 1A).
- _load_for_resume enforces the approve_pause gate.
- Main loop branches on Pause: emits on_node_pause event, checkpoints
  with paused=True, sets pending_pause and aborts.
- Main loop branches on Send: dispatches workers via the new _run_sends
  helper, marks workers as completed so the scheduler does not re-run them.
- _run_sends: validates target IDs up front; per-worker PipelineContext
  with its own state copy (payload applied via reducers); asyncio.gather
  across workers; results merge back into shared state via reducers; any
  worker failure aborts the pipeline.
- _is_send_payload helper at module level.

Result changes:

- PipelineResult gains paused: bool, paused_node: str | None,
  pause_reason: str | None. Mirrors StatePipelineResult.

Tests: 7 new in tests/unit/pipeline/test_pipeline_engine_pause_send.py
covering:
- Pause halts the pipeline and records paused state in result + checkpoint
- Resume without approve_pause=True raises
- Resume with approve_pause=True continues from the paused node's successor
- list[Send] dispatches workers concurrently with per-worker state copies
- Single Send is treated as list[Send] of one
- Unknown Send target marks the pipeline as failed
- Pause and Send remain re-exported from the pipeline package

Full suite: 1587 passed.

Refs: #245
@miguelgfierro miguelgfierro merged commit c17c7cf into issue-147-pipeline-evolution May 28, 2026
@miguelgfierro miguelgfierro deleted the unify-pipeline-5-pause-send branch May 28, 2026 13:11
ancongui pushed a commit that referenced this pull request May 31, 2026
…send

feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant