Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions docs/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,69 @@ In parallel, the pipeline emits OTel spans automatically when

Handler exceptions are swallowed — observability never breaks business logic.

### Human-in-the-loop (Pause)

Any node may return ``Pause(reason="...")`` instead of a state update to halt
the pipeline cleanly. The current state is checkpointed with a paused marker;
``invoke`` returns with ``result.paused=True`` and ``result.success=False``.

```python
from fireflyframework_agentic.pipeline import Pause

async def await_deploy_approval(state: DeployState) -> Pause:
return Pause(reason="awaiting human approval to deploy to production")
```

To resume after the external approval comes in, call ``invoke`` with the same
``run_id`` and ``approve_pause=True``. Without ``approve_pause=True``, the
resume raises a ``PipelineError`` — the pause is sticky until explicitly
released. The successor of the paused node runs next; the pause node itself
is not re-executed.

```python
first = await pipeline.invoke(DeployState(...))
assert first.paused
# ...later, after approval...
done = await pipeline.invoke(run_id=first.run_id, approve_pause=True)
assert done.success
```

The configured ``StatePipelineEventHandler`` receives an ``on_node_pause``
callback when this happens (the callback is optional — partial handlers
without it continue to work).

### Audit Log

Distinct from the ``Checkpointer`` (which stores the *latest* state for
crash recovery), an ``AuditLog`` is an append-only record of *every* node
visit for compliance, debugging, and replay. Wire one in via the
``audit_log`` kwarg:

```python
from fireflyframework_agentic.pipeline import (
PipelineBuilder, FileAuditLog, PostgresAuditLog, LoggingAuditLog, OtelAuditLog,
)

PipelineBuilder("agent", state=AgentState, audit_log=FileAuditLog("./audit"))
```

Four backends ship, each conforming to the ``AuditLog`` Protocol:

| Backend | Use when | Read API | Trace-correlated | Install |
|---|---|---|---|---|
| ``FileAuditLog`` | Dev / single-host | yes | no | (default) |
| ``PostgresAuditLog`` | Compliance, retention, cross-run queries | yes | no | ``[postgres]`` |
| ``LoggingAuditLog`` | Generic log stacks (Splunk-HEC, Loki, JSON-logging) | no (write-only) | no | (default — stdlib) |
| ``OtelAuditLog`` | OTel-native stacks (Application Insights, Datadog APM, OTel Collector) | no (write-only) | **yes** | ``opentelemetry-sdk`` |

``FileAuditLog`` and ``PostgresAuditLog`` also implement
``QueryableAuditLog`` with ``list_entries(pipeline_name, run_id)``. The
write-only backends delegate query/search to the user's existing
observability stack.

Audit-log write failures are non-fatal — logged but never abort the
pipeline.

### Mermaid Export

`StatePipeline.to_mermaid()` and `DAG.to_mermaid()` render the topology as a
Expand Down
69 changes: 69 additions & 0 deletions examples/pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
from pydantic import BaseModel

from fireflyframework_agentic.pipeline import (
FileAuditLog,
FileCheckpointer,
Pause,
PipelineBuilder,
PostgresCheckpointer,
Send,
Expand Down Expand Up @@ -172,6 +174,9 @@ async def on_node_complete(self, pipeline_name: str, run_id: str, node_id: str,
async def on_node_error(self, pipeline_name: str, run_id: str, node_id: str, error: str) -> None:
print(f" ✗ {node_id}: {error}")

async def on_node_pause(self, pipeline_name: str, run_id: str, node_id: str, reason: str) -> None:
print(f" ⏸ {node_id} paused: {reason}")

async def on_pipeline_complete(self, pipeline_name: str, run_id: str, success: bool, duration_ms: float) -> None:
status = "OK" if success else "FAILED"
print(f" ═ [{pipeline_name}] {status} in {duration_ms:.0f}ms")
Expand Down Expand Up @@ -299,11 +304,75 @@ async def run_software_factory_postgres() -> None:
print(f" eval: {second.state.evaluation}\n")


class HitlState(BaseModel):
"""State threaded through a deploy pipeline gated by human approval."""

target_env: str
artifact: str | None = None
deployed_to: str | None = None


async def build_artifact(state: HitlState) -> dict:
return {"artifact": f"build-{state.target_env}.tar.gz"}


async def await_approval(state: HitlState) -> Pause:
return Pause(reason=f"awaiting human approval to deploy {state.artifact} to {state.target_env}")


async def deploy_artifact(state: HitlState) -> dict:
return {"deployed_to": f"https://{state.target_env}.example.com"}


async def run_hitl_with_audit() -> None:
print("=== 5. Human-in-the-loop deploy gate with audit log ===\n")

with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
ckpt = FileCheckpointer(root / "ckpt")
audit = FileAuditLog(root / "audit")
pipeline = (
PipelineBuilder(
"hitl-deploy",
state=HitlState,
checkpointer=ckpt,
audit_log=audit,
)
.add_node(build_artifact)
.add_node(await_approval)
.add_node(deploy_artifact)
.chain(build_artifact, await_approval, deploy_artifact)
.build()
)

# First run halts at the approval gate.
first = await pipeline.invoke(HitlState(target_env="prod"))
print(f" first run: paused={first.paused}, paused_node={first.paused_node}")
print(f" reason: {first.pause_reason}")
print(f" run_id: {first.run_id}\n")

# ...time passes; a human reviews and approves...
print(" (human reviews and approves)\n")

# Resume with explicit approval.
done = await pipeline.invoke(run_id=first.run_id, approve_pause=True)
print(f" resumed: success={done.success}, deployed_to={done.state.deployed_to}")
print(f" completed: {done.completed_nodes}\n")

# Audit log captures every node visit with its status.
entries = audit.list_entries("hitl-deploy", first.run_id)
print(" audit trail:")
for e in entries:
extra = f" reason={e.pause_reason!r}" if e.pause_reason else ""
print(f" seq={e.sequence} node={e.node_id} status={e.status}{extra}")


async def main() -> None:
await run_branching()
await run_software_factory()
await run_map_reduce()
await run_software_factory_postgres()
await run_hitl_with_audit()


if __name__ == "__main__":
Expand Down
20 changes: 19 additions & 1 deletion fireflyframework_agentic/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@
via :class:`Checkpointer` enables resume after failure and mid-pipeline start.
"""

from fireflyframework_agentic.pipeline.audit import (
AuditEntry,
AuditLog,
FileAuditLog,
LoggingAuditLog,
OtelAuditLog,
PostgresAuditLog,
QueryableAuditLog,
)
from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.checkpoint import (
Checkpointer,
Expand All @@ -46,6 +55,7 @@
from fireflyframework_agentic.pipeline.reducers import append, extend, merge_dict, replace
from fireflyframework_agentic.pipeline.result import ExecutionTraceEntry, NodeResult, PipelineResult
from fireflyframework_agentic.pipeline.state_pipeline import (
Pause,
RecursionLimitError,
Send,
StatePipeline,
Expand All @@ -67,6 +77,8 @@
__all__ = [
"DAG",
"AgentStep",
"AuditEntry",
"AuditLog",
"BatchLLMStep",
"BranchStep",
"CallableStep",
Expand All @@ -79,14 +91,20 @@
"FailureStrategy",
"FanInStep",
"FanOutStep",
"FileAuditLog",
"FileCheckpointer",
"LoggingAuditLog",
"NodeResult",
"PostgresCheckpointer",
"OtelAuditLog",
"Pause",
"PipelineBuilder",
"PipelineContext",
"PipelineEngine",
"PipelineEventHandler",
"PipelineResult",
"PostgresAuditLog",
"PostgresCheckpointer",
"QueryableAuditLog",
"ReasoningStep",
"RecursionLimitError",
"RedisCheckpointer",
Expand Down
Loading