diff --git a/CHANGELOG.md b/CHANGELOG.md index efa5a44..9959f87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,24 @@ ### Added +- **Workflow correlation ID threading (issue #86, FWS-2).** Forge agents + now extract orchestration headers — `X-Workflow-ID`, + `X-Workflow-Stage-ID`, `X-Workflow-Step-ID`, `X-Invocation-Caller` — + at the A2A dispatch boundary (JSON-RPC + REST handlers) and inject + them into `context.Context` as + a `WorkflowContext` value. Every audit event emitted during the + invocation is then auto-tagged via a new `AuditLogger.EmitFromContext` + with the matching `workflow_id` / `stage_id` / `step_id` / + `invocation_caller` fields, letting audit consumers correlate events + across multiple agents participating in one workflow run. Direct A2A + invocations (no orchestrator headers) leave the fields unset — + emitted JSON is byte-for-byte identical to the pre-FWS-2 shape, so + existing audit consumers keep working. A + `WorkflowContext.ApplyToHTTPHeaders` helper is exposed for tools + that want to propagate the headers onto outbound agent-to-agent A2A + calls; auto-propagation is deliberately off by default to prevent + leaking workflow identity to third-party APIs. See + `docs/security/workflow-correlation.md`. - **A2A 0.3.0 Agent Card conformance (issue #85, FWS-1).** Forge now serves a spec-conformant Agent Card at the A2A 0.3.0 canonical path `/.well-known/agent-card.json`. The card carries every required A2A diff --git a/README.md b/README.md index 00c2bb2..8965f12 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ You write a `SKILL.md`. 
Forge compiles it into a secure, runnable agent with egr | [Secrets](docs/security/secret-management.md) | Encrypted secret management | | [Build Signing](docs/security/build-signing.md) | Ed25519 signing and verification | | [Guardrails](docs/security/guardrails.md) | Content filtering and PII detection | +| [Workflow Correlation](docs/security/workflow-correlation.md) | Orchestrator correlation IDs threaded through audit events | ### Operations diff --git a/docs/security/audit-logging.md b/docs/security/audit-logging.md index bbe2981..5eb25d2 100644 --- a/docs/security/audit-logging.md +++ b/docs/security/audit-logging.md @@ -35,6 +35,10 @@ All runtime security events are emitted as structured NDJSON to stderr with corr The `source` field distinguishes in-process enforcer events from subprocess proxy events. +### Workflow correlation + +When the inbound A2A request carries the orchestrator's correlation headers (`X-Workflow-ID`, `X-Workflow-Stage-ID`, `X-Workflow-Step-ID`, `X-Invocation-Caller`), every audit event emitted during that invocation is tagged with the matching `workflow_id` / `stage_id` / `step_id` / `invocation_caller` fields. Header names are vendor-neutral so any A2A-compatible orchestrator can populate them. Direct A2A invocations (no orchestrator) omit the fields entirely — emitted JSON is byte-identical to the pre-correlation shape. See [Workflow correlation IDs](workflow-correlation.md) for the full reference, including outbound propagation for agent-to-agent flows. + ### Authentication events Every inbound request to `/tasks` emits exactly one of `auth_verify` or `auth_fail`. diff --git a/docs/security/overview.md b/docs/security/overview.md index a2e4515..6f27c8d 100644 --- a/docs/security/overview.md +++ b/docs/security/overview.md @@ -257,6 +257,8 @@ All runtime security events are emitted as structured NDJSON to stderr with corr The `source` field distinguishes in-process enforcer events from subprocess proxy events. 
+When the inbound A2A request carries orchestrator headers, events are additionally tagged with `workflow_id` / `stage_id` / `step_id` / `invocation_caller`. See [Workflow Correlation IDs](workflow-correlation.md) for the header contract and outbound propagation rules. + --- ## Container Security diff --git a/docs/security/workflow-correlation.md b/docs/security/workflow-correlation.md new file mode 100644 index 0000000..a191f67 --- /dev/null +++ b/docs/security/workflow-correlation.md @@ -0,0 +1,99 @@ +--- +title: "Workflow Correlation IDs" +description: "Orchestrator workflow / stage / step / invocation-caller identifiers threaded from inbound A2A headers through context.Context onto every audit event." +order: 7 +--- + +# Workflow correlation IDs + +Forge agents accept orchestration headers on every inbound A2A call and tag every audit event emitted during that invocation with the matching workflow / stage / step / invocation-caller identifiers. This lets a platform orchestrator (initializ Command, or any A2A-compatible orchestrator) correlate audit events across the multiple agents that participate in one workflow run. + +Direct A2A invocations (no orchestrator) leave the fields unset — emitted JSON matches the pre-correlation shape exactly, so existing audit consumers keep working. + +## Headers + +| Header | Audit field | Identifies | +|---|---|---| +| `X-Workflow-ID` | `workflow_id` | The orchestrator-level workflow run | +| `X-Workflow-Stage-ID` | `stage_id` | A stage within the workflow (a group of steps that may run in parallel) | +| `X-Workflow-Step-ID` | `step_id` | The specific step that invoked this agent | +| `X-Invocation-Caller` | `invocation_caller` | The upstream caller (orchestrator identity, or upstream agent in an agent-to-agent flow) | + +All four are optional. Forge extracts whichever are present and leaves the rest empty. 
The header names are vendor-neutral by design — any A2A-compatible orchestrator (initializ Command, custom registries, third-party platforms) can drive Forge's correlation surface without adopting a vendor prefix. + +## How it flows + +``` +Inbound request + ↓ +A2A dispatcher (forge-cli/server/a2a_server.go) + ├─ runtime.WorkflowContextFromHTTPHeaders(r.Header) + └─ runtime.WithWorkflowContext(ctx, wc) + ↓ +JSON-RPC / REST handler receives ctx with WorkflowContext baked in + ↓ +auditLogger.EmitFromContext(ctx, event) + ├─ reads WorkflowContext from ctx + └─ stamps workflow_id / stage_id / step_id / invocation_caller onto event +``` + +Every event emitted under that ctx, from `session_start` through `session_end` and including every `tool_exec` / `llm_call` / `egress_allowed` / `egress_blocked`, carries the same workflow tags, letting an audit consumer reconstruct the full workflow-step → agent-execution → tool-call tree. + +## Audit event shape + +`session_start` from a workflow-orchestrated invocation: + +```json +{ + "ts": "2026-06-04T15:21:09Z", + "event": "session_start", + "correlation_id": "9b3d…", + "task_id": "task-42", + "workflow_id": "wf-deploy-prod-001", + "stage_id": "rollout", + "step_id": "canary-bake", + "invocation_caller": "initializ-orchestrator" +} +``` + +Same agent invoked directly (no orchestrator): + +```json +{ + "ts": "2026-06-04T15:21:09Z", + "event": "session_start", + "correlation_id": "9b3d…", + "task_id": "task-42" +} +``` + +Workflow fields are absent (`omitempty`), so the JSON is byte-identical to the shape pre-correlation audit consumers already parse. + +## Outbound propagation (agent-to-agent) + +When a Forge agent calls another agent (or any peer), the workflow context is available via `runtime.WorkflowContextFromContext(ctx)`. 
To propagate, copy the headers onto the outbound request: + +```go +wc := runtime.WorkflowContextFromContext(ctx) +req, _ := http.NewRequestWithContext(ctx, http.MethodPost, peerURL, body) +wc.ApplyToHTTPHeaders(req.Header) +client.Do(req) +``` + +**Auto-propagation is deliberately off by default.** The `X-Workflow-*` / `X-Invocation-Caller` headers identify the workflow, so blanket-stamping them on every outbound HTTP request would leak workflow identity to third-party APIs (the egress proxy can't tell a peer agent from a vendor endpoint). Tools that know their target is a workflow peer call `ApplyToHTTPHeaders` explicitly. + +## Backward compatibility + +- Direct A2A invocations (no headers) → audit JSON byte-for-byte identical to pre-FWS-2. +- Existing emitters that construct `AuditEvent` literals and call `Emit(...)` continue to work unchanged. +- New `EmitFromContext(ctx, event)` is the preferred per-request path: it adds workflow fields only when ctx carries a non-zero `WorkflowContext`, and fields already set on the event take precedence over the ctx fallback. 
+ +## Where it's wired + +| File | Role | +|---|---| +| `forge-core/runtime/workflow.go` | `WorkflowContext` type, `WithWorkflowContext` / `WorkflowContextFromContext` ctx helpers, `WorkflowContextFromHTTPHeaders` + `ApplyToHTTPHeaders` header marshalers | +| `forge-core/runtime/audit.go` | `AuditEvent` extended with `workflow_id` / `stage_id` / `step_id` / `invocation_caller` fields; `AuditLogger.EmitFromContext` auto-tags from ctx | +| `forge-cli/server/a2a_server.go` | JSON-RPC dispatcher extracts headers at boundary and injects WorkflowContext into ctx | +| `forge-cli/runtime/runner.go` | REST `POST /tasks/send` + `POST /tasks/sendSubscribe` handlers extract headers; in-request audit emit sites migrated to `EmitFromContext` | +| auth audit callback | Pulls headers directly from `req.Header` (runs before the dispatcher in middleware order) and stamps the four fields onto `auth_verify` / `auth_fail` events | diff --git a/forge-cli/runtime/runner.go b/forge-cli/runtime/runner.go index bd20893..0b7e747 100644 --- a/forge-cli/runtime/runner.go +++ b/forge-cli/runtime/runner.go @@ -305,7 +305,7 @@ func (r *Runner) Run(ctx context.Context) error { if !allowed { event = coreruntime.AuditEgressBlocked } - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: event, CorrelationID: coreruntime.CorrelationIDFromContext(ctx), TaskID: coreruntime.TaskIDFromContext(ctx), @@ -806,7 +806,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent ctx = coreruntime.WithCorrelationID(ctx, correlationID) ctx = coreruntime.WithTaskID(ctx, params.ID) - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionStart, CorrelationID: correlationID, TaskID: params.ID, @@ -830,7 +830,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent }, } store.Put(task) - auditLogger.Emit(coreruntime.AuditEvent{ + 
auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -858,7 +858,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent }, } store.Put(task) - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -878,7 +878,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent }, } store.Put(task) - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -907,7 +907,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent } } store.Put(task) - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -933,7 +933,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent ctx = coreruntime.WithCorrelationID(ctx, correlationID) ctx = coreruntime.WithTaskID(ctx, params.ID) - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionStart, CorrelationID: correlationID, TaskID: params.ID, @@ -959,7 +959,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent } store.Put(task) server.WriteSSEEvent(w, flusher, "status", task) //nolint:errcheck - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -1007,7 +1007,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent } store.Put(task) server.WriteSSEEvent(w, flusher, "status", 
task) //nolint:errcheck - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -1052,7 +1052,7 @@ func (r *Runner) registerHandlers(srv *server.Server, executor coreruntime.Agent finalState = a2a.TaskStateCompleted } - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -1238,7 +1238,13 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A Message: body.Task.Message, } - task, err := r.executeTask(req.Context(), params, store, executor, guardrails, egressClient, auditLogger) + // Pull workflow correlation headers (issue #86) so audit + // events tagged via EmitFromContext carry the orchestrator's + // workflow/stage/step identifiers. Absent headers → IsZero + // WorkflowContext → fields omitted (backward compat). 
+ ctx := coreruntime.WithWorkflowContext(req.Context(), + coreruntime.WorkflowContextFromHTTPHeaders(req.Header)) + task, err := r.executeTask(ctx, params, store, executor, guardrails, egressClient, auditLogger) if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return @@ -1276,8 +1282,10 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A ctx := security.WithEgressClient(req.Context(), egressClient) ctx = coreruntime.WithCorrelationID(ctx, correlationID) ctx = coreruntime.WithTaskID(ctx, params.ID) + ctx = coreruntime.WithWorkflowContext(ctx, + coreruntime.WorkflowContextFromHTTPHeaders(req.Header)) - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionStart, CorrelationID: correlationID, TaskID: params.ID, @@ -1301,7 +1309,7 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A } store.Put(task) server.WriteSSEEvent(w, flusher, "status", task) //nolint:errcheck - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -1344,7 +1352,7 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A } store.Put(task) server.WriteSSEEvent(w, flusher, "status", task) //nolint:errcheck - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -1385,7 +1393,7 @@ func (r *Runner) registerRESTHandlers(srv *server.Server, executor coreruntime.A finalState = a2a.TaskStateCompleted } - auditLogger.Emit(coreruntime.AuditEvent{ + auditLogger.EmitFromContext(ctx, coreruntime.AuditEvent{ Event: coreruntime.AuditSessionEnd, CorrelationID: correlationID, TaskID: params.ID, @@ -1965,6 +1973,12 @@ func 
makeAuthAuditCallback(auditLogger *coreruntime.AuditLogger) func(*http.Requ } return func(req *http.Request, id *auth.Identity, err error, tokenKind string) { correlationID := coreruntime.CorrelationIDFromContext(req.Context()) + // Auth middleware runs BEFORE handleJSONRPC has had a chance to + // extract workflow correlation headers into ctx. Pull them + // directly from req.Header here so auth events still carry + // workflow tags. Empty when the orchestrator didn't send them; + // the fields are then omitted from the event (backward compat). + wc := coreruntime.WorkflowContextFromHTTPHeaders(req.Header) if err == nil && id != nil { // Success → auth_verify. @@ -1979,17 +1993,25 @@ func makeAuthAuditCallback(auditLogger *coreruntime.AuditLogger) func(*http.Requ "remote_addr": req.RemoteAddr, } auditLogger.Emit(coreruntime.AuditEvent{ - Event: coreruntime.EventAuthVerify, - CorrelationID: correlationID, - Fields: fields, + Event: coreruntime.EventAuthVerify, + CorrelationID: correlationID, + WorkflowID: wc.WorkflowID, + StageID: wc.StageID, + StepID: wc.StepID, + InvocationCaller: wc.InvocationCaller, + Fields: fields, }) return } // Failure → auth_fail with reason code. 
auditLogger.Emit(coreruntime.AuditEvent{ - Event: coreruntime.EventAuthFail, - CorrelationID: correlationID, + Event: coreruntime.EventAuthFail, + CorrelationID: correlationID, + WorkflowID: wc.WorkflowID, + StageID: wc.StageID, + StepID: wc.StepID, + InvocationCaller: wc.InvocationCaller, Fields: map[string]any{ "reason": authFailReason(err), "token_kind": tokenKind, diff --git a/forge-cli/server/a2a_server.go b/forge-cli/server/a2a_server.go index 81a9bc6..a389b0c 100644 --- a/forge-cli/server/a2a_server.go +++ b/forge-cli/server/a2a_server.go @@ -16,6 +16,7 @@ import ( "time" "github.com/initializ/forge/forge-core/a2a" + coreruntime "github.com/initializ/forge/forge-core/runtime" "golang.org/x/time/rate" ) @@ -255,6 +256,15 @@ func (s *Server) handleJSONRPC(w http.ResponseWriter, r *http.Request) { return } + // Extract initializ orchestration headers (issue #86 / FWS-2) ONCE + // at the dispatch boundary so every downstream handler sees the + // same WorkflowContext via ctx without having to parse headers + // itself. Absent headers produce an IsZero WorkflowContext — + // audit events then omit the workflow fields, matching pre-FWS-2 + // shape (backward compatible). 
+ ctx := coreruntime.WithWorkflowContext(r.Context(), + coreruntime.WorkflowContextFromHTTPHeaders(r.Header)) + // Check SSE handlers first (for streaming methods) if h, ok := s.sseHandlers[req.Method]; ok { flusher, ok := w.(http.Flusher) @@ -265,13 +275,13 @@ func (s *Server) handleJSONRPC(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - h(r.Context(), req.ID, req.Params, w, flusher) + h(ctx, req.ID, req.Params, w, flusher) return } // Check regular handlers if h, ok := s.handlers[req.Method]; ok { - resp := h(r.Context(), req.ID, req.Params) + resp := h(ctx, req.ID, req.Params) writeJSON(w, http.StatusOK, resp) return } diff --git a/forge-cli/server/a2a_server_workflow_test.go b/forge-cli/server/a2a_server_workflow_test.go new file mode 100644 index 0000000..d5044f1 --- /dev/null +++ b/forge-cli/server/a2a_server_workflow_test.go @@ -0,0 +1,163 @@ +package server + +import ( + "bytes" + "context" + "encoding/json" + "net" + "net/http" + "testing" + "time" + + "github.com/initializ/forge/forge-core/a2a" + coreruntime "github.com/initializ/forge/forge-core/runtime" +) + +// Regression test for issue #86 / FWS-2: the JSON-RPC dispatcher must +// extract X-Workflow-* / X-Invocation-Caller correlation headers and +// inject the resulting WorkflowContext into the ctx passed to method +// handlers. + +func TestHandleJSONRPC_ExtractsWorkflowHeadersIntoContext(t *testing.T) { + // Ask the OS for a free port, then release it so the server can + // bind it (httptest doesn't help: we need the real dispatcher). 
+ lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + port := lis.Addr().(*net.TCPAddr).Port + _ = lis.Close() + + srv := NewServer(ServerConfig{ + Port: port, + Host: "127.0.0.1", + ShutdownTimeout: 1 * time.Second, + AgentCard: &a2a.AgentCard{ + Name: "test", URL: "http://x", Version: "0.1.0", ProtocolVersion: "0.3.0", + }, + }) + + // Capture the ctx the handler receives so the test can inspect it. + var capturedCtx context.Context + srv.RegisterHandler("test/echo", func(ctx context.Context, id any, params json.RawMessage) *a2a.JSONRPCResponse { + capturedCtx = ctx + return a2a.NewResponse(id, map[string]string{"ok": "1"}) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = srv.Start(ctx) }() + + // Wait for listener. + addr := "127.0.0.1:" + itoaShim(port) + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond); err == nil { + _ = c.Close() + break + } + time.Sleep(20 * time.Millisecond) + } + + body, _ := json.Marshal(a2a.JSONRPCRequest{ + JSONRPC: "2.0", + Method: "test/echo", + Params: json.RawMessage(`{}`), + ID: "1", + }) + req, _ := http.NewRequest(http.MethodPost, "http://"+addr+"/", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(coreruntime.HeaderWorkflowID, "wf-orchestrated") + req.Header.Set(coreruntime.HeaderWorkflowStageID, "stage-deploy") + req.Header.Set(coreruntime.HeaderWorkflowStepID, "step-3") + req.Header.Set(coreruntime.HeaderInvocationCaller, "initializ-orchestrator") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("POST: %v", err) + } + _ = resp.Body.Close() + + if capturedCtx == nil { + t.Fatal("handler was not invoked") + } + wc := coreruntime.WorkflowContextFromContext(capturedCtx) + if wc.WorkflowID != "wf-orchestrated" { + t.Errorf("WorkflowID = %q, want wf-orchestrated", 
wc.WorkflowID) + } + if wc.StageID != "stage-deploy" || wc.StepID != "step-3" || wc.InvocationCaller != "initializ-orchestrator" { + t.Errorf("WorkflowContext fields = %+v, want all four populated from headers", wc) + } +} + +func TestHandleJSONRPC_MissingHeadersYieldZeroWorkflowContext(t *testing.T) { + // Backward compat: direct A2A invocation (no headers) must produce + // an IsZero WorkflowContext — audit events then omit the workflow + // fields entirely, matching pre-FWS-2 consumers' expectations. + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + port := lis.Addr().(*net.TCPAddr).Port + _ = lis.Close() + + srv := NewServer(ServerConfig{ + Port: port, Host: "127.0.0.1", ShutdownTimeout: 1 * time.Second, + AgentCard: &a2a.AgentCard{ + Name: "test", URL: "http://x", Version: "0.1.0", ProtocolVersion: "0.3.0", + }, + }) + + var capturedCtx context.Context + srv.RegisterHandler("test/echo", func(ctx context.Context, id any, params json.RawMessage) *a2a.JSONRPCResponse { + capturedCtx = ctx + return a2a.NewResponse(id, map[string]string{"ok": "1"}) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = srv.Start(ctx) }() + + addr := "127.0.0.1:" + itoaShim(port) + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond); err == nil { + _ = c.Close() + break + } + time.Sleep(20 * time.Millisecond) + } + + body, _ := json.Marshal(a2a.JSONRPCRequest{ + JSONRPC: "2.0", Method: "test/echo", Params: json.RawMessage(`{}`), ID: "1", + }) + req, _ := http.NewRequest(http.MethodPost, "http://"+addr+"/", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + // No X-Workflow-* headers. 
+ + resp, _ := http.DefaultClient.Do(req) + _ = resp.Body.Close() + + if capturedCtx == nil { + t.Fatal("handler was not invoked") + } + wc := coreruntime.WorkflowContextFromContext(capturedCtx) + if !wc.IsZero() { + t.Errorf("no headers should yield IsZero WorkflowContext, got %+v", wc) + } +} + +// itoaShim avoids depending on the package's other test helper. +func itoaShim(n int) string { + if n == 0 { + return "0" + } + var buf [10]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + return string(buf[i:]) +} diff --git a/forge-core/runtime/audit.go b/forge-core/runtime/audit.go index c7fc3c5..0eb9a39 100644 --- a/forge-core/runtime/audit.go +++ b/forge-core/runtime/audit.go @@ -60,12 +60,40 @@ const ( ) // AuditEvent is a single structured audit record emitted as NDJSON. +// +// Workflow correlation fields (WorkflowID, StageID, StepID, +// InvocationCaller) are tagged onto every event emitted via +// EmitFromContext when the request carries `X-Workflow-*` / +// `X-Invocation-Caller` headers from any A2A-compatible orchestrator. +// Direct A2A invocations omit them entirely so the JSON shape matches +// the pre-FWS-2 audit consumers. type AuditEvent struct { - Timestamp string `json:"ts"` - Event string `json:"event"` - CorrelationID string `json:"correlation_id,omitempty"` - TaskID string `json:"task_id,omitempty"` - Fields map[string]any `json:"fields,omitempty"` + Timestamp string `json:"ts"` + Event string `json:"event"` + + // CorrelationID groups events from a single agent invocation — + // generated by the A2A handler at request entry. + CorrelationID string `json:"correlation_id,omitempty"` + + // TaskID is the A2A task identifier (params.id on tasks/send). + TaskID string `json:"task_id,omitempty"` + + // WorkflowID identifies the orchestrator-level workflow run that + // invoked this agent. Sourced from X-Workflow-ID at request entry; + // absent for direct A2A invocations. 
+ WorkflowID string `json:"workflow_id,omitempty"` + + // StageID identifies the workflow stage that invoked this agent. + StageID string `json:"stage_id,omitempty"` + + // StepID identifies the workflow step that invoked this agent. + StepID string `json:"step_id,omitempty"` + + // InvocationCaller identifies the upstream caller (orchestrator + // or upstream agent in an agent-to-agent flow). + InvocationCaller string `json:"invocation_caller,omitempty"` + + Fields map[string]any `json:"fields,omitempty"` } // AuditLogger writes structured NDJSON audit events to an io.Writer. @@ -81,6 +109,13 @@ func NewAuditLogger(w io.Writer) *AuditLogger { // Emit writes an audit event as a single NDJSON line. If Timestamp is empty // it is set to the current time in RFC3339 format. +// +// Callers that have a request context.Context in scope should prefer +// EmitFromContext, which automatically tags CorrelationID, TaskID, and +// the workflow-correlation fields (WorkflowID, StageID, StepID, +// InvocationCaller) from the context. Emit is kept for the few sites +// that emit outside a request scope (e.g. agent_card_published at +// startup). func (a *AuditLogger) Emit(event AuditEvent) { if event.Timestamp == "" { event.Timestamp = time.Now().UTC().Format(time.RFC3339) @@ -96,6 +131,37 @@ func (a *AuditLogger) Emit(event AuditEvent) { a.mu.Unlock() } +// EmitFromContext writes an audit event after auto-tagging +// CorrelationID, TaskID, and workflow-correlation fields from the +// request context. Fields already set on the passed event are +// preserved — the context is a fallback, not an override. This makes +// it safe to migrate callers from Emit to EmitFromContext: any +// already-explicit value continues to win. 
+func (a *AuditLogger) EmitFromContext(ctx context.Context, event AuditEvent) { + if event.CorrelationID == "" { + event.CorrelationID = CorrelationIDFromContext(ctx) + } + if event.TaskID == "" { + event.TaskID = TaskIDFromContext(ctx) + } + if event.WorkflowID == "" || event.StageID == "" || event.StepID == "" || event.InvocationCaller == "" { + wc := WorkflowContextFromContext(ctx) + if event.WorkflowID == "" { + event.WorkflowID = wc.WorkflowID + } + if event.StageID == "" { + event.StageID = wc.StageID + } + if event.StepID == "" { + event.StepID = wc.StepID + } + if event.InvocationCaller == "" { + event.InvocationCaller = wc.InvocationCaller + } + } + a.Emit(event) +} + // Context key types for correlation IDs, task IDs, and file directories. type correlationIDKey struct{} type taskIDKey struct{} diff --git a/forge-core/runtime/audit_workflow_test.go b/forge-core/runtime/audit_workflow_test.go new file mode 100644 index 0000000..4c96bbc --- /dev/null +++ b/forge-core/runtime/audit_workflow_test.go @@ -0,0 +1,141 @@ +package runtime + +import ( + "bytes" + "context" + "encoding/json" + "strings" + "testing" +) + +// Regression tests for issue #86 / FWS-2 — EmitFromContext auto-tags +// the four workflow correlation fields on every audit event when a +// WorkflowContext is in the request ctx. Absence is the backward-compat +// case: fields are omitted from the JSON entirely. 
+ +func TestEmitFromContext_TagsWorkflowFieldsWhenContextHasThem(t *testing.T) { + var buf bytes.Buffer + audit := NewAuditLogger(&buf) + + ctx := WithWorkflowContext(context.Background(), WorkflowContext{ + WorkflowID: "wf-100", + StageID: "stage-A", + StepID: "step-1", + InvocationCaller: "orchestrator", + }) + + audit.EmitFromContext(ctx, AuditEvent{Event: "tool_exec"}) + + var got AuditEvent + if err := json.Unmarshal(bytes.TrimSpace(buf.Bytes()), &got); err != nil { + t.Fatalf("decode: %v\n%s", err, buf.String()) + } + if got.WorkflowID != "wf-100" || got.StageID != "stage-A" || got.StepID != "step-1" || got.InvocationCaller != "orchestrator" { + t.Errorf("workflow fields not tagged: %+v", got) + } +} + +func TestEmitFromContext_OmitsWorkflowFieldsWhenContextEmpty(t *testing.T) { + // Backward compatibility: direct A2A invocation (no orchestrator + // headers) must produce audit JSON without workflow_id / + // stage_id / step_id / invocation_caller fields at all. + var buf bytes.Buffer + audit := NewAuditLogger(&buf) + + audit.EmitFromContext(context.Background(), AuditEvent{Event: "session_start"}) + + got := buf.String() + for _, forbidden := range []string{`"workflow_id"`, `"stage_id"`, `"step_id"`, `"invocation_caller"`} { + if strings.Contains(got, forbidden) { + t.Errorf("empty ctx should omit %s from JSON, got:\n%s", forbidden, got) + } + } +} + +func TestEmitFromContext_ExplicitFieldsTakePrecedenceOverContext(t *testing.T) { + // EmitFromContext is a fallback: any field already set on the + // AuditEvent literal wins. Lets callers stamp authoritative + // values when needed (e.g. cross-cutting events that span multiple + // invocation ctxs). 
+ var buf bytes.Buffer + audit := NewAuditLogger(&buf) + + ctx := WithWorkflowContext(context.Background(), WorkflowContext{ + WorkflowID: "from-ctx", + }) + audit.EmitFromContext(ctx, AuditEvent{ + Event: "tool_exec", + WorkflowID: "explicit-wins", + }) + + var got AuditEvent + if err := json.Unmarshal(bytes.TrimSpace(buf.Bytes()), &got); err != nil { + t.Fatalf("decode: %v", err) + } + if got.WorkflowID != "explicit-wins" { + t.Errorf("explicit WorkflowID should win over ctx, got %q", got.WorkflowID) + } +} + +func TestEmitFromContext_AlsoTagsCorrelationAndTaskID(t *testing.T) { + // EmitFromContext is the unified path for ctx-derived tagging — + // it must cover CorrelationID + TaskID alongside the workflow + // fields, mirroring the contract handlers expect. + var buf bytes.Buffer + audit := NewAuditLogger(&buf) + + ctx := WithCorrelationID(context.Background(), "corr-X") + ctx = WithTaskID(ctx, "task-Y") + ctx = WithWorkflowContext(ctx, WorkflowContext{WorkflowID: "wf-Z"}) + + audit.EmitFromContext(ctx, AuditEvent{Event: "session_start"}) + + var got AuditEvent + _ = json.Unmarshal(bytes.TrimSpace(buf.Bytes()), &got) + if got.CorrelationID != "corr-X" || got.TaskID != "task-Y" || got.WorkflowID != "wf-Z" { + t.Errorf("expected all three ctx-derived fields tagged, got %+v", got) + } +} + +func TestEmitFromContext_PartialWorkflowContextOmitsAbsentFields(t *testing.T) { + // Operator might forward only the workflow ID. The remaining + // fields must stay absent from the JSON (omitempty). 
+ var buf bytes.Buffer + audit := NewAuditLogger(&buf) + + ctx := WithWorkflowContext(context.Background(), WorkflowContext{ + WorkflowID: "wf-only", + }) + audit.EmitFromContext(ctx, AuditEvent{Event: "tool_exec"}) + + js := buf.String() + if !strings.Contains(js, `"workflow_id":"wf-only"`) { + t.Errorf("WorkflowID should be present in JSON, got:\n%s", js) + } + if strings.Contains(js, `"stage_id"`) || strings.Contains(js, `"step_id"`) || strings.Contains(js, `"invocation_caller"`) { + t.Errorf("empty workflow sub-fields should be omitted, got:\n%s", js) + } +} + +func TestEmit_DirectPathStillWorksWithoutAnyCtx(t *testing.T) { + // The classic Emit (no ctx) keeps working — pre-FWS-2 callers + // continue to function identically. + var buf bytes.Buffer + audit := NewAuditLogger(&buf) + + audit.Emit(AuditEvent{ + Event: "session_start", + CorrelationID: "explicit", + TaskID: "explicit", + }) + + var got AuditEvent + _ = json.Unmarshal(bytes.TrimSpace(buf.Bytes()), &got) + if got.CorrelationID != "explicit" || got.TaskID != "explicit" { + t.Errorf("Emit should pass through explicit fields, got %+v", got) + } + // Workflow fields should be absent (Emit doesn't touch ctx). + if got.WorkflowID != "" { + t.Errorf("Emit must not pull from ctx; WorkflowID should be empty") + } +} diff --git a/forge-core/runtime/workflow.go b/forge-core/runtime/workflow.go new file mode 100644 index 0000000..89b2dba --- /dev/null +++ b/forge-core/runtime/workflow.go @@ -0,0 +1,117 @@ +package runtime + +import ( + "context" + "net/http" +) + +// Workflow correlation header names (issue #86 / FWS-2). Sent by any +// A2A-compatible orchestrator on every request that's part of a +// workflow execution. Header names are deliberately vendor-neutral so +// any orchestrator (initializ Command, custom registries, third-party +// platforms) can drive Forge's correlation surface without adopting a +// vendor prefix. 
Forge agents extract them at the request boundary, +// stash them in context.Context, and tag every audit event with the +// matching workflow / stage / step identifiers so audit consumers can +// correlate events across multiple agents participating in the same +// workflow. +// +// Absence of these headers is the normal case for direct A2A +// invocations (e.g. local development, peer agents not orchestrated). +// When absent, audit events emit without the workflow fields — full +// backward compatibility with pre-FWS-2 audit consumers. +const ( + HeaderWorkflowID = "X-Workflow-ID" + HeaderWorkflowStageID = "X-Workflow-Stage-ID" + HeaderWorkflowStepID = "X-Workflow-Step-ID" + HeaderInvocationCaller = "X-Invocation-Caller" +) + +// WorkflowContext carries the orchestration identifiers a Forge agent +// extracts from inbound A2A request headers. Zero value is meaningful +// — it represents "no workflow context" (direct A2A invocation). +type WorkflowContext struct { + // WorkflowID identifies the orchestrator-level workflow run. + WorkflowID string + + // StageID identifies a stage within the workflow (a group of + // steps that may run in parallel). + StageID string + + // StepID identifies the specific step within the stage that + // invoked this agent. + StepID string + + // InvocationCaller identifies the upstream caller — typically the + // orchestrator's identity, but for agent-to-agent calls within a + // workflow it carries the upstream agent's identifier. + InvocationCaller string +} + +// IsZero reports whether the WorkflowContext carries no orchestration +// identifiers. Used by audit and helpers to decide whether to stamp +// workflow fields (when zero, fields are omitted entirely so the +// emitted JSON matches the pre-FWS-2 shape). 
+func (w WorkflowContext) IsZero() bool { + return w.WorkflowID == "" && + w.StageID == "" && + w.StepID == "" && + w.InvocationCaller == "" +} + +// WorkflowContextFromHTTPHeaders extracts the orchestration identifiers +// from an inbound HTTP request's headers. Missing headers map to empty +// fields; the returned WorkflowContext is `IsZero` when none are set. +func WorkflowContextFromHTTPHeaders(h http.Header) WorkflowContext { + return WorkflowContext{ + WorkflowID: h.Get(HeaderWorkflowID), + StageID: h.Get(HeaderWorkflowStageID), + StepID: h.Get(HeaderWorkflowStepID), + InvocationCaller: h.Get(HeaderInvocationCaller), + } +} + +// ApplyToHTTPHeaders writes any non-empty WorkflowContext fields onto +// outbound request headers. Used by tools that explicitly propagate +// workflow context to downstream A2A calls (the issue's "agent +// invoking another agent during workflow execution" path). +// +// Auto-propagation is deliberately not built into the egress proxy — +// the X-Workflow-* headers identify the workflow and would leak if +// the agent calls a non-workflow third-party API. Tools propagate +// explicitly when they know the target is a workflow peer. +func (w WorkflowContext) ApplyToHTTPHeaders(h http.Header) { + if w.WorkflowID != "" { + h.Set(HeaderWorkflowID, w.WorkflowID) + } + if w.StageID != "" { + h.Set(HeaderWorkflowStageID, w.StageID) + } + if w.StepID != "" { + h.Set(HeaderWorkflowStepID, w.StepID) + } + if w.InvocationCaller != "" { + h.Set(HeaderInvocationCaller, w.InvocationCaller) + } +} + +// Context key for the WorkflowContext. Kept unexported — callers go +// through WithWorkflowContext / WorkflowContextFromContext so the +// key type can never collide with another package's context key. +type workflowContextKey struct{} + +// WithWorkflowContext stores a WorkflowContext in the request context. +// Mirrors the WithCorrelationID / WithTaskID pattern already used by +// the audit layer. 
+func WithWorkflowContext(ctx context.Context, w WorkflowContext) context.Context { + return context.WithValue(ctx, workflowContextKey{}, w) +} + +// WorkflowContextFromContext retrieves the WorkflowContext from the +// context. Returns the zero value (IsZero == true) when none was set. +func WorkflowContextFromContext(ctx context.Context) WorkflowContext { + if w, ok := ctx.Value(workflowContextKey{}).(WorkflowContext); ok { + return w + } + return WorkflowContext{} +} diff --git a/forge-core/runtime/workflow_test.go b/forge-core/runtime/workflow_test.go new file mode 100644 index 0000000..5b0e7c7 --- /dev/null +++ b/forge-core/runtime/workflow_test.go @@ -0,0 +1,151 @@ +package runtime + +import ( + "context" + "net/http" + "testing" +) + +// Regression tests for issue #86 / FWS-2 — Workflow correlation ID +// threading. The four orchestrator-supplied identifiers must flow +// through HTTP headers → context.Context → AuditEvent without loss, +// and audit events must omit the workflow fields entirely when no +// headers were sent (backward compat with pre-FWS-2 consumers). 
+ +func TestWorkflowContext_IsZero(t *testing.T) { + if !(WorkflowContext{}).IsZero() { + t.Errorf("zero-value WorkflowContext should be IsZero=true") + } + if (WorkflowContext{WorkflowID: "wf-1"}).IsZero() { + t.Errorf("WorkflowContext with any non-empty field must NOT be IsZero") + } +} + +func TestWorkflowContextFromHTTPHeaders_ExtractsAllFour(t *testing.T) { + h := http.Header{} + h.Set(HeaderWorkflowID, "wf-42") + h.Set(HeaderWorkflowStageID, "stage-rollout") + h.Set(HeaderWorkflowStepID, "step-canary") + h.Set(HeaderInvocationCaller, "orchestrator") + + wc := WorkflowContextFromHTTPHeaders(h) + if wc.WorkflowID != "wf-42" || wc.StageID != "stage-rollout" || wc.StepID != "step-canary" || wc.InvocationCaller != "orchestrator" { + t.Errorf("WorkflowContext = %+v, want all four fields populated", wc) + } +} + +func TestWorkflowContextFromHTTPHeaders_MissingHeadersYieldZero(t *testing.T) { + wc := WorkflowContextFromHTTPHeaders(http.Header{}) + if !wc.IsZero() { + t.Errorf("missing headers should produce IsZero WorkflowContext, got %+v", wc) + } +} + +func TestWorkflowContextFromHTTPHeaders_PartialHeadersPropagateNonEmpty(t *testing.T) { + // Operator might supply only some headers (e.g. workflow without + // stage during pre-stage init). Partial extraction must work. 
+ h := http.Header{} + h.Set(HeaderWorkflowID, "wf-only") + + wc := WorkflowContextFromHTTPHeaders(h) + if wc.IsZero() { + t.Errorf("partial headers should produce non-IsZero WorkflowContext") + } + if wc.WorkflowID != "wf-only" { + t.Errorf("WorkflowID = %q, want wf-only", wc.WorkflowID) + } + if wc.StageID != "" || wc.StepID != "" || wc.InvocationCaller != "" { + t.Errorf("missing fields should remain empty, got %+v", wc) + } +} + +func TestWorkflowContext_ContextRoundTrip(t *testing.T) { + wc := WorkflowContext{ + WorkflowID: "wf-1", + StageID: "stage-1", + StepID: "step-1", + InvocationCaller: "caller-1", + } + ctx := WithWorkflowContext(context.Background(), wc) + + got := WorkflowContextFromContext(ctx) + if got != wc { + t.Errorf("round-trip mismatch: got %+v, want %+v", got, wc) + } +} + +func TestWorkflowContextFromContext_MissingReturnsZero(t *testing.T) { + got := WorkflowContextFromContext(context.Background()) + if !got.IsZero() { + t.Errorf("missing context value should return IsZero, got %+v", got) + } +} + +func TestApplyToHTTPHeaders_WritesNonEmptyFields(t *testing.T) { + wc := WorkflowContext{ + WorkflowID: "wf-9", + StageID: "stage-9", + StepID: "step-9", + InvocationCaller: "agent-upstream", + } + h := http.Header{} + wc.ApplyToHTTPHeaders(h) + + if h.Get(HeaderWorkflowID) != "wf-9" { + t.Errorf("WorkflowID header = %q, want wf-9", h.Get(HeaderWorkflowID)) + } + if h.Get(HeaderWorkflowStageID) != "stage-9" { + t.Errorf("StageID header missing") + } + if h.Get(HeaderWorkflowStepID) != "step-9" { + t.Errorf("StepID header missing") + } + if h.Get(HeaderInvocationCaller) != "agent-upstream" { + t.Errorf("InvocationCaller header missing") + } +} + +func TestApplyToHTTPHeaders_OmitsEmptyFields(t *testing.T) { + // Workflow without stage — outbound headers should mirror what's + // set, not stamp empty values that downstream agents might + // misinterpret as "explicitly empty stage." 
+ wc := WorkflowContext{WorkflowID: "wf-only"} + h := http.Header{} + wc.ApplyToHTTPHeaders(h) + + if h.Get(HeaderWorkflowID) != "wf-only" { + t.Errorf("WorkflowID should be set") + } + // Use Values() (not direct map access) — http.Header canonicalizes + // keys, so h[HeaderWorkflowStageID] would always be empty even + // when the canonical-cased key is present. + if len(h.Values(HeaderWorkflowStageID)) > 0 { + t.Errorf("StageID should NOT be set when empty, got %v", h) + } +} + +func TestRoundTripHTTPHeaders(t *testing.T) { + // Headers → ctx → headers round-trip preserves all fields. This + // is the contract initializ's orchestrator + Forge runner depend on. + in := http.Header{} + in.Set(HeaderWorkflowID, "wf") + in.Set(HeaderWorkflowStageID, "stg") + in.Set(HeaderWorkflowStepID, "stp") + in.Set(HeaderInvocationCaller, "ic") + + ctx := WithWorkflowContext(context.Background(), WorkflowContextFromHTTPHeaders(in)) + + out := http.Header{} + WorkflowContextFromContext(ctx).ApplyToHTTPHeaders(out) + + for _, k := range []string{ + HeaderWorkflowID, + HeaderWorkflowStageID, + HeaderWorkflowStepID, + HeaderInvocationCaller, + } { + if out.Get(k) != in.Get(k) { + t.Errorf("header %q round-trip mismatch: in=%q out=%q", k, in.Get(k), out.Get(k)) + } + } +}
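The round-trip contract exercised by the tests above (inbound headers → `context.Context` → outbound headers) can be sketched end to end as a small, self-contained program. This is only an illustrative sketch, not the Forge dispatch code. The lowercase names (`workflowContext`, `fromHeaders`, `applyTo`, `withWorkflow`, `workflowFrom`, `relayedHeader`) are hypothetical local stand-ins for the exported `runtime` API in this diff, and the middleware shape and `/tasks` target are assumptions about how a handler might wire it.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Header names copied from the diff; everything else below is a
// hypothetical stand-in for the forge-core/runtime package.
const (
	headerWorkflowID = "X-Workflow-ID"
	headerStageID    = "X-Workflow-Stage-ID"
	headerStepID     = "X-Workflow-Step-ID"
	headerCaller     = "X-Invocation-Caller"
)

type workflowContext struct {
	WorkflowID, StageID, StepID, InvocationCaller string
}

// workflowKey is an unexported key type so it cannot collide with
// context keys defined by other packages.
type workflowKey struct{}

// fromHeaders maps missing headers to empty fields.
func fromHeaders(h http.Header) workflowContext {
	return workflowContext{
		WorkflowID:       h.Get(headerWorkflowID),
		StageID:          h.Get(headerStageID),
		StepID:           h.Get(headerStepID),
		InvocationCaller: h.Get(headerCaller),
	}
}

// applyTo writes only the non-empty fields, so an absent stage is
// never sent downstream as an explicitly empty header.
func (w workflowContext) applyTo(h http.Header) {
	for name, v := range map[string]string{
		headerWorkflowID: w.WorkflowID,
		headerStageID:    w.StageID,
		headerStepID:     w.StepID,
		headerCaller:     w.InvocationCaller,
	} {
		if v != "" {
			h.Set(name, v)
		}
	}
}

// withWorkflow extracts the headers once at the request boundary and
// stores the result in the request context.
func withWorkflow(next http.Handler) http.Handler {
	return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
		ctx := context.WithValue(r.Context(), workflowKey{}, fromHeaders(r.Header))
		next.ServeHTTP(rw, r.WithContext(ctx))
	})
}

// workflowFrom returns the zero value when nothing was stored.
func workflowFrom(ctx context.Context) workflowContext {
	w, _ := ctx.Value(workflowKey{}).(workflowContext)
	return w
}

// relayedHeader simulates an inbound request carrying `in`, then an
// explicit outbound propagation from inside the handler, and returns
// the named outbound header.
func relayedHeader(in workflowContext, name string) string {
	out := http.Header{}
	h := withWorkflow(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
		workflowFrom(r.Context()).applyTo(out)
	}))
	req := httptest.NewRequest(http.MethodPost, "/tasks", nil)
	in.applyTo(req.Header)
	h.ServeHTTP(httptest.NewRecorder(), req)
	return out.Get(name)
}

func main() {
	in := workflowContext{WorkflowID: "wf-42", StepID: "step-canary"}
	fmt.Printf("%q\n", relayedHeader(in, headerWorkflowID)) // prints "wf-42"
	fmt.Printf("%q\n", relayedHeader(in, headerStageID))    // prints ""
}
```

Propagation in the sketch happens only where the handler calls `applyTo` explicitly, which mirrors the diff's stated choice to keep auto-propagation out of the egress proxy.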