Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,24 @@

### Added

- **Workflow correlation ID threading (issue #86, FWS-2).** Forge agents
now extract orchestration headers — `X-Workflow-ID`,
`X-Workflow-Stage-ID`, `X-Workflow-Step-ID`, `X-Invocation-Caller` —
at the A2A dispatch boundary (JSON-RPC + REST handlers) and inject
them into `context.Context` as
a `WorkflowContext` value. Every audit event emitted during the
invocation is then auto-tagged via a new `AuditLogger.EmitFromContext`
with the matching `workflow_id` / `stage_id` / `step_id` /
`invocation_caller` fields, letting audit consumers correlate events
across multiple agents participating in one workflow run. Direct A2A
invocations (no orchestrator headers) leave the fields unset —
emitted JSON is byte-for-byte identical to the pre-FWS-2 shape, so
existing audit consumers keep working. A
`WorkflowContext.ApplyToHTTPHeaders` helper is exposed for tools
that want to propagate the headers onto outbound agent-to-agent A2A
calls; auto-propagation is deliberately off by default to prevent
leaking workflow identity to third-party APIs. See
`docs/security/workflow-correlation.md`.
- **A2A 0.3.0 Agent Card conformance (issue #85, FWS-1).** Forge now
serves a spec-conformant Agent Card at the A2A 0.3.0 canonical path
`/.well-known/agent-card.json`. The card carries every required A2A
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ You write a `SKILL.md`. Forge compiles it into a secure, runnable agent with egr
| [Secrets](docs/security/secret-management.md) | Encrypted secret management |
| [Build Signing](docs/security/build-signing.md) | Ed25519 signing and verification |
| [Guardrails](docs/security/guardrails.md) | Content filtering and PII detection |
| [Workflow Correlation](docs/security/workflow-correlation.md) | Orchestrator correlation IDs threaded through audit events |

### Operations

Expand Down
4 changes: 4 additions & 0 deletions docs/security/audit-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ All runtime security events are emitted as structured NDJSON to stderr with corr

The `source` field distinguishes in-process enforcer events from subprocess proxy events.

### Workflow correlation

When the inbound A2A request carries the orchestrator's correlation headers (`X-Workflow-ID`, `X-Workflow-Stage-ID`, `X-Workflow-Step-ID`, `X-Invocation-Caller`), every audit event emitted during that invocation is tagged with the matching `workflow_id` / `stage_id` / `step_id` / `invocation_caller` fields. Header names are vendor-neutral so any A2A-compatible orchestrator can populate them. Direct A2A invocations (no orchestrator) omit the fields entirely — emitted JSON is byte-identical to the pre-correlation shape. See [Workflow correlation IDs](workflow-correlation.md) for the full reference, including outbound propagation for agent-to-agent flows.

### Authentication events

Every inbound request to `/tasks` emits exactly one of `auth_verify` or `auth_fail`.
Expand Down
2 changes: 2 additions & 0 deletions docs/security/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ All runtime security events are emitted as structured NDJSON to stderr with corr

The `source` field distinguishes in-process enforcer events from subprocess proxy events.

When the inbound A2A request carries orchestrator headers, events are additionally tagged with `workflow_id` / `stage_id` / `step_id` / `invocation_caller`. See [Workflow Correlation IDs](workflow-correlation.md) for the header contract and outbound propagation rules.

---

## Container Security
Expand Down
99 changes: 99 additions & 0 deletions docs/security/workflow-correlation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
---
title: "Workflow Correlation IDs"
description: "Orchestrator workflow / stage / step / invocation-caller identifiers threaded from inbound A2A headers through context.Context onto every audit event."
order: 7
---

# Workflow correlation IDs

Forge agents accept orchestration headers on every inbound A2A call and tag every audit event emitted during that invocation with the matching workflow / stage / step / invocation-caller identifiers. This lets a platform orchestrator (initializ Command, or any A2A-compatible orchestrator) correlate audit events across the multiple agents that participate in one workflow run.

Direct A2A invocations (no orchestrator) leave the fields unset — emitted JSON matches the pre-correlation shape exactly, so existing audit consumers keep working.

## Headers

| Header | Audit field | Identifies |
|---|---|---|
| `X-Workflow-ID` | `workflow_id` | The orchestrator-level workflow run |
| `X-Workflow-Stage-ID` | `stage_id` | A stage within the workflow (a group of steps that may run in parallel) |
| `X-Workflow-Step-ID` | `step_id` | The specific step that invoked this agent |
| `X-Invocation-Caller` | `invocation_caller` | The upstream caller (orchestrator identity, or upstream agent in an agent-to-agent flow) |

All four are optional. Forge extracts whichever are present and leaves the rest empty. The header names are vendor-neutral by design — any A2A-compatible orchestrator (initializ Command, custom registries, third-party platforms) can drive Forge's correlation surface without adopting a vendor prefix.

## How it flows

```
Inbound request
A2A dispatcher (forge-cli/server/a2a_server.go)
├─ runtime.WorkflowContextFromHTTPHeaders(r.Header)
└─ runtime.WithWorkflowContext(ctx, wc)
JSON-RPC / REST handler receives ctx with WorkflowContext baked in
auditLogger.EmitFromContext(ctx, event)
├─ reads WorkflowContext from ctx
└─ stamps workflow_id / stage_id / step_id / invocation_caller onto event
```

Every event from `session_start` through `session_end`, every `tool_exec` / `llm_call` / `egress_allowed` / `egress_blocked` emitted under that ctx carries the same workflow tags — letting an audit consumer reconstruct the full workflow-step → agent-execution → tool-call tree.

## Audit event shape

`session_start` from a workflow-orchestrated invocation:

```json
{
"ts": "2026-06-04T15:21:09Z",
"event": "session_start",
"correlation_id": "9b3d…",
"task_id": "task-42",
"workflow_id": "wf-deploy-prod-001",
"stage_id": "rollout",
"step_id": "canary-bake",
"invocation_caller": "initializ-orchestrator"
}
```

Same agent invoked directly (no orchestrator):

```json
{
"ts": "2026-06-04T15:21:09Z",
"event": "session_start",
"correlation_id": "9b3d…",
"task_id": "task-42"
}
```

Workflow fields are absent (`omitempty`) — byte-identical to pre-correlation audit consumers.

## Outbound propagation (agent-to-agent)

When a Forge agent calls another agent (or any peer), the workflow context is available via `runtime.WorkflowContextFromContext(ctx)`. To propagate, copy the headers onto the outbound request:

```go
wc := runtime.WorkflowContextFromContext(ctx)
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, peerURL, body)
wc.ApplyToHTTPHeaders(req.Header)
client.Do(req)
```

**Auto-propagation is deliberately off by default.** The `X-Workflow-*` / `X-Invocation-Caller` headers identify the workflow, so blanket-stamping them on every outbound HTTP request would leak workflow identity to third-party APIs (the egress proxy can't tell a peer agent from a vendor endpoint). Tools that know their target is a workflow peer call `ApplyToHTTPHeaders` explicitly.

## Backward compatibility

- Direct A2A invocations (no headers) → audit JSON byte-for-byte identical to pre-FWS-2.
- Existing emitters that construct `AuditEvent` literals and call `Emit(...)` continue to work unchanged.
- New `EmitFromContext(ctx, event)` is the per-request preferred path — only adds workflow fields when ctx carries a non-zero `WorkflowContext`; fields already set on the event take precedence over the ctx fallback.

## Where it's wired

| File | Role |
|---|---|
| `forge-core/runtime/workflow.go` | `WorkflowContext` type, `WithWorkflowContext` / `WorkflowContextFromContext` ctx helpers, `WorkflowContextFromHTTPHeaders` + `ApplyToHTTPHeaders` header marshalers |
| `forge-core/runtime/audit.go` | `AuditEvent` extended with `workflow_id` / `stage_id` / `step_id` / `invocation_caller` fields; `AuditLogger.EmitFromContext` auto-tags from ctx |
| `forge-cli/server/a2a_server.go` | JSON-RPC dispatcher extracts headers at boundary and injects WorkflowContext into ctx |
| `forge-cli/runtime/runner.go` | REST `POST /tasks/send` + `POST /tasks/sendSubscribe` handlers extract headers; in-request audit emit sites migrated to `EmitFromContext` |
| auth audit callback | Pulls headers directly from `req.Header` (runs before the dispatcher in middleware order) and stamps the four fields onto `auth_verify` / `auth_fail` events |
62 changes: 42 additions & 20 deletions forge-cli/runtime/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (r *Runner) Run(ctx context.Context) error {
if !allowed {
event = coreruntime.AuditEgressBlocked
}
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: event,
CorrelationID: coreruntime.CorrelationIDFromContext(ctx),
TaskID: coreruntime.TaskIDFromContext(ctx),
Expand Down Expand Up @@ -806,7 +806,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
ctx = coreruntime.WithCorrelationID(ctx, correlationID)
ctx = coreruntime.WithTaskID(ctx, params.ID)

auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionStart,
CorrelationID: correlationID,
TaskID: params.ID,
Expand All @@ -830,7 +830,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
},
}
store.Put(task)
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -858,7 +858,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
},
}
store.Put(task)
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand All @@ -878,7 +878,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
},
}
store.Put(task)
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -907,7 +907,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
}
}
store.Put(task)
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand All @@ -933,7 +933,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
ctx = coreruntime.WithCorrelationID(ctx, correlationID)
ctx = coreruntime.WithTaskID(ctx, params.ID)

auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionStart,
CorrelationID: correlationID,
TaskID: params.ID,
Expand All @@ -959,7 +959,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
}
store.Put(task)
server.WriteSSEEvent(w, flusher, "status", task) //nolint:errcheck
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -1007,7 +1007,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
}
store.Put(task)
server.WriteSSEEvent(w, flusher, "status", task) //nolint:errcheck
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -1052,7 +1052,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent
finalState = a2a.TaskStateCompleted
}

auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -1238,7 +1238,13 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A
Message: body.Task.Message,
}

task, err := r.executeTask(req.Context(), params, store, executor, guardrails, egressClient, auditLogger)
// Pull workflow correlation headers (issue #86) so audit
// events tagged via EmitFromContext carry the orchestrator's
// workflow/stage/step identifiers. Absent headers → IsZero
// WorkflowContext → fields omitted (backward compat).
ctx := coreruntime.WithWorkflowContext(req.Context(),
coreruntime.WorkflowContextFromHTTPHeaders(req.Header))
task, err := r.executeTask(ctx, params, store, executor, guardrails, egressClient, auditLogger)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
Expand Down Expand Up @@ -1276,8 +1282,10 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A
ctx := security.WithEgressClient(req.Context(), egressClient)
ctx = coreruntime.WithCorrelationID(ctx, correlationID)
ctx = coreruntime.WithTaskID(ctx, params.ID)
ctx = coreruntime.WithWorkflowContext(ctx,
coreruntime.WorkflowContextFromHTTPHeaders(req.Header))

auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionStart,
CorrelationID: correlationID,
TaskID: params.ID,
Expand All @@ -1301,7 +1309,7 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A
}
store.Put(task)
server.WriteSSEEvent(w, flusher, "status", task) //nolint:errcheck
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -1344,7 +1352,7 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A
}
store.Put(task)
server.WriteSSEEvent(w, flusher, "status", task) //nolint:errcheck
auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -1385,7 +1393,7 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A
finalState = a2a.TaskStateCompleted
}

auditLogger.Emit(coreruntime.AuditEvent{
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{
Event: coreruntime.AuditSessionEnd,
CorrelationID: correlationID,
TaskID: params.ID,
Expand Down Expand Up @@ -1965,6 +1973,12 @@ func makeAuthAuditCallback(auditLogger *coreruntime.AuditLogger) func(*http.Requ
}
return func(req *http.Request, id *auth.Identity, err error, tokenKind string) {
correlationID := coreruntime.CorrelationIDFromContext(req.Context())
// Auth middleware runs BEFORE handleJSONRPC has had a chance to
// extract workflow correlation headers into ctx. Pull them
// directly from req.Header here so auth events still carry
// workflow tags. Empty when the orchestrator didn't send them
// — fields then omit (backward compat).
wc := coreruntime.WorkflowContextFromHTTPHeaders(req.Header)

if err == nil && id != nil {
// Success → auth_verify.
Expand All @@ -1979,17 +1993,25 @@ func makeAuthAuditCallback(auditLogger *coreruntime.AuditLogger) func(*http.Requ
"remote_addr": req.RemoteAddr,
}
auditLogger.Emit(coreruntime.AuditEvent{
Event: coreruntime.EventAuthVerify,
CorrelationID: correlationID,
Fields: fields,
Event: coreruntime.EventAuthVerify,
CorrelationID: correlationID,
WorkflowID: wc.WorkflowID,
StageID: wc.StageID,
StepID: wc.StepID,
InvocationCaller: wc.InvocationCaller,
Fields: fields,
})
return
}

// Failure → auth_fail with reason code.
auditLogger.Emit(coreruntime.AuditEvent{
Event: coreruntime.EventAuthFail,
CorrelationID: correlationID,
Event: coreruntime.EventAuthFail,
CorrelationID: correlationID,
WorkflowID: wc.WorkflowID,
StageID: wc.StageID,
StepID: wc.StepID,
InvocationCaller: wc.InvocationCaller,
Fields: map[string]any{
"reason": authFailReason(err),
"token_kind": tokenKind,
Expand Down
14 changes: 12 additions & 2 deletions forge-cli/server/a2a_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/initializ/forge/forge-core/a2a"
coreruntime "github.com/initializ/forge/forge-core/runtime"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -255,6 +256,15 @@ func (s *Server) handleJSONRPC(w http.ResponseWriter, r *http.Request) {
return
}

// Extract initializ orchestration headers (issue #86 / FWS-2) ONCE
// at the dispatch boundary so every downstream handler sees the
// same WorkflowContext via ctx without having to parse headers
// itself. Absent headers produce an IsZero WorkflowContext —
// audit events then omit the workflow fields, matching pre-FWS-2
// shape (backward compatible).
ctx := coreruntime.WithWorkflowContext(r.Context(),
coreruntime.WorkflowContextFromHTTPHeaders(r.Header))

// Check SSE handlers first (for streaming methods)
if h, ok := s.sseHandlers[req.Method]; ok {
flusher, ok := w.(http.Flusher)
Expand All @@ -265,13 +275,13 @@ func (s *Server) handleJSONRPC(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
h(r.Context(), req.ID, req.Params, w, flusher)
h(ctx, req.ID, req.Params, w, flusher)
return
}

// Check regular handlers
if h, ok := s.handlers[req.Method]; ok {
resp := h(r.Context(), req.ID, req.Params)
resp := h(ctx, req.ID, req.Params)
writeJSON(w, http.StatusOK, resp)
return
}
Expand Down
Loading
Loading