feat(pipeline): observability for state pipelines (#147 phase 3b) #235

Merged
miguelgfierro merged 1 commit into issue-147-pipeline-evolution from issue-147-phase-3b
May 27, 2026
@miguelgfierro

Stacked on `issue-147-pipeline-evolution` (now containing Phase 1+2+3a). Part of #147, phase 3b.

What

Mirrors the legacy `PipelineEngine`'s observability story for state pipelines:

  • `StatePipelineEventHandler` Protocol with `on_pipeline_start` / `on_node_start` / `on_node_complete` / `on_node_error` / `on_pipeline_complete`.
  • Per-node OTel spans nested under one pipeline-level span.
  • `event_handler` kwarg on `PipelineBuilder`.
class ProgressHandler:
    async def on_pipeline_start(self, pipeline_name, run_id):
        print(f"▶ [{pipeline_name}] run {run_id} starting")

    async def on_node_start(self, pipeline_name, run_id, node_id, visit):
        print(f"  ▶ {node_id} (visit #{visit})")

    async def on_node_complete(self, pipeline_name, run_id, node_id, latency_ms):
        print(f"  ✔ {node_id} ({latency_ms:.0f}ms)")

    async def on_node_error(self, pipeline_name, run_id, node_id, error):
        print(f"  ✗ {node_id}: {error}")

    async def on_pipeline_complete(self, pipeline_name, run_id, success, duration_ms):
        print(f"═ [{pipeline_name}] {'OK' if success else 'FAILED'} in {duration_ms:.0f}ms")


# Parentheses are required so the method chain can span several lines.
pipeline = (
    PipelineBuilder("agent", state=AgentState, event_handler=ProgressHandler())
    .add_node(classify).add_node(answer).add_node(escalate)
    .branch(classify, route)
    .build()
)
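
For readers who want to type-check their own handlers, the Protocol could look roughly like the sketch below. The method names and parameter names come from `ProgressHandler` above; the type annotations and the `runtime_checkable` decorator are assumptions, not taken from `pipeline/engine.py`.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class StatePipelineEventHandler(Protocol):
    """Illustrative shape only; annotations are assumed, not copied from the PR."""

    async def on_pipeline_start(self, pipeline_name: str, run_id: str) -> None: ...

    async def on_node_start(
        self, pipeline_name: str, run_id: str, node_id: str, visit: int
    ) -> None: ...

    async def on_node_complete(
        self, pipeline_name: str, run_id: str, node_id: str, latency_ms: float
    ) -> None: ...

    async def on_node_error(
        self, pipeline_name: str, run_id: str, node_id: str, error: Exception
    ) -> None: ...

    async def on_pipeline_complete(
        self, pipeline_name: str, run_id: str, success: bool, duration_ms: float
    ) -> None: ...
```

An `isinstance` check against a runtime-checkable Protocol requires every method to be present, so it would reject partial handlers. That is one plausible reason to dispatch with per-callback `hasattr` checks instead.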

Differences from `PipelineEventHandler`

  • `run_id` everywhere — state pipelines are stateful and resumable; ops needs to correlate events across resumes.
  • `visit` counter on `on_node_start` — cyclic graphs and `Send` fan-out invoke the same node multiple times; the counter disambiguates them.
  • No `on_node_skip` — state pipelines abort on failure rather than skipping downstream nodes.

Partial implementations work: `StatePipeline._emit` uses `hasattr` so a handler that only defines `on_node_error` is valid. Handler exceptions are swallowed — observability never breaks business logic.
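
The dispatch rule described here (skip missing callbacks, swallow handler exceptions) can be sketched as a standalone function. This is a minimal illustration, not the body of `StatePipeline._emit`: the function name `emit`, its signature, and the decision to log swallowed exceptions are all assumptions.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def emit(handler, event: str, *args) -> None:
    """Call handler.<event>(*args) if present; never propagate handler errors."""
    if handler is None or not hasattr(handler, event):
        return  # partial handlers simply miss the events they do not define
    try:
        await getattr(handler, event)(*args)
    except Exception:
        # Assumed behaviour: log and continue so the pipeline is unaffected.
        logger.warning("event handler failed in %s", event, exc_info=True)


class OnlyErrors:
    """Hypothetical partial handler that also raises, to exercise both rules."""

    def __init__(self):
        self.seen = []

    async def on_node_error(self, pipeline_name, run_id, node_id, error):
        self.seen.append((node_id, str(error)))
        raise RuntimeError("bug inside the handler")


async def demo():
    handler = OnlyErrors()
    # No on_node_start defined: this call is a no-op.
    await emit(handler, "on_node_start", "agent", "run-1", "classify", 1)
    # Defined but raising: the exception is swallowed.
    await emit(handler, "on_node_error", "agent", "run-1", "classify", ValueError("boom"))
    return handler.seen


seen = asyncio.run(demo())
```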

OTel spans

When `observability_enabled` and `opentelemetry` is installed, each `invoke` emits:

  • One `pipeline.state.` span (attributes: `firefly.pipeline`, `firefly.run_id`).
  • One `pipeline.state.node.<node_id>` span per node visit, parented under the pipeline span (attributes: `firefly.node`, `firefly.visit`).
  • For `Send` fan-out: one per-Send span as a sibling under the pipeline span. Each Send's visit number is snapshotted at increment time, not post-loop, so concurrent workers each see their own visit count.
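
The difference between snapshotting the visit number at increment time and reading it after the loop can be shown with plain `asyncio`. The sketch below is a toy model, not the pipeline's fan-out code; `fan_out`, `worker`, and the `visits` dictionary are hypothetical names.

```python
import asyncio
from collections import defaultdict


async def fan_out(node_id, payloads):
    visits = defaultdict(int)

    async def worker(payload, visit_snapshot):
        await asyncio.sleep(0)  # let every worker get scheduled
        late_read = visits[node_id]  # what a post-loop read would see
        return payload, visit_snapshot, late_read

    pending = []
    for payload in payloads:
        visits[node_id] += 1
        # Pass the current value as an argument: this is the snapshot.
        pending.append(worker(payload, visits[node_id]))
    return await asyncio.gather(*pending)


results = asyncio.run(fan_out("answer", ["a", "b", "c"]))
# Each tuple is (payload, snapshot, late read).
```

Here the snapshots are 1, 2, and 3, while every late read is 3, because the workers only start running after the loop has finished incrementing the shared counter.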

The existing `PipelineEngine._start_otel_span` was lifted to a module-level `start_otel_span` function shared by both pipeline types (eliminates the duplicate).
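
A shared helper of this kind might look like the sketch below. The real `start_otel_span` signature is not shown in this PR, so the parameters here (`name`, `attributes`, `enabled`) are assumptions, as are the tracer name and the pipeline span's name suffix. Only `trace.get_tracer` and `Tracer.start_as_current_span` are real OpenTelemetry API calls.

```python
from contextlib import nullcontext

try:
    from opentelemetry import trace
except ImportError:  # opentelemetry is optional
    trace = None


def start_otel_span(name, attributes=None, enabled=True):
    """Return a context manager: a real span if possible, otherwise a no-op."""
    if not enabled or trace is None:
        return nullcontext()
    tracer = trace.get_tracer("pipeline-sketch")  # hypothetical tracer name
    return tracer.start_as_current_span(name, attributes=attributes or {})


# The pipeline span becomes the current span, so node spans opened inside
# the block are parented under it. The ".demo" suffix is illustrative only.
with start_otel_span(
    "pipeline.state.demo",
    {"firefly.pipeline": "demo", "firefly.run_id": "run-1"},
):
    for node_id in ("classify", "answer"):
        with start_otel_span(
            f"pipeline.state.node.{node_id}",
            {"firefly.node": node_id, "firefly.visit": 1},
        ):
            pass  # the node body would run here
```

Returning `nullcontext()` when tracing is off keeps call sites identical whether or not `opentelemetry` is installed.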

Tests

`tests/unit/pipeline/test_state_pipeline_observability.py` (8 tests):

  1. Linear pipeline emits events in expected order.
  2. Failure emits `on_node_error` and `on_pipeline_complete(success=False)`.
  3. Cyclic graph increments the visit counter per loop iteration.
  4. Fan-out via `Send` emits per-Send node-start/complete pairs with distinct visit numbers.
  5. Resume from a checkpoint emits events only for the remaining nodes.
  6. Partial handler (only `on_node_error` defined) doesn't crash on other callbacks.
  7. Handler that raises `RuntimeError` doesn't break the pipeline.
  8. OTel spans emitted with the right names and attributes (monkeypatched tracer).

Suite-wide:

  • `pytest tests/unit/pipeline/` → 122 passed (114 baseline + 8 new)
  • Full unit suite green
  • `ruff check` + `ruff format --check` clean
  • `pyright` clean on touched files

Docs + example

  • `docs/pipeline.md` State-Based Pipelines section gains an Observability subsection covering the handler Protocol and OTel.
  • `examples/pipeline_state.py` gains a `ProgressHandler` wired into the software-factory scenario:
=== 2. Software factory with checkpoint/resume + live progress ===

  ▶ [software-factory] run 413b4741… starting
    ▶ architect (visit #1)
    ✔ architect (0ms)
    ▶ python_dev (visit #1)
    ✔ python_dev (0ms)
    ▶ deployer (visit #1)
    ✗ deployer: network blip — try again
  ═ [software-factory] FAILED in 2ms
  …
  ▶ [software-factory] run 413b4741… starting
    ▶ deployer (visit #1)
    ✔ deployer (0ms)
    ▶ evaluator (visit #1)
    ✔ evaluator (0ms)
  ═ [software-factory] OK in 0ms

Stacking

Base = `issue-147-pipeline-evolution` (Phase 1+2+3a). Will retarget to main automatically once #232 merges.

What lands next

  • Phase 3c — `Pause(reason)` HITL primitive + `AuditLog` Protocol with Postgres impl reusing the connection from 3a.

Adds observability to state pipelines, mirroring the legacy PipelineEngine
story but with state-mode semantics: run_id is plumbed through every
callback, on_node_start carries a per-node visit counter, and there is no
on_node_skip (state pipelines abort on failure rather than skipping).

- New StatePipelineEventHandler Protocol in pipeline/engine.py with
  on_pipeline_start / on_node_start / on_node_complete / on_node_error /
  on_pipeline_complete. Partial handlers are valid (hasattr-checked).
- Per-node OTel spans nested under one pipeline-level span. The existing
  _start_otel_span helper on PipelineEngine is lifted to a module-level
  start_otel_span function shared by both pipeline types.
- PipelineBuilder gains an event_handler kwarg that flows into StatePipeline.
- Fan-out via Send emits per-Send node-start/complete pairs with each
  Send's own visit number (snapshot at increment time, not post-loop).
- Handler exceptions are swallowed — observability never breaks business
  logic.

Tests: 8 new in test_state_pipeline_observability.py covering event
ordering, failure path, cyclic visit counts, fan-out per-Send events,
resume-from-checkpoint, partial handler, swallowed handler exceptions,
and OTel span emission with attribute snapshots.

Example: examples/pipeline_state.py gains a ProgressHandler that prints
live progress for the software-factory scenario.

Verification:
- pytest tests/unit/pipeline/ → 122 passed (114 baseline + 8 new)
- ruff check + format clean
- pyright clean on touched modules
@miguelgfierro miguelgfierro marked this pull request as ready for review May 27, 2026 15:25
@miguelgfierro miguelgfierro merged commit b0c0f93 into issue-147-pipeline-evolution May 27, 2026
@miguelgfierro miguelgfierro deleted the issue-147-phase-3b branch May 27, 2026 15:25
ancongui pushed a commit that referenced this pull request May 31, 2026
feat(pipeline): observability for state pipelines (#147 phase 3b)