feat(pipeline): HITL Pause + AuditLog (#147 phase 3c)#236
Merged
Conversation
Two enterprise features for state pipelines: human-in-the-loop pause/approve
gates, and a structured audit trail of every node visit.
HITL via Pause:
- New Pause(reason=...) sentinel returned by a node halts the pipeline cleanly.
- StatePipeline writes a paused checkpoint (new optional CheckpointRecord
fields: paused=False, pause_reason=None — backward compatible).
- StatePipelineResult gains paused/paused_node/pause_reason.
- StatePipelineEventHandler gains an optional on_node_pause callback.
- invoke(run_id=..., approve_pause=True) resumes from the SUCCESSOR of the
paused node. Without approve_pause=True, resuming a paused run raises
PipelineError — pauses are sticky until explicitly released.
Audit log via four backends in new pipeline/audit.py:
- AuditEntry pydantic model + split Protocol: AuditLog (write-only) +
QueryableAuditLog (adds list_entries).
- FileAuditLog — JSONL per (pipeline, run_id). Implements QueryableAuditLog.
- PostgresAuditLog — single firefly_audit table, idempotent DDL, reuses the
psycopg connection from Phase 3a's postgres extra. Implements QueryableAuditLog.
- LoggingAuditLog — stdlib logging.Logger; pairs with any log-aggregation
stack (Splunk-HEC, Loki, Datadog, OTel-LoggingHandler-bridge). Write-only.
No new dep.
- OtelAuditLog — OTel logs API directly; emits LogRecord with trace_id /
span_id correlation. Requires opentelemetry-sdk. Write-only.
- StatePipeline records one AuditEntry per node visit (success/error/pause)
with inputs_snapshot, outputs_snapshot, latency_ms, started_at/completed_at,
status, plus error_message or pause_reason as appropriate.
- Audit-write failures are non-fatal — logged and swallowed.
API additions exported from fireflyframework_agentic.pipeline:
Pause, AuditEntry, AuditLog, QueryableAuditLog,
FileAuditLog, PostgresAuditLog, LoggingAuditLog, OtelAuditLog
No new required deps. Postgres extra (already added in 3a) covers
PostgresAuditLog. opentelemetry-sdk is the same optional dep already used
by Phase 3b for OTel spans.
Tests: 20 new across two files.
- test_state_pipeline_hitl.py (6): pause halts pipeline; resume without
approval raises; resume with approve_pause continues from successor;
on_node_pause fires; backward-compat for old checkpoints; pause→fail→retry.
- test_audit_log.py (14): per-backend (File JSONL, Postgres mocked,
Logging via caplog, OTel via mocked logger); pipeline writes one entry
per visit; status reflects success/error/paused; audit write failures
don't abort the pipeline.
Full pipeline suite 142 passed (122 baseline + 20 new). Lints clean,
pyright clean on touched modules. Example pipeline_state.py gains a 5th
scenario showing HITL + audit end-to-end.
a411ce6 to
0baab84
Compare
ancongui
pushed a commit
that referenced
this pull request
May 31, 2026
feat(pipeline): HITL Pause + AuditLog (#147 phase 3c)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Stacked on `issue-147-pipeline-evolution` (now containing Phase 1+2+3a+3b). Part of #147, phase 3c.
Two enterprise features in one PR — they share executor wiring (audit captures pause events, pause writes a paused checkpoint that audit also records).
HITL via Pause
AuditLog with 4 backends
New file `pipeline/audit.py`. `AuditEntry` pydantic model + split Protocol: `AuditLog` (write-only) + `QueryableAuditLog` (adds `list_entries`).
`StatePipeline` writes one `AuditEntry` per node visit with `inputs_snapshot`, `outputs_snapshot`, `latency_ms`, timestamps, status (success/error/paused), and `error_message` or `pause_reason` as appropriate. Audit-write failures are non-fatal — logged and swallowed.
API additions
Exported from `fireflyframework_agentic.pipeline`:
`PipelineBuilder(...)` gains an `audit_log` kwarg. `invoke(...)` gains `approve_pause: bool = False`.
No new required deps
Tests
20 new across two files (142 pipeline tests total = 122 baseline + 20 new):
`tests/unit/pipeline/test_state_pipeline_hitl.py` (6):
`tests/unit/pipeline/test_audit_log.py` (14):
Suite-wide:
Docs + example
```
=== 5. Human-in-the-loop deploy gate with audit log ===
first run: paused=True, paused_node=await_approval
reason: awaiting human approval to deploy build-prod.tar.gz to prod
run_id: 51dd7234fdb2
(human reviews and approves)
resumed: success=True, deployed_to=https://prod.example.com
completed: ['build_artifact', 'await_approval', 'deploy_artifact']
audit trail:
seq=1 node=build_artifact status=success
seq=2 node=await_approval status=paused reason='awaiting human approval…'
seq=3 node=deploy_artifact status=success
```
Stacking
Base = `issue-147-pipeline-evolution` (Phase 1+2+3a+3b). Will retarget to `main` automatically once #232 merges.
What's left in #147
This closes the deferred items from Phase 1/2 and the enterprise additions from Phase 3. The remaining open work is whatever the team decides to tackle next — streaming partial state and typed-edge `Literal[...]` validation were the two items in the original deferred list that haven't shipped.