Skip to content

feat(pipeline): HITL Pause + AuditLog (#147 phase 3c)#236

Merged
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
issue-147-phase-3c
May 27, 2026
Merged

feat(pipeline): HITL Pause + AuditLog (#147 phase 3c)#236
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
issue-147-phase-3c

Conversation

@miguelgfierro
Copy link
Copy Markdown
Contributor

Stacked on `issue-147-pipeline-evolution` (now containing Phase 1+2+3a+3b). Part of #147, phase 3c.

Two enterprise features in one PR — they share executor wiring (audit captures pause events, pause writes a paused checkpoint that audit also records).

HITL via Pause

from fireflyframework_agentic.pipeline import PipelineBuilder, Pause, FileCheckpointer

async def await_approval(state):
    return Pause(reason=\"awaiting human approval to deploy to prod\")

pipeline = (
    PipelineBuilder(\"deploy\", state=DeployState, checkpointer=FileCheckpointer(\"./ckpt\"))
    .add_node(build).add_node(await_approval).add_node(deploy)
    .chain(build, await_approval, deploy)
    .build()
)

first = await pipeline.invoke(DeployState(...))
assert first.paused                     # halted at the gate
assert first.pause_reason == \"awaiting human approval to deploy to prod\"

# ...human approves out-of-band...

done = await pipeline.invoke(run_id=first.run_id, approve_pause=True)
assert done.success                     # resumed from the successor of the paused node
  • `Pause(reason=...)` returned by a node halts the pipeline cleanly.
  • The current state is checkpointed with `paused=True` and `pause_reason` set (new optional `CheckpointRecord` fields; backward compatible across all three checkpointer backends).
  • `StatePipelineResult` gains `paused` / `paused_node` / `pause_reason`.
  • `StatePipelineEventHandler` gains an optional `on_node_pause` callback.
  • `invoke(run_id=..., approve_pause=True)` resumes from the SUCCESSOR of the paused node (the pause node itself is not re-executed). Without `approve_pause=True`, a paused run raises `PipelineError` — pauses are sticky until explicitly released.

AuditLog with 4 backends

New file `pipeline/audit.py`. `AuditEntry` pydantic model + split Protocol: `AuditLog` (write-only) + `QueryableAuditLog` (adds `list_entries`).

Backend Use when Read API Trace-correlated Install
`FileAuditLog` Dev / single-host yes no (default)
`PostgresAuditLog` Compliance, retention, cross-run queries yes no `[postgres]`
`LoggingAuditLog` Generic log stacks (Splunk-HEC, Loki, JSON-logging) no (write-only) no (default — stdlib)
`OtelAuditLog` OTel-native stacks (App Insights, Datadog APM, OTel Collector) no (write-only) yes `opentelemetry-sdk`

`StatePipeline` writes one `AuditEntry` per node visit with `inputs_snapshot`, `outputs_snapshot`, `latency_ms`, timestamps, status (success/error/paused), and `error_message` or `pause_reason` as appropriate. Audit-write failures are non-fatal — logged and swallowed.

API additions

Exported from `fireflyframework_agentic.pipeline`:

  • `Pause`
  • `AuditEntry`, `AuditLog`, `QueryableAuditLog`
  • `FileAuditLog`, `PostgresAuditLog`, `LoggingAuditLog`, `OtelAuditLog`

`PipelineBuilder(...)` gains an `audit_log` kwarg. `invoke(...)` gains `approve_pause: bool = False`.

No new required deps

  • `Postgres` extra was added in Phase 3a (`psycopg[binary]>=3`) — `PostgresAuditLog` reuses it.
  • `opentelemetry-sdk` is the same optional dep Phase 3b already pulls in for span observability — `OtelAuditLog` reuses it via a guarded import.
  • `Logging` is stdlib.

Tests

20 new across two files (142 pipeline tests total = 122 baseline + 20 new):

`tests/unit/pipeline/test_state_pipeline_hitl.py` (6):

  • A node returning `Pause` halts the pipeline; result has `paused=True`.
  • Resume without `approve_pause=True` raises `PipelineError`.
  • Resume with `approve_pause=True` continues from the successor; the pause node is not re-executed.
  • `on_node_pause` event fires with the right payload.
  • Backward-compat: a pre-3c checkpoint without the new `paused`/`pause_reason` fields loads cleanly.
  • End-to-end: pause → approve → failure → retry succeeds.

`tests/unit/pipeline/test_audit_log.py` (14):

  • `FileAuditLog` writes JSONL one line per entry; `list_entries` round-trips.
  • `PostgresAuditLog` issues DDL exactly once across many `record` calls; `list_entries` orders by sequence; rejects unsafe table names; raises clear `ImportError` if the extra isn't installed.
  • `LoggingAuditLog` emits a stdlib log record with `firefly_audit` in `record.dict`.
  • `OtelAuditLog` calls the OTel logger's `.emit()` with a `LogRecord`; raises clear `ImportError` if the extra isn't installed.
  • Pipeline writes one entry per node visit; success / error / paused statuses captured correctly.
  • An audit log that raises on `record` does NOT abort the pipeline.

Suite-wide:

  • `pytest tests/unit/pipeline/` → 142 passed (122 baseline + 20 new)
  • `ruff check` + `ruff format --check` clean
  • `pyright` clean on touched files
  • Example runs end-to-end with the new HITL + audit scenario

Docs + example

  • `docs/pipeline.md` State-Based Pipelines section gains Human-in-the-loop (Pause) and Audit Log subsections with the 4-backend table.
  • `examples/pipeline_state.py` gains a fifth scenario showing a build → await-approval → deploy pipeline with audit-trail printout:

```
=== 5. Human-in-the-loop deploy gate with audit log ===

first run: paused=True, paused_node=await_approval
reason: awaiting human approval to deploy build-prod.tar.gz to prod
run_id: 51dd7234fdb2

(human reviews and approves)

resumed: success=True, deployed_to=https://prod.example.com
completed: ['build_artifact', 'await_approval', 'deploy_artifact']

audit trail:
seq=1 node=build_artifact status=success
seq=2 node=await_approval status=paused reason='awaiting human approval…'
seq=3 node=deploy_artifact status=success
```

Stacking

Base = `issue-147-pipeline-evolution` (Phase 1+2+3a+3b). Will retarget to `main` automatically once #232 merges.

What's left in #147

This closes the deferred items from Phase 1/2 and the enterprise additions from Phase 3. The remaining open work is whatever the team decides to tackle next — streaming partial state and typed-edge `Literal[...]` validation were the two items in the original deferred list that haven't shipped.

Two enterprise features for state pipelines: human-in-the-loop pause/approve
gates, and a structured audit trail of every node visit.

HITL via Pause:
- New Pause(reason=...) sentinel returned by a node halts the pipeline cleanly.
- StatePipeline writes a paused checkpoint (new optional CheckpointRecord
  fields: paused=False, pause_reason=None — backward compatible).
- StatePipelineResult gains paused/paused_node/pause_reason.
- StatePipelineEventHandler gains an optional on_node_pause callback.
- invoke(run_id=..., approve_pause=True) resumes from the SUCCESSOR of the
  paused node. Without approve_pause=True, resuming a paused run raises
  PipelineError — pauses are sticky until explicitly released.

Audit log via four backends in new pipeline/audit.py:
- AuditEntry pydantic model + split Protocol: AuditLog (write-only) +
  QueryableAuditLog (adds list_entries).
- FileAuditLog — JSONL per (pipeline, run_id). Implements QueryableAuditLog.
- PostgresAuditLog — single firefly_audit table, idempotent DDL, reuses the
  psycopg connection from Phase 3a's postgres extra. Implements QueryableAuditLog.
- LoggingAuditLog — stdlib logging.Logger; pairs with any log-aggregation
  stack (Splunk-HEC, Loki, Datadog, OTel-LoggingHandler-bridge). Write-only.
  No new dep.
- OtelAuditLog — OTel logs API directly; emits LogRecord with trace_id /
  span_id correlation. Requires opentelemetry-sdk. Write-only.
- StatePipeline records one AuditEntry per node visit (success/error/pause)
  with inputs_snapshot, outputs_snapshot, latency_ms, started_at/completed_at,
  status, plus error_message or pause_reason as appropriate.
- Audit-write failures are non-fatal — logged and swallowed.

API additions exported from fireflyframework_agentic.pipeline:
    Pause, AuditEntry, AuditLog, QueryableAuditLog,
    FileAuditLog, PostgresAuditLog, LoggingAuditLog, OtelAuditLog

No new required deps. Postgres extra (already added in 3a) covers
PostgresAuditLog. opentelemetry-sdk is the same optional dep already used
by Phase 3b for OTel spans.

Tests: 20 new across two files.
- test_state_pipeline_hitl.py (6): pause halts pipeline; resume without
  approval raises; resume with approve_pause continues from successor;
  on_node_pause fires; backward-compat for old checkpoints; pause→fail→retry.
- test_audit_log.py (14): per-backend (File JSONL, Postgres mocked,
  Logging via caplog, OTel via mocked logger); pipeline writes one entry
  per visit; status reflects success/error/paused; audit write failures
  don't abort the pipeline.

Full pipeline suite 142 passed (122 baseline + 20 new). Lints clean,
pyright clean on touched modules. Example pipeline_state.py gains a 5th
scenario showing HITL + audit end-to-end.
@miguelgfierro miguelgfierro marked this pull request as ready for review May 27, 2026 16:20
@miguelgfierro miguelgfierro merged commit 9827d72 into issue-147-pipeline-evolution May 27, 2026
@miguelgfierro miguelgfierro deleted the issue-147-phase-3c branch May 27, 2026 16:20
ancongui pushed a commit that referenced this pull request May 31, 2026
feat(pipeline): HITL Pause + AuditLog (#147 phase 3c)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant