Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions docs/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,59 @@ When all worker targets share a common successor, the engine continues there
once the fan-out completes; the aggregator runs once with all results in
shared state.

### Observability

State pipelines emit lifecycle callbacks and OpenTelemetry (OTel) spans so
operators can see what an agent workflow is doing in real time.

`StatePipelineEventHandler` mirrors the legacy `PipelineEventHandler` but
every callback carries the `run_id` (so events can be correlated across
resumes) and `on_node_start` carries a per-node visit counter (so cyclic
graphs and `Send` fan-outs are distinguishable). Implement any subset of
methods; missing ones are no-ops.

```python
from fireflyframework_agentic.pipeline import PipelineBuilder, StatePipelineEventHandler


class ProgressHandler:
    """Print live progress for a state-pipeline run.

    Each method corresponds to one ``StatePipelineEventHandler`` callback.
    Parameter names match the protocol's signatures (``pipeline_name``,
    ``run_id``, ...) so the handler works whether callbacks are dispatched
    positionally or by keyword.
    """

    async def on_pipeline_start(self, pipeline_name, run_id):
        """Announce that run ``run_id`` of ``pipeline_name`` is starting."""
        print(f"▶ [{pipeline_name}] run {run_id} starting")

    async def on_node_start(self, pipeline_name, run_id, node_id, visit):
        """Report that ``node_id`` is about to run for the ``visit``-th time."""
        print(f" ▶ {node_id} (visit #{visit})")

    async def on_node_complete(self, pipeline_name, run_id, node_id, latency_ms):
        """Report a successful node with its latency rounded to whole ms."""
        print(f" ✔ {node_id} ({latency_ms:.0f}ms)")

    async def on_node_error(self, pipeline_name, run_id, node_id, error):
        """Report the error message of a node that raised."""
        print(f" ✗ {node_id}: {error}")

    async def on_pipeline_complete(self, pipeline_name, run_id, success, duration_ms):
        """Print the final status line with the total duration in whole ms."""
        status = "OK" if success else "FAILED"
        print(f"═ [{pipeline_name}] {status} in {duration_ms:.0f}ms")


pipeline = (
PipelineBuilder("agent", state=AgentState, event_handler=ProgressHandler())
.add_node(classify).add_node(answer).add_node(escalate)
.branch(classify, route)
.build()
)
```

In addition to the callbacks, the pipeline emits OTel spans automatically when
`observability_enabled` is `True` and `opentelemetry` is installed:

- One pipeline-level span `pipeline.state.<name>` around each `invoke`, with
  attributes `firefly.pipeline` and `firefly.run_id`.
- One per-node span `pipeline.state.node.<node_id>` for each `fn(state)`
  call, parented under the pipeline span, with attributes `firefly.node` and
  `firefly.visit`.
- For `Send` fan-out: one span per `Send`, each created as a sibling directly
  under the pipeline span.

Handler exceptions are swallowed — observability never breaks business logic.

### Mermaid Export

`StatePipeline.to_mermaid()` and `DAG.to_mermaid()` render the topology as a
Expand Down
43 changes: 41 additions & 2 deletions examples/pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,52 @@ async def evaluator(state: BuildState) -> dict:
return {"evaluation": f"PASS — deployed at {state.deploy_url}"}


class ProgressHandler:
    """Live progress printer for the software-factory scenario.

    Defines all five :class:`StatePipelineEventHandler` callbacks; each one
    writes a single status line to stdout.
    """

    async def on_pipeline_start(self, pipeline_name: str, run_id: str) -> None:
        """Announce the run, showing only the first 8 characters of ``run_id``."""
        short_id = run_id[:8]
        print(f" ▶ [{pipeline_name}] run {short_id}… starting")

    async def on_node_start(
        self,
        pipeline_name: str,
        run_id: str,
        node_id: str,
        visit: int,
    ) -> None:
        """Report that ``node_id`` is starting, with its visit counter."""
        line = f" ▶ {node_id} (visit #{visit})"
        print(line)

    async def on_node_complete(
        self,
        pipeline_name: str,
        run_id: str,
        node_id: str,
        latency_ms: float,
    ) -> None:
        """Report a finished node with its latency rounded to whole ms."""
        line = f" ✔ {node_id} ({latency_ms:.0f}ms)"
        print(line)

    async def on_node_error(
        self,
        pipeline_name: str,
        run_id: str,
        node_id: str,
        error: str,
    ) -> None:
        """Report the error message of a failed node."""
        line = f" ✗ {node_id}: {error}"
        print(line)

    async def on_pipeline_complete(
        self,
        pipeline_name: str,
        run_id: str,
        success: bool,
        duration_ms: float,
    ) -> None:
        """Print the closing status line with the total duration in whole ms."""
        if success:
            status = "OK"
        else:
            status = "FAILED"
        print(f" ═ [{pipeline_name}] {status} in {duration_ms:.0f}ms")


async def run_software_factory() -> None:
print("=== 2. Software factory with checkpoint/resume ===\n")
print("=== 2. Software factory with checkpoint/resume + live progress ===\n")

handler = ProgressHandler()
with tempfile.TemporaryDirectory() as tmp:
ckpt = FileCheckpointer(Path(tmp))
pipeline = (
PipelineBuilder("software-factory", state=BuildState, checkpointer=ckpt)
PipelineBuilder(
"software-factory",
state=BuildState,
checkpointer=ckpt,
event_handler=handler,
)
.add_node(architect)
.add_node(python_dev)
.add_node(deployer)
Expand Down
7 changes: 6 additions & 1 deletion fireflyframework_agentic/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
)
from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.pipeline.dag import DAG, DAGEdge, DAGNode, FailureStrategy
from fireflyframework_agentic.pipeline.engine import PipelineEngine, PipelineEventHandler
from fireflyframework_agentic.pipeline.engine import (
PipelineEngine,
PipelineEventHandler,
StatePipelineEventHandler,
)
from fireflyframework_agentic.pipeline.reducers import append, extend, merge_dict, replace
from fireflyframework_agentic.pipeline.result import ExecutionTraceEntry, NodeResult, PipelineResult
from fireflyframework_agentic.pipeline.state_pipeline import (
Expand Down Expand Up @@ -89,6 +93,7 @@
"RetrievalStep",
"Send",
"StatePipeline",
"StatePipelineEventHandler",
"StatePipelineResult",
"StepExecutor",
"append",
Expand Down
5 changes: 4 additions & 1 deletion fireflyframework_agentic/pipeline/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from fireflyframework_agentic.exceptions import PipelineError
from fireflyframework_agentic.pipeline.checkpoint import Checkpointer
from fireflyframework_agentic.pipeline.dag import DAG, DAGEdge, DAGNode, FailureStrategy
from fireflyframework_agentic.pipeline.engine import PipelineEngine
from fireflyframework_agentic.pipeline.engine import PipelineEngine, StatePipelineEventHandler
from fireflyframework_agentic.pipeline.state_pipeline import (
BranchSpec,
RouterFn,
Expand Down Expand Up @@ -86,6 +86,7 @@ def __init__(
state: type[BaseModel] | None = None,
checkpointer: Checkpointer | None = None,
recursion_limit: int = 25,
event_handler: StatePipelineEventHandler | None = None,
) -> None:
# State pipelines may use cyclic graphs (ReAct loops, retry-with-critique).
# The legacy port-based path keeps acyclicity as an invariant.
Expand All @@ -94,6 +95,7 @@ def __init__(
self._state_schema = state
self._checkpointer = checkpointer
self._recursion_limit = recursion_limit
self._event_handler = event_handler
self._pending_nodes: list[DAGNode] = []
self._pending_edges: list[DAGEdge] = []
# State-based mode bookkeeping. Keyed by node id.
Expand Down Expand Up @@ -267,6 +269,7 @@ def build(self) -> PipelineEngine | StatePipeline:
branches=self._branches,
checkpointer=self._checkpointer,
recursion_limit=self._recursion_limit,
event_handler=self._event_handler,
)

return PipelineEngine(self._dag)
Expand Down
69 changes: 56 additions & 13 deletions fireflyframework_agentic/pipeline/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,60 @@ async def on_pipeline_complete(self, pipeline_name: str, success: bool, duration
...


@runtime_checkable
class StatePipelineEventHandler(Protocol):
    """Structural interface for state-pipeline progress callbacks.

    The shape follows :class:`PipelineEventHandler`, with two differences:
    every callback also receives the ``run_id`` (so events can be correlated
    across resumes), and ``on_node_start`` receives a ``visit`` counter (so
    iterations of a cyclic graph can be told apart). No ``on_node_skip``
    exists here, because state pipelines abort on failure rather than
    skipping downstream nodes.

    Implement any subset of methods; missing ones are no-ops.

    Note: the class is ``@runtime_checkable``, and an ``isinstance`` check
    against a runtime-checkable protocol only passes for objects that define
    every method listed below.
    """

    async def on_pipeline_start(
        self,
        pipeline_name: str,
        run_id: str,
    ) -> None:
        """Invoked a single time, as ``invoke`` begins."""

    async def on_node_start(
        self,
        pipeline_name: str,
        run_id: str,
        node_id: str,
        visit: int,
    ) -> None:
        """Invoked right before each execution of a node.

        ``visit`` starts at 1 and increments per re-entry (cycles, Send
        fan-out).
        """

    async def on_node_complete(
        self,
        pipeline_name: str,
        run_id: str,
        node_id: str,
        latency_ms: float,
    ) -> None:
        """Invoked after a node finishes successfully."""

    async def on_node_error(
        self,
        pipeline_name: str,
        run_id: str,
        node_id: str,
        error: str,
    ) -> None:
        """Invoked when a node raises an exception."""

    async def on_pipeline_complete(
        self,
        pipeline_name: str,
        run_id: str,
        success: bool,
        duration_ms: float,
    ) -> None:
        """Invoked a single time, as ``invoke`` returns."""


def start_otel_span(name: str, **attributes: Any) -> Any:
    """Best-effort creation of an OTel span; returns ``None`` when unavailable.

    Module-level helper shared by :class:`PipelineEngine` and
    :class:`fireflyframework_agentic.pipeline.state_pipeline.StatePipeline`.

    Parameters:
        name: Name given to the span.
        **attributes: Extra span attributes. Each key is prefixed with
            ``firefly.`` and each value is converted with ``str()``.

    Returns:
        The span started on the ``"fireflyframework_agentic"`` tracer, or
        ``None`` when observability is disabled in the config, when
        ``otel_trace`` is ``None``, or when anything in the process raises.
    """
    try:
        # The config flag is consulted first; the tracing module is only
        # inspected when observability is switched on.
        disabled = not get_config().observability_enabled
        if disabled or otel_trace is None:
            return None
        tracer = otel_trace.get_tracer("fireflyframework_agentic")
        span_attributes = {}
        for key, value in attributes.items():
            span_attributes[f"firefly.{key}"] = str(value)
        return tracer.start_span(name, attributes=span_attributes)
    except Exception:  # noqa: BLE001
        # Deliberate catch-all: tracing problems must never reach the caller.
        return None


class PipelineEngine:
"""Executes a :class:`DAG` by computing topological levels and running
nodes within each level concurrently.
Expand Down Expand Up @@ -347,19 +401,8 @@ async def _execute_node(

@staticmethod
def _start_otel_span(name: str, **attributes: Any) -> Any:
    """Backwards-compatible wrapper around the module-level :func:`start_otel_span`.

    Retained so existing ``PipelineEngine._start_otel_span`` call sites keep
    working; all span-creation logic lives in the shared helper, so this
    method only forwards its arguments.

    Parameters:
        name: Name given to the span.
        **attributes: Extra span attributes, forwarded unchanged.

    Returns:
        Whatever :func:`start_otel_span` returns: a span, or ``None`` when
        no span could be started.
    """
    return start_otel_span(name, **attributes)

@staticmethod
def _aggregate_usage(correlation_id: str) -> Any:
Expand Down
Loading