Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -32,3 +32,4 @@ go.work.sum

# ignore the log directory
configs/development/logs/
.worktrees/
63 changes: 63 additions & 0 deletions README.md
@@ -68,6 +68,11 @@ Once you have built the project, you can run the `vault-audit-filter` executable
audit_address: "127.0.0.1:1269"
audit_description: "Vault Audit Filter Device"

async:
  queue_size: 20
  workers: 2
  timeout: 5s

rule_groups:
  - name: "normal_operations"
    rules:
@@ -129,6 +134,64 @@ Once you have built the project, you can run the `vault-audit-filter` executable
- `messaging.token`: The bot token for Slack (when using "slack" type).
- `messaging.channel`: The channel ID for Slack messages (when using "slack" type).

- **Async Settings**:
- `async.queue_size`: Bounded queue length for async side effects (drop on full).
- `async.workers`: Number of async side-effect workers (`0` disables worker execution).
- `async.timeout`: Timeout for Slack API/webhook and forwarding operations.

### Async Tuning Profiles

Use these as baseline profiles and tune from there:

| Profile | `async.queue_size` | `async.workers` | `async.timeout` | Expected Behavior |
| --- | ---: | ---: | --- | --- |
| Latency-first (default) | 20 | 2 | 5s | Lowest request latency; highest drop risk during bursts |
| Balanced | 256 | 8 | 5s | Low request latency with lower drop rate than default |
| Throughput-biased | 1024 | 16 | 5s | Low request latency with significantly fewer drops; higher CPU/memory usage |
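
For example, the Balanced profile from the table above corresponds to this `async` block:

```yaml
async:
  queue_size: 256
  workers: 8
  timeout: 5s
```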

Representative stress-test results (2000 requests, concurrency 64, slow downstream side effects):

| Config | Downstream Delay | Request Avg | Request P95 | Inferred Drops |
| --- | ---: | ---: | ---: | ---: |
| Sync (`main`) | 20ms | 21.0ms | 21.4ms | 0 |
| Async `q=64,w=2` | 20ms | 0.30ms | 1.06ms | 1934/2000 |
| Async `q=256,w=8` | 20ms | 0.34ms | 1.39ms | 1736/2000 |
| Async `q=1024,w=16` | 20ms | 0.37ms | 1.80ms | 960/2000 |

Interpretation:
- The current async design is appropriate when protecting request-path latency is the top priority.
- If side-effect delivery reliability is required, increase queue/workers and monitor drops, or move to a durable retry design.

### Performance Findings and Decision

Key findings from the latest comparison:
- Synchronous mode (`main`) preserves side-effect delivery in the test scenario, but request latency tracks downstream delay (about 21ms average at 20ms downstream delay).
- Asynchronous mode keeps request latency low (sub-millisecond average in measured scenarios), but can drop a large fraction of side effects during bursts depending on queue/workers settings.

Caveats:
- The numbers above come from synthetic stress tests (2000 requests, concurrency 64) and are intended as directional guidance.
- Real production behavior depends on traffic burst shape, downstream service health, and host resource limits.

Current decision:
- Operate in latency-first async mode by default.
- Treat side effects as best-effort unless deployment requirements explicitly demand stronger delivery guarantees.

Durable/retry work is warranted when:
- Side-effect drops remain sustained and unacceptable after tuning `async.workers` and `async.queue_size`.
- Audit or operational requirements demand stronger guarantees than best-effort delivery.

Follow-up options:
1. Add bounded blocking enqueue mode to trade some request latency for fewer drops.
2. Add durable retry and dead-letter flow for stronger side-effect delivery guarantees.
3. Re-run workload-specific tuning tests and adjust profile recommendations.

Tuning checklist:
1. Start with `Latency-first` or `Balanced`.
2. Monitor side-effect drop count and request tail latency.
3. If drops are sustained and unacceptable, raise `async.workers` first, then `async.queue_size`.
4. If request tail latency regresses, reduce workers or move to a balanced profile.
5. If drops remain unacceptable, adopt durable retry architecture rather than unbounded tuning.

### Rule Syntax

Rules are written using the `expr` language, a simple and safe expression language for Go. Rules can be based on the following properties of audit logs:
5 changes: 5 additions & 0 deletions configs/development/config/config.yaml
@@ -1,3 +1,8 @@
async:
  queue_size: 20
  workers: 2
  timeout: 5s

rule_groups:
  - name: "normal_operations"
    rules:
144 changes: 144 additions & 0 deletions pkg/auditserver/async.go
@@ -0,0 +1,144 @@
package auditserver

import (
	"strconv"
	"time"

	"github.com/ncode/vault-audit-filter/pkg/forwarder"
	"github.com/ncode/vault-audit-filter/pkg/messaging"
)

var defaultSideWorkers = 2

type sideTask struct {
	id         string
	groupName  string
	attempts   int
	payload    []byte
	payloadStr string
	messenger  messaging.Messenger
	forwarder  forwarder.Forwarder
}

func (as *AuditServer) enqueueSide(task sideTask) bool {
	if as.asyncDurableEnabled && as.sideStore != nil {
		if task.id == "" {
			task.id = as.nextSideTaskID()
		}
		if err := as.sideStore.Save(task); err != nil {
			as.logger.Error("Failed to persist durable side task", "error", err)
			return false
		}
	}

	if as.asyncEnqueueMode == "wait" {
		timeout := as.asyncEnqueueTimeout
		if timeout <= 0 {
			timeout = 5 * time.Millisecond
		}
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		select {
		case as.sideQueue <- task:
			return true
		case <-timer.C:
			if as.asyncDurableEnabled {
				time.AfterFunc(as.asyncRetryBackoff, func() {
					_ = as.enqueueSide(task)
				})
				return true
			}
			as.sideDrops.Add(1)
			return false
		}
	}

	select {
	case as.sideQueue <- task:
		return true
	default:
		if as.asyncDurableEnabled {
			time.AfterFunc(as.asyncRetryBackoff, func() {
				_ = as.enqueueSide(task)
			})
			return true
		}
		as.sideDrops.Add(1)
		return false
	}
}

func (as *AuditServer) nextSideTaskID() string {
	n := as.sideTaskSeq.Add(1)
	return time.Now().Format("20060102150405.000000000") + "-" + strconv.FormatUint(n, 10)
}

func (as *AuditServer) startSideWorkers(n int) {
	if n <= 0 {
		return
	}
	for i := 0; i < n; i++ {
		go func() {
			for task := range as.sideQueue {
				as.processSideTask(task)
			}
		}()
	}
}

func (as *AuditServer) processSideTask(task sideTask) {
	messenger := task.messenger
	fwd := task.forwarder
	if task.groupName != "" {
		for i := range as.ruleGroups {
			if as.ruleGroups[i].Name == task.groupName {
				if messenger == nil {
					messenger = as.ruleGroups[i].Messenger
				}
				if fwd == nil {
					fwd = as.ruleGroups[i].Forwarder
				}
				break
			}
		}
	}

	var sendErr error
	if messenger != nil {
		if err := messenger.Send(task.payloadStr); err != nil {
			as.logger.Error("Failed to send notification", "error", err)
			sendErr = err
		}
	}
	if fwd != nil {
		if err := fwd.Forward(task.payload); err != nil {
			as.logger.Error("Failed to forward message", "error", err)
			if sendErr == nil {
				sendErr = err
			}
		}
	}

	if !as.asyncDurableEnabled || as.sideStore == nil || task.id == "" {
		return
	}

	if sendErr == nil {
		_ = as.sideStore.Delete(task.id)
		return
	}

	task.attempts++
	if task.attempts >= as.asyncRetryMaxAttempts {
		_ = as.sideStore.MoveToDeadLetter(task, sendErr.Error())
		_ = as.sideStore.Delete(task.id)
		return
	}

	if err := as.sideStore.Save(task); err != nil {
		as.logger.Error("Failed to save retry side task", "error", err)
	}
	time.AfterFunc(as.asyncRetryBackoff, func() {
		_ = as.enqueueSide(task)
	})
}
43 changes: 43 additions & 0 deletions pkg/auditserver/bench_test.go
@@ -0,0 +1,43 @@
package auditserver

import (
	"io"
	"log/slog"
	"testing"

	"github.com/expr-lang/expr"
	"github.com/spf13/viper"
)

func BenchmarkReact(b *testing.B) {
	logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelInfo}))
	viper.Reset()
	viper.Set("rule_groups", []map[string]interface{}{
		{
			"name":  "rg",
			"rules": []string{"true"},
			"log_file": map[string]interface{}{
				"file_path": "/tmp/test.log",
				"max_size":  1,
			},
		},
	})
	server, _ := New(logger)
	frame := []byte(`{"type":"request","time":"2000-01-01T00:00:00Z","auth":{},"request":{},"response":{}}`)
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		server.React(frame, nil)
	}
}

func BenchmarkShouldLog(b *testing.B) {
	p, _ := expr.Compile("true", expr.Env(&AuditLog{}))
	rg := &RuleGroup{CompiledRules: []CompiledRule{{Program: p}}}
	al := &AuditLog{}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = rg.shouldLog(al)
	}
}