feat(pipeline): observability for state pipelines (#147 phase 3b) #235
Merged
Adds observability to state pipelines, mirroring the legacy PipelineEngine story but with state-mode semantics: run_id is plumbed through every callback, on_node_start carries a per-node visit counter, and there is no on_node_skip (state pipelines abort on failure rather than skipping).

- New StatePipelineEventHandler Protocol in pipeline/engine.py with on_pipeline_start / on_node_start / on_node_complete / on_node_error / on_pipeline_complete. Partial handlers are valid (hasattr-checked).
- Per-node OTel spans nested under one pipeline-level span. The existing _start_otel_span helper on PipelineEngine is lifted to a module-level start_otel_span function shared by both pipeline types.
- PipelineBuilder gains an event_handler kwarg that flows into StatePipeline.
- Fan-out via Send emits per-Send node-start/complete pairs with each Send's own visit number (snapshot at increment time, not post-loop).
- Handler exceptions are swallowed: observability never breaks business logic.

Tests: 8 new in test_state_pipeline_observability.py covering event ordering, failure path, cyclic visit counts, fan-out per-Send events, resume-from-checkpoint, partial handler, swallowed handler exceptions, and OTel span emission with attribute snapshots.

Example: examples/pipeline_state.py gains a ProgressHandler that prints live progress for the software-factory scenario.

Verification:
- pytest tests/unit/pipeline/ → 122 passed (114 baseline + 8 new)
- ruff check + format clean
- pyright clean on touched modules
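The fan-out bullet (visit number snapshotted at increment time, not post-loop) can be illustrated with a small standalone sketch. Everything here is hypothetical and not taken from the PR: `RecordingHandler`, `run_fan_out`, the callback argument order, and the doubling "node work" are stand-ins chosen only to show why the snapshot matters.

```python
from collections import defaultdict
from typing import Any


class RecordingHandler:
    """Hypothetical handler that records (event, node, visit) tuples."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str, int]] = []

    # Assumed signatures; the real Protocol may order or name these differently.
    def on_node_start(self, run_id: str, node: str, visit: int) -> None:
        self.events.append(("start", node, visit))

    def on_node_complete(self, run_id: str, node: str, visit: int) -> None:
        self.events.append(("complete", node, visit))


def run_fan_out(
    handler: Any,
    run_id: str,
    node: str,
    payloads: list[int],
    visits: dict[str, int],
) -> list[int]:
    # Phase 1: bump the counter once per Send and snapshot the value right away.
    # Reading visits[node] after this loop would give every Send the same
    # (final) number, which is the bug the snapshot avoids.
    scheduled: list[tuple[int, int]] = []
    for payload in payloads:
        visits[node] += 1
        scheduled.append((visits[node], payload))

    # Phase 2: run each Send and report events with its own snapshot.
    results: list[int] = []
    for visit, payload in scheduled:
        handler.on_node_start(run_id, node, visit)
        results.append(payload * 2)  # stand-in for the real node work
        handler.on_node_complete(run_id, node, visit)
    return results


visits: dict[str, int] = defaultdict(int)
handler = RecordingHandler()
results = run_fan_out(handler, "run-1", "worker", [1, 2, 3], visits)
```

With three payloads, the handler sees three start/complete pairs carrying visit numbers 1, 2, and 3, rather than three pairs that all report 3.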
ancongui pushed a commit that referenced this pull request on May 31, 2026
feat(pipeline): observability for state pipelines (#147 phase 3b)
Stacked on `issue-147-pipeline-evolution` (now containing Phase 1+2+3a). Part of #147, phase 3b.
What
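Mirrors the legacy `PipelineEngine`'s observability story for state pipelines: a new `StatePipelineEventHandler` Protocol in `pipeline/engine.py` with `on_pipeline_start`, `on_node_start`, `on_node_complete`, `on_node_error`, and `on_pipeline_complete` callbacks.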
Differences from `PipelineEventHandler`
Partial implementations work: `StatePipeline._emit` uses `hasattr` so a handler that only defines `on_node_error` is valid. Handler exceptions are swallowed — observability never breaks business logic.
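A minimal sketch of how a `hasattr`-checked, exception-swallowing dispatch might look. This is not the actual `StatePipeline._emit` code: the free function `emit`, its boolean return value, the logging call, and the callback argument lists are all assumptions made for illustration.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def emit(handler: Any, event: str, *args: Any, **kwargs: Any) -> bool:
    """Call handler.<event>(...) if it exists; never raise.

    Returns True only when the callback existed and ran without error.
    (The return value is an addition for this sketch, to make it testable.)
    """
    if handler is None or not hasattr(handler, event):
        return False  # partial handler: this callback is simply not defined
    try:
        getattr(handler, event)(*args, **kwargs)
    except Exception:
        # Swallow the failure so observability cannot break the pipeline run.
        logger.exception("event handler %s failed", event)
        return False
    return True


class ErrorsOnly:
    """Hypothetical partial handler that defines only on_node_error."""

    def __init__(self) -> None:
        self.errors: list[tuple[str, str, str]] = []

    def on_node_error(self, run_id: str, node: str, error: Exception) -> None:
        self.errors.append((run_id, node, str(error)))


class Broken:
    """Hypothetical handler whose callback itself raises."""

    def on_node_start(self, run_id: str, node: str, visit: int) -> None:
        raise RuntimeError("handler bug")


errors_only = ErrorsOnly()
skipped = emit(errors_only, "on_node_start", "run-1", "plan", 1)
delivered = emit(errors_only, "on_node_error", "run-1", "plan", ValueError("boom"))
swallowed = emit(Broken(), "on_node_start", "run-1", "plan", 1)
```

Logging the swallowed exception is a design choice in this sketch; the PR only states that handler exceptions are swallowed, not whether they are reported anywhere.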
OTel spans
When `observability_enabled` is set and `opentelemetry` is installed, each `invoke` emits one pipeline-level span with a per-node span nested under it for every node run.
The existing `PipelineEngine._start_otel_span` was lifted to a module-level `start_otel_span` function shared by both pipeline types (eliminates the duplicate).
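For readers unfamiliar with the pattern, here is a rough sketch of a module-level span helper with an optional OpenTelemetry dependency. Only the name `start_otel_span` comes from the PR; the signature, the `enabled` flag, the span names, and the attribute keys below are assumptions, and the real helper may differ.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional

try:
    from opentelemetry import trace
except ImportError:  # opentelemetry is an optional dependency
    trace = None


@contextmanager
def start_otel_span(
    name: str,
    enabled: bool = True,
    attributes: Optional[dict[str, Any]] = None,
) -> Iterator[Any]:
    """Yield an active span, or None when tracing is off or unavailable."""
    if not enabled or trace is None:
        yield None
        return
    tracer = trace.get_tracer(__name__)
    # Spans opened inside this block become children of this span.
    with tracer.start_as_current_span(name, attributes=attributes or {}) as span:
        yield span


def run_nodes(nodes: list[str], enabled: bool = True) -> list[str]:
    """Stand-in for an invoke: one pipeline span, one nested span per node."""
    visited: list[str] = []
    with start_otel_span("pipeline.invoke", enabled, {"pipeline.node_count": len(nodes)}):
        for node in nodes:
            with start_otel_span(f"pipeline.node.{node}", enabled, {"node.name": node}):
                visited.append(node)  # stand-in for the real node work
    return visited


traced = run_nodes(["plan", "build"])
untraced = run_nodes(["plan"], enabled=False)
```

Because the helper degrades to a no-op context manager, callers can wrap node execution unconditionally and both pipeline types can share one code path.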
Tests
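`tests/unit/pipeline/test_state_pipeline_observability.py` (8 tests): event ordering, failure path, cyclic visit counts, fan-out per-Send events, resume-from-checkpoint, partial handler, swallowed handler exceptions, and OTel span emission with attribute snapshots.
Suite-wide: `pytest tests/unit/pipeline/` → 122 passed (114 baseline + 8 new).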
Docs + example
Stacking
Base = `issue-147-pipeline-evolution` (Phase 1+2+3a). Will retarget to main automatically once #232 merges.
What lands next