feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5)#251
Merged
miguelgfierro merged 1 commit intoMay 28, 2026
Merged
Conversation
Fifth layer of the unification. PipelineEngine now recognizes the same control sentinels that StatePipeline uses today: - A node returning Pause(reason=...) halts the pipeline cleanly. The run resumes with engine.run(run_id=..., approve_pause=True). - A node returning Send or list[Send] triggers parallel fan-out where each Send's target runs concurrently with the payload merged into a per-worker state copy. Reducers merge worker outputs back into shared state. Refactor (no behavior change): - Pause and Send dataclasses moved from state_pipeline.py to engine.py so PipelineEngine can recognize them without a circular import. state_pipeline.py now imports them from engine. Public re-exports from fireflyframework_agentic.pipeline are unchanged. Engine changes: - run() accepts approve_pause: bool = False kwarg. Resuming a paused checkpoint without approve_pause=True raises PipelineError with the pause reason. - _save_checkpoint accepts paused=, pause_reason= kwargs, persisted on CheckpointRecord (the fields landed in Layer 1A). - _load_for_resume enforces the approve_pause gate. - Main loop branches on Pause: emits on_node_pause event, checkpoints with paused=True, sets pending_pause and aborts. - Main loop branches on Send: dispatches workers via the new _run_sends helper, marks workers as completed so the scheduler does not re-run them. - _run_sends: validates target IDs up front; per-worker PipelineContext with its own state copy (payload applied via reducers); asyncio.gather across workers; results merge back into shared state via reducers; any worker failure aborts the pipeline. - _is_send_payload helper at module level. Result changes: - PipelineResult gains paused: bool, paused_node: str | None, pause_reason: str | None. Mirrors StatePipelineResult. Tests: 7 new in tests/unit/pipeline/test_pipeline_engine_pause_send.py covering: - Pause halts the pipeline and records paused state in result + checkpoint - Resume without approve_pause=True raises - Resume with approve_pause=True continues from the paused node's successor - list[Send] dispatches workers concurrently with per-worker state copies - Single Send is treated as list[Send] of one - Unknown Send target marks the pipeline as failed - Pause and Send remain re-exported from the pipeline package Full suite: 1587 passed. Refs: #245
This was referenced May 28, 2026
ancongui
pushed a commit
that referenced
this pull request
May 31, 2026
…send feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fifth layer of the unification (#245). `PipelineEngine` now recognizes the same control sentinels that `StatePipeline` uses today — `Pause` (HITL) and `Send` (runtime fan-out).
Stacked on layer 4 (#250) which is already merged into `issue-147-pipeline-evolution`.
Pause: HITL halt + resume
```python
async def await_approval(ctx, inputs):
return Pause(reason="awaiting human approval to deploy")
engine = PipelineEngine(dag, state_schema=State, checkpointer=cp)
paused = await engine.run(inputs="...")
assert paused.paused is True
assert paused.paused_node == "await_approval"
Later, after approval:
resumed = await engine.run(run_id=paused.run_id, approve_pause=True)
```
Behavior:
Send: parallel fan-out
```python
async def planner(ctx, inputs):
return [Send(target="worker", payload={"items": [i]}) for i in items]
```
Refactor
`Pause` and `Send` dataclasses moved from `state_pipeline.py` to `engine.py`. This breaks what would otherwise be a circular import; `state_pipeline.py` now imports them from `engine`. Public re-exports from `fireflyframework_agentic.pipeline` are unchanged — `from fireflyframework_agentic.pipeline import Pause, Send` still works.
Engine changes
Result changes
`PipelineResult` gains: `paused: bool`, `paused_node: str | None`, `pause_reason: str | None`. Mirrors `StatePipelineResult`.
Tests
7 new in `tests/unit/pipeline/test_pipeline_engine_pause_send.py`:
Full suite: 1587 passed.
What's NOT in this PR
Refs