
FWS-7 — Audit event export capability (Unix Domain Socket sink + HTTP fallback) #95

@initializ-mk

Part of the Forge backlog. Effort: 2–3 engineer-days. Risk: low. Depends on: nothing. Required dependency for FWS-8 (#91, audit hardening) — sequence numbers and payload-stripping need a real export path to consume them; this issue builds that path first.

What we are building

A new audit export capability that emits audit events to a local Unix Domain Socket (or localhost HTTP fallback) in addition to the existing NDJSON-to-stderr emission. This allows an initializ-deployed sidecar in the same pod to consume audit events with low latency, while preserving the existing stderr emission as a degraded-mode fallback.

This is a separate first-class export path from OTel traces. OTel traces (already being built) carry distributed tracing data for customer observability stacks. Audit export carries execution capture for the initializ platform — cost, policy decisions, egress decisions, compliance audit. Both pipelines share signal sources in Forge but emit independently. Do not couple this to OTel. When something interesting happens in Forge, the instrumentation should emit to OTel AND to audit at the same call site, not tap one from the other.
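A minimal sketch of the "same call site, independent paths" rule. The `spanRecorder` and `auditEmitter` interfaces and the `countingEmitter` type are hypothetical stand-ins invented for illustration; they are not the real OTel or Forge APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// spanRecorder and auditEmitter are hypothetical stand-ins for the OTel and
// audit pipelines; the real Forge and OTel APIs will differ.
type spanRecorder interface {
	RecordEvent(ctx context.Context, name string) error
}

type auditEmitter interface {
	Emit(ctx context.Context, event string) error
}

// countingEmitter counts calls and optionally fails, to show isolation.
type countingEmitter struct {
	calls int
	fail  bool
}

func (c *countingEmitter) RecordEvent(_ context.Context, _ string) error { return c.record() }
func (c *countingEmitter) Emit(_ context.Context, _ string) error        { return c.record() }

func (c *countingEmitter) record() error {
	c.calls++
	if c.fail {
		return errors.New("export unavailable")
	}
	return nil
}

// recordPolicyDecision emits to both pipelines from one call site. Neither
// call depends on the other's result, so a broken exporter on one side
// cannot suppress the record on the other.
func recordPolicyDecision(ctx context.Context, traces spanRecorder, audit auditEmitter, decision string) {
	_ = traces.RecordEvent(ctx, "policy.decision") // sampled, best effort
	_ = audit.Emit(ctx, decision)                  // unsampled, always attempted
}

// demo runs one decision with a failing trace exporter and reports how many
// times each pipeline was invoked.
func demo() (traceCalls, auditCalls int) {
	traces := &countingEmitter{fail: true}
	audit := &countingEmitter{}
	recordPolicyDecision(context.Background(), traces, audit, `{"event":"policy_decision"}`)
	return traces.calls, audit.calls
}

func main() {
	tc, ac := demo()
	fmt.Printf("trace calls=%d audit calls=%d\n", tc, ac)
}
```

The point of the shape is that the audit call is never derived from the trace call, so sampling or failure on the trace side has no effect on audit.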

Why two paths

Audit cannot be sampled (every policy decision and cost-relevant event must land). OTel traces can be sampled. Audit needs separate retention from observability. Failure-domain isolation: if OTel export breaks, audit must continue, and vice versa. These are operational requirements; the architecture exists to serve them.

Stream design context (why audit is on stderr today, and what FWS-7 enables)

This issue's "stderr stays as default safety net" decision rests on the design rationale for putting audit NDJSON on stderr in the first place, and on the operational gap that design leaves until FWS-7 ships. It is worth documenting so that reviewers of the sink work understand why the default isn't moving to stdout.

Why stderr (today's design)

  1. Unix stream convention. stdout is the program's primary data; stderr is diagnostics / observability / out-of-band logs. Forge agents are mostly servers today, but the convention keeps the door open for pipe-friendly stdout — e.g., a future forge run --once mode that pipes the agent's response to stdout shouldn't be polluted by audit lines.
  2. Container log-collector stream separation. Docker, k8s, journald, Fluent Bit, Vector and similar tools capture stdout and stderr as distinct streams and route them differently. Putting audit on stderr lets operators send it to a SIEM pipeline while routing stdout elsewhere — purely by stream, without parsing the payload.
  3. Pre-FWS-7 durability. Until this issue lands UDS / HTTP sinks, stderr is the most reliable "the process is dying and I still need this record" channel: less buffered than stdout in many runtimes, captured-by-default by every supervisor, survives even when the agent's own log layer misbehaves. For cost-relevant audit data that has to land, that matters.
  4. Sink-agnostic API. coreruntime.NewAuditLogger(io.Writer) already takes any writer — stderr is just the opinionated default. Embedders can already redirect today via forge run 2> audit.ndjson or by supplying a custom writer to the runner.

Today's operational gap (this is what FWS-7 fixes)

Forge currently puts both ops logs (r.logger.Info(...) startup banners, request logs, error logs) and audit NDJSON on the same stderr stream. A SIEM pipeline that wants only audit records can't split by stream alone — it has to filter every line by the presence of the event JSON field. That works but adds parsing cost at the collector and risks accidental rule drift if ops logs ever start carrying an event field.
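For illustration, the per-line filtering a collector has to do today might look like the sketch below. Only the `event` field name comes from this issue; the sample values (`tool_exec`, `correlation_id`, `task_id`) and the helper names are assumptions.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// isAuditLine reports whether a line is a JSON object carrying a top-level
// "event" field. Plain-text ops lines and other JSON shapes are rejected.
func isAuditLine(line string) bool {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal([]byte(line), &fields); err != nil {
		return false
	}
	_, ok := fields["event"]
	return ok
}

// filterAudit returns the audit lines found in a mixed stderr capture.
func filterAudit(stderr string) []string {
	var out []string
	sc := bufio.NewScanner(strings.NewReader(stderr))
	for sc.Scan() {
		if isAuditLine(sc.Text()) {
			out = append(out, sc.Text())
		}
	}
	return out
}

func main() {
	// Sample lines are invented; real Forge output may differ.
	mixed := `{"level":"info","msg":"agent started"}
{"event":"tool_exec","correlation_id":"c-1","task_id":"t-1"}
plain text banner`
	for _, line := range filterAudit(mixed) {
		fmt.Println(line)
	}
}
```

Every line is parsed as JSON just to be classified, and an ops log line that happens to carry an `event` key would pass the filter. That is the rule-drift risk described above.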

The reason this gap exists today: there's no separate audit sink, so multiplexing onto stderr is the only option.

The clean end-state after FWS-7

Once the socket / HTTP sink in this issue ships, the recommended operational shape is:

| Stream | Carries | Consumer |
| --- | --- | --- |
| UDS / HTTP sink (this issue) | Audit NDJSON (primary) | initializ platform sidecar / SIEM |
| stderr | Audit NDJSON (degraded-mode fallback) + ops logs | Container log collector / local debugging |
| stdout | (currently unused; reserved for future pipe-friendly modes) | |

The dedicated sink is the SIEM pipeline; stderr stays as the safety net for "sidecar is down, but I still want the audit record to land somewhere." Both paths emit identical NDJSON for the audit fields — the sink does not transform events.

Companion follow-up (NOT in this issue's scope)

A cleaner-still end-state would route ops logger output to stdout when a dedicated audit sink is configured, leaving stderr as the audit-only fallback. That makes the stream split clean by collector convention without any payload parsing.

This is deliberately not in FWS-7's scope because:

  • It touches every r.logger.* call site (or the logger construction) across forge-cli, which is unrelated to the sink mechanics of this issue.
  • It can ship independently after FWS-7 — the sink work doesn't depend on it.
  • Mixing it into FWS-7's PR would muddle review.

Tracked as FWS-9 (#100) — "Move ops logger output from stderr to stdout (stream separation from audit)." Single-line change at forge-cli/runtime/runner.go:123, independent of FWS-7 in code.


Where this fits in the codebase

The existing audit infrastructure lives in forge-core/runtime/audit.go. Today, AuditLogger.Emit() produces NDJSON events with CorrelationID and TaskID from context. Events go to a single sink (stderr, configurable via JSONLogger). The work in this issue extends the sink layer to support multiple concurrent sinks — stderr stays as default; a new socket sink is added.

Critical: Do not change the audit event schema, the event types, or the AuditLogger.Emit() API. The audit event format is an external contract — consumers depend on it. This work is purely about adding a new sink target, not changing what gets emitted or how callers emit it.

Module boundary rules (per FORGE_PROJECT_DESIGN.md)

  • forge-core has no OS-specific dependencies. Unix socket support uses the net package's unix network type, which is available cross-platform in Go 1.25 (works on Linux and macOS; on Windows it returns a clear error at sink construction time, falling back to localhost HTTP).
  • forge-core does not import forge-cli. Wiring of the socket path from CLI flags/env vars happens in forge-cli/cmd/, then injects the sink configuration into the core audit logger.
  • No new external dependencies. Use the standard library only: net, bufio, encoding/json, sync, time, context.

Implementation requirements

1. Sink abstraction

Introduce a Sink interface in forge-core/runtime/audit.go:

// Sink consumes serialized audit event bytes. Implementations must be safe
// for concurrent use. Sinks should never block the emitter under back-pressure
// for longer than their configured timeout; on timeout the sink drops the
// event and increments its drop counter instead of returning an error.
type Sink interface {
    // Write delivers a single event. The event is already marshaled
    // NDJSON (one line; the caller adds no trailing newline).
    // Returns nil even on transient failure; sinks are responsible for
    // their own retry/buffering policy. A non-nil error indicates a
    // permanent sink failure that should be logged once.
    Write(ctx context.Context, eventBytes []byte) error

    // Close flushes any buffered events and releases resources.
    // Called during agent shutdown.
    Close() error

    // Name returns a stable identifier for the sink, used in audit
    // self-reporting (e.g., "stderr", "unix-socket", "localhost-http").
    Name() string
}

The existing stderr emission becomes one Sink implementation (stderrSink). The new socket emission is another (socketSink). AuditLogger holds a slice of sinks and fans out each event to all of them. The fan-out is concurrent, and each sink's Write must be bounded from the caller's perspective: emitters should never wait on sink I/O for longer than the sink's configured timeout.
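One possible reading of the fan-out, as a sketch rather than the intended implementation. The `fanOut` and `memSink` types are hypothetical; the real AuditLogger keeps its existing Emit() API. Here each sink gets its own goroutine per event, and a sink's first error is logged exactly once.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Sink mirrors the interface proposed above.
type Sink interface {
	Write(ctx context.Context, eventBytes []byte) error
	Close() error
	Name() string
}

// memSink is a hypothetical in-memory sink used only for this sketch.
type memSink struct {
	mu     sync.Mutex
	name   string
	events []string
	err    error // returned from every Write when non-nil
}

func (m *memSink) Write(_ context.Context, b []byte) error {
	if m.err != nil {
		return m.err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events = append(m.events, string(b))
	return nil
}
func (m *memSink) Close() error { return nil }
func (m *memSink) Name() string { return m.name }

// fanOut delivers one event to every sink concurrently. Sink errors are
// reported once per sink through logf and never reach the caller.
type fanOut struct {
	sinks  []Sink
	logged []sync.Once
	logf   func(format string, args ...any)
}

func newFanOut(logf func(string, ...any), sinks ...Sink) *fanOut {
	return &fanOut{sinks: sinks, logged: make([]sync.Once, len(sinks)), logf: logf}
}

func (f *fanOut) Emit(ctx context.Context, event []byte) {
	var wg sync.WaitGroup
	for i, s := range f.sinks {
		wg.Add(1)
		go func(i int, s Sink) {
			defer wg.Done()
			if err := s.Write(ctx, event); err != nil {
				f.logged[i].Do(func() { f.logf("audit sink %s failed: %v", s.Name(), err) })
			}
		}(i, s)
	}
	wg.Wait() // bounded by the slowest sink's own write timeout
}

// demo emits three events to one healthy and one permanently failing sink.
func demo() (delivered, logLines int) {
	good := &memSink{name: "stderr"}
	bad := &memSink{name: "unix-socket", err: errors.New("permanent failure")}
	f := newFanOut(func(string, ...any) { logLines++ }, good, bad)
	for i := 0; i < 3; i++ {
		f.Emit(context.Background(), []byte(`{"event":"demo"}`))
	}
	return len(good.events), logLines
}

func main() {
	delivered, logLines := demo()
	fmt.Printf("delivered=%d logLines=%d\n", delivered, logLines)
}
```

Waiting on the WaitGroup keeps the sketch simple; the wait is only acceptable because every sink is required to bound its own Write.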

2. stderrSink (existing behavior preserved)

Wrap the current stderr writing in a stderrSink type. Behavior unchanged from today. This sink is always registered by default. It is the safety net — if all other sinks fail, audit still lands in stderr where customer log infrastructure can pick it up.

3. socketSink (the new export path)

A sink that delivers events to a local Unix Domain Socket (preferred) or localhost HTTP endpoint (fallback). Behavior:

  • Connection lazy and reconnecting. Sink does not require the socket to exist at construction time. First Write attempts to dial; on failure, increments a counter and returns nil (drop). Subsequent writes retry the dial with exponential backoff (initial 100ms, max 5s).
  • Per-write timeout configurable. Default 50ms. If the socket write does not complete within the timeout, the event is dropped and the drop counter increments.
  • No buffering in the sink. Buffering is the sidecar's job. The sink is fire-and-forget: dial, write, return. This keeps the sink small, predictable, and ensures the slowness of any consumer (including a misbehaving sidecar) cannot back-pressure Forge.
  • Frame format. Each event is one line of NDJSON. Sink appends a single \n after the event bytes if not present.
  • Connection reuse. Sink maintains one persistent connection per backoff cycle; on EPIPE or write error, the connection is closed and the next write re-dials.
  • Concurrency safety. A mutex protects the connection state. Writes block briefly on the mutex; the mutex is held only for the duration of the write call (which has its own timeout).
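The bullets above can be sketched as follows. This is an illustration under assumptions, not Forge's code: the constant names are invented, and the `droppedWrite` counter for non-timeout write errors (such as EPIPE) is not part of the stats listed in this issue.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"sync"
	"time"
)

const (
	defaultWriteTimeout = 50 * time.Millisecond
	defaultDialTimeout  = time.Second
	initialBackoff      = 100 * time.Millisecond
	maxBackoff          = 5 * time.Second
)

type socketSink struct {
	path                      string
	writeTimeout, dialTimeout time.Duration

	mu       sync.Mutex // guards every field below
	conn     net.Conn
	backoff  time.Duration
	nextDial time.Time

	written, droppedTimeout, droppedDial, droppedWrite int64
}

func newSocketSink(path string, writeTimeout, dialTimeout time.Duration) *socketSink {
	return &socketSink{path: path, writeTimeout: writeTimeout, dialTimeout: dialTimeout, backoff: initialBackoff}
}

// Write pushes one NDJSON line onto the socket. Every failure is a counted
// drop, so the caller always receives nil.
func (s *socketSink) Write(ctx context.Context, event []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.conn == nil {
		now := time.Now()
		if now.Before(s.nextDial) { // still inside the backoff window
			s.droppedDial++
			return nil
		}
		d := net.Dialer{Timeout: s.dialTimeout}
		conn, err := d.DialContext(ctx, "unix", s.path)
		if err != nil {
			s.droppedDial++
			s.nextDial = now.Add(s.backoff)
			if s.backoff *= 2; s.backoff > maxBackoff {
				s.backoff = maxBackoff
			}
			return nil
		}
		s.conn, s.backoff = conn, initialBackoff
	}

	// Frame: copy the event, then append '\n' only if it lacks one.
	line := append(make([]byte, 0, len(event)+1), event...)
	if len(line) == 0 || line[len(line)-1] != '\n' {
		line = append(line, '\n')
	}

	_ = s.conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
	if _, err := s.conn.Write(line); err != nil {
		var ne net.Error
		if errors.As(err, &ne) && ne.Timeout() {
			s.droppedTimeout++
		} else {
			s.droppedWrite++ // assumption: separate counter for EPIPE and similar
		}
		_ = s.conn.Close()
		s.conn = nil // the next Write re-dials
		return nil
	}
	s.written++
	return nil
}

// demoUnreachable writes twice to a path with no listener and reports
// whether both writes returned nil and how many dial drops were counted.
func demoUnreachable() (allNil bool, dialDrops int64) {
	s := newSocketSink("/nonexistent/forge-audit.sock", defaultWriteTimeout, defaultDialTimeout)
	ctx := context.Background()
	err1 := s.Write(ctx, []byte(`{"event":"demo"}`))
	err2 := s.Write(ctx, []byte(`{"event":"demo"}`))
	return err1 == nil && err2 == nil, s.droppedDial
}

func main() {
	allNil, drops := demoUnreachable()
	fmt.Printf("allNil=%v dialDrops=%d\n", allNil, drops)
}
```

Closing the connection after a timed-out write is deliberate in this sketch: a partial write may have left a truncated line on the stream, and re-dialing gives the consumer a clean framing boundary.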

4. Configuration surface

A new struct in forge-core/runtime:

type AuditExportConfig struct {
    // Path to the Unix Domain Socket. If empty, socket sink is not
    // registered. Default: empty (no socket sink in default config).
    SocketPath string

    // Fallback localhost HTTP endpoint. Used only when SocketPath is empty
    // AND HTTPEndpoint is set. Format: "http://127.0.0.1:9097/v1/audit".
    // POSTs each event as JSON to this endpoint.
    HTTPEndpoint string

    // WriteTimeout per event. Default: 50ms.
    WriteTimeout time.Duration

    // DialTimeout for initial connection. Default: 1s.
    DialTimeout time.Duration
}

In forge-cli/cmd/run.go (or wherever the agent runtime is wired today), add flags and env var bindings:

| Flag | Env var | Purpose |
| --- | --- | --- |
| --audit-socket | FORGE_AUDIT_SOCKET | Unix socket path |
| --audit-http-endpoint | FORGE_AUDIT_HTTP_ENDPOINT | Localhost HTTP fallback |
| --audit-write-timeout | FORGE_AUDIT_WRITE_TIMEOUT | Per-write timeout (Go duration) |

If both --audit-socket and --audit-http-endpoint are set, socket wins.

Default behavior: No socket sink is registered. stderr emission continues as today. The socket sink is opt-in via configuration. This means existing deployments are unaffected; the initializ deploy receiver (separate work) injects the env vars when deploying agents under the initializ platform.
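A sketch of the flag and env var resolution, using the standard library `flag` package. Two things here are assumptions: forge-cli may use a different flag library (match the existing pattern), and the rule that an explicit flag overrides its env var is not stated in this issue. The helper names `resolveAuditExport` and `selectedSink` are hypothetical.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

type AuditExportConfig struct {
	SocketPath   string
	HTTPEndpoint string
	WriteTimeout time.Duration
	DialTimeout  time.Duration
}

const (
	defaultWriteTimeout = 50 * time.Millisecond
	defaultDialTimeout  = time.Second
)

// resolveAuditExport builds the config from CLI args and an env lookup.
// Env vars supply the flag defaults, so an explicit flag wins (assumption).
func resolveAuditExport(args []string, getenv func(string) string) (AuditExportConfig, error) {
	cfg := AuditExportConfig{WriteTimeout: defaultWriteTimeout, DialTimeout: defaultDialTimeout}
	if v := getenv("FORGE_AUDIT_WRITE_TIMEOUT"); v != "" {
		d, err := time.ParseDuration(v)
		if err != nil {
			return cfg, fmt.Errorf("parsing FORGE_AUDIT_WRITE_TIMEOUT %q: %w", v, err)
		}
		cfg.WriteTimeout = d
	}

	fs := flag.NewFlagSet("run", flag.ContinueOnError)
	fs.StringVar(&cfg.SocketPath, "audit-socket", getenv("FORGE_AUDIT_SOCKET"), "Unix socket path")
	fs.StringVar(&cfg.HTTPEndpoint, "audit-http-endpoint", getenv("FORGE_AUDIT_HTTP_ENDPOINT"), "localhost HTTP fallback")
	fs.DurationVar(&cfg.WriteTimeout, "audit-write-timeout", cfg.WriteTimeout, "per-write timeout (Go duration)")
	if err := fs.Parse(args); err != nil {
		return cfg, fmt.Errorf("parsing audit export flags: %w", err)
	}
	return cfg, nil
}

// selectedSink names the extra sink the config would register, or "" for
// the stderr-only default. The socket wins when both are set.
func selectedSink(cfg AuditExportConfig) string {
	switch {
	case cfg.SocketPath != "":
		return "unix-socket"
	case cfg.HTTPEndpoint != "":
		return "localhost-http"
	}
	return ""
}

func main() {
	env := func(key string) string {
		if key == "FORGE_AUDIT_HTTP_ENDPOINT" {
			return "http://127.0.0.1:9097/v1/audit"
		}
		return ""
	}
	cfg, err := resolveAuditExport([]string{"--audit-socket=/tmp/forge-audit.sock"}, env)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("sink=%s writeTimeout=%s\n", selectedSink(cfg), cfg.WriteTimeout)
}
```

Passing the env lookup as a function keeps the resolution testable without mutating the process environment.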

5. Sink registration and lifecycle

In forge-core/runtime, wherever AuditLogger is constructed:

// Registers stderr sink by default; socket sink if configured.
func NewAuditLogger(cfg AuditExportConfig) *AuditLogger {
    sinks := []Sink{newStderrSink()}
    if cfg.SocketPath != "" {
        sinks = append(sinks, newSocketSink(cfg.SocketPath, cfg.WriteTimeout, cfg.DialTimeout))
    } else if cfg.HTTPEndpoint != "" {
        sinks = append(sinks, newHTTPSink(cfg.HTTPEndpoint, cfg.WriteTimeout))
    }
    return &AuditLogger{sinks: sinks}
}

AuditLogger.Emit() writes the marshaled event to each sink. Errors from individual sinks are logged once per sink-lifetime via the JSONLogger (use the existing structured logger pattern) but do not propagate to the caller.

On agent shutdown, call Close() on each sink. Sinks should drain any pending in-flight writes within a 2-second deadline, then return.
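A sketch of a deadline-bounded shutdown. The `closeAll` helper, the `closer` interface, and the `slowSink` type are hypothetical, and the sketch assumes sink names are unique.

```go
package main

import (
	"fmt"
	"time"
)

const drainDeadline = 2 * time.Second

// closer is the subset of Sink that shutdown needs.
type closer interface {
	Close() error
	Name() string
}

// slowSink is a hypothetical sink whose Close takes `delay` to return.
type slowSink struct {
	name  string
	delay time.Duration
}

func (s slowSink) Close() error { time.Sleep(s.delay); return nil }
func (s slowSink) Name() string { return s.name }

// closeAll closes every sink concurrently and returns the names of sinks
// that had not finished when the deadline expired. Stragglers are abandoned
// rather than awaited, so shutdown time is bounded by the deadline.
func closeAll(sinks []closer, deadline time.Duration) (abandoned []string) {
	done := make(chan string, len(sinks)) // buffered so late closers never block
	pending := make(map[string]bool, len(sinks))
	for _, s := range sinks {
		pending[s.Name()] = true
		go func(s closer) {
			_ = s.Close()
			done <- s.Name()
		}(s)
	}

	timer := time.NewTimer(deadline)
	defer timer.Stop()
	for len(pending) > 0 {
		select {
		case name := <-done:
			delete(pending, name)
		case <-timer.C:
			for _, s := range sinks {
				if pending[s.Name()] {
					abandoned = append(abandoned, s.Name())
				}
			}
			return abandoned
		}
	}
	return nil
}

func main() {
	sinks := []closer{
		slowSink{name: "stderr"},
		slowSink{name: "unix-socket", delay: drainDeadline},
	}
	// A shortened deadline keeps the demo fast; production would pass drainDeadline.
	fmt.Println("abandoned:", closeAll(sinks, drainDeadline/20))
}
```

The buffered channel matters: a sink that finishes after the deadline can still send its name and exit instead of leaking a blocked goroutine.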

6. Self-reporting metrics

Each sink tracks:

  • Events written successfully
  • Events dropped due to timeout
  • Events dropped due to dial failure
  • Current connection state (connected, disconnected, dialing)

Expose these via a Stats() map[string]int64 method on the sink. The AuditLogger aggregates across sinks. These stats are not emitted to the audit stream itself (would create feedback loops); instead they are exposed via the existing health/metrics endpoint pattern in forge-core/runtime.

Audit emission of sink health: emit a periodic audit_export_status audit event every 60 seconds with the current stats per sink. This is the one exception to the "stats not in audit" rule — operators need to see in the audit stream whether export is healthy.
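A sketch of the counters and the aggregation. The key names, the integer encoding of the connection state, and the `<sink>.<metric>` key format are all assumptions chosen for illustration; the issue only fixes the `Stats() map[string]int64` signature.

```go
package main

import (
	"fmt"
	"sort"
	"sync/atomic"
)

// Connection states reported under the "state" key (encoding is an assumption).
const (
	stateDisconnected int64 = iota
	stateDialing
	stateConnected
)

// sinkStats holds lock-free counters so hot-path writes never contend.
type sinkStats struct {
	written        atomic.Int64
	droppedTimeout atomic.Int64
	droppedDial    atomic.Int64
	state          atomic.Int64
}

// Stats returns a point-in-time snapshot of the counters.
func (s *sinkStats) Stats() map[string]int64 {
	return map[string]int64{
		"written":         s.written.Load(),
		"dropped_timeout": s.droppedTimeout.Load(),
		"dropped_dial":    s.droppedDial.Load(),
		"state":           s.state.Load(),
	}
}

// aggregate merges per-sink snapshots into one map keyed "<sink>.<metric>".
func aggregate(perSink map[string]*sinkStats) map[string]int64 {
	out := make(map[string]int64)
	for name, st := range perSink {
		for metric, v := range st.Stats() {
			out[name+"."+metric] = v
		}
	}
	return out
}

func main() {
	sock := &sinkStats{}
	sock.written.Add(41)
	sock.droppedTimeout.Add(1)
	sock.state.Store(stateConnected)

	agg := aggregate(map[string]*sinkStats{"stderr": {}, "unix-socket": sock})
	keys := make([]string, 0, len(agg))
	for k := range agg {
		keys = append(keys, k)
	}
	sort.Strings(keys) // stable output order for display
	for _, k := range keys {
		fmt.Printf("%s=%d\n", k, agg[k])
	}
}
```

The same aggregated snapshot could feed both the health/metrics endpoint and the periodic audit_export_status event, so the two views cannot disagree.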

What we are NOT building

Spelled out so there is no scope creep:

  • No sidecar implementation. The sidecar that consumes from this socket is a separate initializ-platform-side work item. This issue is only about Forge's emission side.
  • No buffering or retry beyond the next dial attempt. Durability is the sidecar's responsibility. Forge fires events; if the sink can't accept them in 50ms, they're dropped (and counted).
  • No transformation of audit events. Events leaving via the socket are byte-identical to events leaving via stderr. Same schema, same fields.
  • No OTel integration. This is a parallel path. If OTel work is in the same PR or adjacent, ensure that audit emission and OTel emission happen at the same call site (when the event occurs) — but they remain independent code paths.
  • No new audit event types, apart from the periodic audit_export_status event described in section 6. The schema does not change.
  • No authentication on the socket. The socket is in-pod, accessible only by containers sharing the pod's volume. Pod-level security is the trust boundary.
  • No persistent file writes from this sink. The socket sink is purely network I/O. PV-backed durability is the sidecar's job.

Testing requirements

All tests use stdlib testing (no testify per Forge conventions).

Unit tests

In forge-core/runtime/audit_test.go:

  1. stderrSink writes correctly. Capture stderr, emit event, verify byte content.
  2. socketSink dial success. Spin up a Unix socket listener in the test; emit event; verify it arrives on the listener side as expected NDJSON.
  3. socketSink dial failure. Configure a non-existent socket path; emit event; verify Write returns nil and drop counter increments.
  4. socketSink reconnects after EPIPE. Listener accepts, then closes; emitter writes; verify next write triggers a re-dial.
  5. socketSink timeout. Listener accepts but never reads; emitter writes; verify drop after timeout.
  6. HTTP sink writes correctly. Use httptest.NewServer; emit event; verify request body matches.
  7. Multi-sink fan-out. Configure both stderr and socket sinks; emit one event; verify it arrives on both.
  8. One sink failure does not affect others. Socket sink configured against non-existent path; stderr should still receive events normally.
  9. Stats accuracy. Emit N events successfully and M events with drop conditions; verify stats reflect.
  10. Concurrent emission. Use t.Parallel() and a sync.WaitGroup; emit from 100 goroutines simultaneously; verify all events arrive at sinks (use a counting listener).
  11. Close drains pending writes. Emit, immediately Close; verify the in-flight write was flushed or cleanly aborted within deadline.
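Several of the tests above need a throwaway Unix socket listener. A minimal sketch of that fixture, written as a standalone program so it runs outside `go test`; the `roundTrip` helper is hypothetical, and a real test would dial through socketSink instead of `net.Dial`.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"path/filepath"
)

// roundTrip starts a Unix socket listener in a fresh temp dir, writes one
// NDJSON line to it as a client, and returns the line the listener received.
func roundTrip(event string) (string, error) {
	dir, err := os.MkdirTemp("", "forge-audit")
	if err != nil {
		return "", fmt.Errorf("creating temp dir: %w", err)
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "audit.sock")
	ln, err := net.Listen("unix", path)
	if err != nil {
		return "", fmt.Errorf("listening on %s: %w", path, err)
	}
	defer ln.Close()

	type result struct {
		line string
		err  error
	}
	got := make(chan result, 1)
	go func() {
		conn, err := ln.Accept()
		if err != nil {
			got <- result{err: err}
			return
		}
		defer conn.Close()
		line, err := bufio.NewReader(conn).ReadString('\n')
		got <- result{line: line, err: err}
	}()

	conn, err := net.Dial("unix", path)
	if err != nil {
		return "", fmt.Errorf("dialing %s: %w", path, err)
	}
	defer conn.Close()
	if _, err := conn.Write([]byte(event + "\n")); err != nil {
		return "", fmt.Errorf("writing event: %w", err)
	}

	r := <-got
	return r.line, r.err
}

func main() {
	line, err := roundTrip(`{"event":"demo"}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("listener received: %q\n", line)
}
```

Unix socket paths have a short OS-level length limit (roughly 100 bytes), so a short temp directory prefix is safer than a deeply nested one.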

Integration test

Add forge-cli/cmd/audit_export_integration_test.go:

  1. Start a forge agent process with --audit-socket=/tmp/forge-audit-test.sock.
  2. In the test, listen on that Unix socket before starting the agent.
  3. Trigger an A2A invocation that produces known audit events (e.g., a simple skill execution).
  4. Verify the expected sequence of audit events arrives at the socket listener.
  5. Verify the same events also arrive at stderr (parse the agent's stderr stream).

Performance test (benchmark)

Add forge-core/runtime/audit_bench_test.go:

  1. BenchmarkEmit_StderrOnly — current baseline.
  2. BenchmarkEmit_StderrAndSocket — measure overhead when both sinks are configured. Target: < 20% overhead per emit.
  3. BenchmarkEmit_SocketUnreachable — measure that dial failures do not slow emission significantly. Target: < 2x stderr-only when socket is unreachable (cached failure state should keep this fast after the first miss).

Conventions to follow (from FORGE_PROJECT_DESIGN.md)

  • Go 1.25. Use generics where they help; not where they don't.
  • context.Context everywhere. Every public function in this work takes ctx context.Context as first parameter, even if not currently used — for future cancellation and tracing.
  • Errors: fmt.Errorf("dialing audit socket %s: %w", path, err). Wrap with context. Never panic.
  • Structured logging: Use the existing JSONLogger for any operational logs from the sinks (dial errors, sink health). Audit events themselves go through the audit pipeline, not the operational logger.
  • No magic numbers in code. Timeouts and intervals are named constants or config fields.
  • Cross-platform. Unix socket code must compile on Windows (with a runtime error if the OS doesn't support unix sockets). Use build tags only if absolutely necessary.
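The error-wrapping convention above, as a short sketch. The `dialAuditSocket`, `isDialFailure`, and `probe` helpers are hypothetical names; only the wrapping format string comes from this issue.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"
)

const dialTimeout = time.Second

// dialAuditSocket wraps dial errors with the socket path while keeping the
// underlying error reachable through errors.Is / errors.As.
func dialAuditSocket(ctx context.Context, path string) (net.Conn, error) {
	d := net.Dialer{Timeout: dialTimeout}
	conn, err := d.DialContext(ctx, "unix", path)
	if err != nil {
		return nil, fmt.Errorf("dialing audit socket %s: %w", path, err)
	}
	return conn, nil
}

// isDialFailure reports whether err wraps a *net.OpError, which is how the
// net package reports failed dials.
func isDialFailure(err error) bool {
	var opErr *net.OpError
	return errors.As(err, &opErr)
}

// probe dials once and closes the connection on success.
func probe(path string) error {
	conn, err := dialAuditSocket(context.Background(), path)
	if err != nil {
		return err
	}
	return conn.Close()
}

func main() {
	err := probe("/nonexistent/forge-audit.sock")
	fmt.Println("error:", err)
	fmt.Println("dial failure:", isDialFailure(err))
}
```

Because the cause is wrapped with %w rather than flattened into a string, callers can still branch on the error type without parsing the message.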

Acceptance criteria

The PR is acceptable when:

  1. All listed tests pass on Linux and macOS.
  2. The existing audit test suite continues to pass without modification (proves the schema and existing behavior are preserved).
  3. A Forge agent started with --audit-socket=/tmp/forge-audit.sock, paired with a simple listener process, produces the expected audit events on the socket AND on stderr.
  4. A Forge agent started without the new flag behaves identically to today — no change in stderr output, no change in audit emission timing.
  5. Benchmarks show < 20% overhead for the dual-sink case.
  6. Documentation in forge-core/runtime/audit.go package comment explains the sink model and the configuration knobs.
  7. forge-cli/cmd/run.go (or equivalent) help text documents the new flags with example usage.

Files expected to change

Expected scope (one engineer, 2–3 days):

| File | Change |
| --- | --- |
| forge-core/runtime/audit.go | Add Sink interface, refactor AuditLogger to fan out to sinks slice, preserve existing API |
| forge-core/runtime/audit_sink_stderr.go | New file: extract current stderr emission as stderrSink |
| forge-core/runtime/audit_sink_socket.go | New file: socketSink implementation |
| forge-core/runtime/audit_sink_http.go | New file: httpSink fallback implementation |
| forge-core/runtime/audit_test.go | Add tests per "Unit tests" section |
| forge-core/runtime/audit_bench_test.go | New file: benchmarks |
| forge-cli/cmd/run.go (and serve.go if separate) | Add --audit-socket, --audit-http-endpoint, --audit-write-timeout flags and env var bindings; wire into AuditExportConfig passed to NewAuditLogger |
| forge-cli/cmd/audit_export_integration_test.go | New file: integration test |
| README.md or relevant Forge docs page | One-paragraph mention of the audit export configuration, with example |

Do not touch:

  • The audit event schema or any event type constants
  • Any consumer of audit events (no caller of AuditLogger.Emit() should change)
  • forge-ui (no UI changes for this work)
  • forge-plugins (no plugin changes for this work)
  • forge-skills (no skill changes for this work)

Anti-patterns to avoid

If you find yourself doing any of these, stop and reconsider:

  • Adding fields to audit events to indicate which sink they went through. No. Events are byte-identical across sinks.
  • Reading from the socket sink to confirm delivery. No. The sink is fire-and-forget; delivery confirmation is the sidecar's contract with its downstream.
  • Buffering events in the socket sink. No. Buffering is the sidecar's job. The sink's job is to push bytes onto the socket within timeout or drop.
  • Tapping OTel spans to derive audit events. No. Audit and OTel are parallel; they share call sites in instrumentation but emit independently.
  • Making the sink configurable per event type. No. Either a sink is configured (and gets all events) or it isn't.
  • Adding a "sink priority" or "primary sink" concept. No. All configured sinks are peers. Each handles its own failure.
  • Waiting on the Close drain without a bound. No. Close has a 2-second deadline; honor it strictly. Slow sinks do not block shutdown.

Open questions to resolve before starting

These should be answered by reading the codebase or asking; do not guess:

  1. Where exactly is AuditLogger constructed today? Find the construction site and confirm what params it accepts. This is where AuditExportConfig gets threaded through.
  2. What is the existing pattern for cmd-level flag → core-level config wiring? Match that pattern; do not introduce a new convention.
  3. Are there existing tests that mock the audit sink for verification? If yes, those tests may need updates to the mock to satisfy the new Sink interface — verify and update.
  4. What is the Go version constraint? Confirm Go 1.25 in go.mod; some net features (e.g., net.Listen with unix domain sockets) have version-specific behavior.

Out of scope (note in commit message for traceability)

The following are deliberately deferred:
