Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 106 additions & 3 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type ServiceConfig struct {
// misbehaving peer or sustained inbound load fills the operator's
// disk indefinitely.
InboxMaxFiles int
// InboxMaxBytes caps the total on-disk bytes used by the inbox.
// When > 0, saveInboxMessage checks the accumulated size before
// every write and evictExpandOverflow uses bytes instead of file
// count. Zero ⇒ no byte cap (backward-compatible).
InboxMaxBytes int64
}

// inboxEvictCheckEvery: only run the eviction-scan once every N saves
Expand Down Expand Up @@ -259,6 +264,35 @@ func (s *Service) saveInboxMessage(frame *Frame, from protocol.Addr) error {
return fmt.Errorf("mkdir: %w", err)
}

// Byte-budget check: if InboxMaxBytes is set, confirm there is room
// BEFORE writing. Evict if over, then re-check.
if s.cfg.InboxMaxBytes > 0 {
current, _ := inboxTotalBytes(dir)
// Estimate the JSON overhead: type, from, data, bytes, received_at,
// and possibly data_b64.
estimated := int64(len(frame.Payload)) + 256
if current+estimated > s.cfg.InboxMaxBytes {
s.evictInboxOverflowByBytes(dir)
after, _ := inboxTotalBytes(dir)
if after+estimated > s.cfg.InboxMaxBytes {
slog.Warn("inbox byte budget exceeded after eviction",
"current_bytes", after,
"max_bytes", s.cfg.InboxMaxBytes,
"frame_bytes", len(frame.Payload))
if s.deps.Events != nil {
s.deps.Events.Publish("inbox.full", map[string]any{
"from": from.String(),
"type": TypeName(frame.Type),
"frame_bytes": len(frame.Payload),
"max_bytes": s.cfg.InboxMaxBytes,
})
}
return fmt.Errorf("inbox byte budget exceeded: %d + %d > %d",
after, estimated, s.cfg.InboxMaxBytes)
}
}
}

ts := time.Now()
msg := map[string]interface{}{
"type": TypeName(frame.Type),
Expand Down Expand Up @@ -298,10 +332,16 @@ func (s *Service) saveInboxMessage(frame *Frame, from protocol.Addr) error {
}

// evictInboxOverflow trims the inbox to at most cfg.InboxMaxFiles by
// deleting the oldest files (by mtime). Best-effort: I/O errors are
// logged and the loop continues. Called periodically from
// saveInboxMessage.
// deleting the oldest files (by mtime). When InboxMaxBytes > 0, the
// eviction target is total bytes rather than file count. Best-effort:
// I/O errors are logged and the loop continues. Called periodically
// from saveInboxMessage.
func (s *Service) evictInboxOverflow(dir string) {
// Byte-based eviction when InboxMaxBytes is configured.
if s.cfg.InboxMaxBytes > 0 {
s.evictInboxOverflowByBytes(dir)
return
}
maxFiles := s.cfg.InboxMaxFiles
if maxFiles <= 0 {
maxFiles = 10000
Expand Down Expand Up @@ -340,3 +380,66 @@ func (s *Service) evictInboxOverflow(dir string) {
}
slog.Info("inbox eviction", "dir", dir, "evicted", toEvict, "remaining", maxFiles)
}

// evictInboxOverflowByBytes trims total inbox size to InboxMaxBytes.
func (s *Service) evictInboxOverflowByBytes(dir string) {
entries, err := os.ReadDir(dir)
if err != nil {
slog.Debug("inbox evict: readdir", "dir", dir, "err", err)
return
}
type aged struct {
name string
mod time.Time
size int64
}
files := make([]aged, 0, len(entries))
var total int64
for _, e := range entries {
if e.IsDir() {
continue
}
info, err := e.Info()
if err != nil {
continue
}
sz := info.Size()
files = append(files, aged{name: e.Name(), mod: info.ModTime(), size: sz})
total += sz
}
if total <= s.cfg.InboxMaxBytes {
return
}
// Oldest first.
sort.Slice(files, func(i, j int) bool { return files[i].mod.Before(files[j].mod) })
evicted := 0
for i := 0; i < len(files) && total > s.cfg.InboxMaxBytes; i++ {
p := filepath.Join(dir, files[i].name)
if err := os.Remove(p); err != nil {
continue
}
total -= files[i].size
evicted++
}
slog.Info("inbox eviction (bytes)", "dir", dir, "evicted", evicted, "total_bytes_after", total, "max_bytes", s.cfg.InboxMaxBytes)
}

// inboxTotalBytes sums the on-disk size of all regular files in dir.
func inboxTotalBytes(dir string) (int64, error) {
entries, err := os.ReadDir(dir)
if err != nil {
return 0, err
}
var total int64
for _, e := range entries {
if e.IsDir() {
continue
}
info, err := e.Info()
if err != nil {
continue
}
total += info.Size()
}
return total, nil
}
102 changes: 102 additions & 0 deletions zz_service_errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,108 @@ func (e *capturingEvents) Subscribe(pattern string) (<-chan coreapi.Event, func(
return ch, func() {}
}

// ---- PILOT-276: inbox byte-budget cap ----------------------------------

// TestSaveInboxMessage_ByteBudgetEnforced verifies that when InboxMaxBytes is
// set, saveInboxMessage rejects writes that would exceed the budget.
func TestSaveInboxMessage_ByteBudgetEnforced(t *testing.T) {
t.Parallel()
tmp := t.TempDir()
s := NewService(ServiceConfig{InboxDir: tmp, InboxMaxBytes: 512})

// First message (small) must succeed.
frame := &Frame{Type: TypeText, Payload: []byte("hello")}
if err := s.saveInboxMessage(frame, protocol.Addr{Node: 1}); err != nil {
t.Fatalf("first save under budget must succeed: %v", err)
}

// Second message — the inbox JSON overhead + data should push over 512.
// Force the test by writing a payload that fills the rest of the budget.
// After the first save, let's check how many files exist.
entries, _ := os.ReadDir(tmp)
t.Logf("entries after first save: %d", len(entries))

// Save enough messages to exceed 512 bytes total.
for i := 0; i < 20; i++ {
frame := &Frame{Type: TypeText, Payload: []byte(strings.Repeat("x", 50))}
err := s.saveInboxMessage(frame, protocol.Addr{Node: 2})
if err != nil {
t.Logf("saveInboxMessage #%d failed as expected: %v", i+2, err)
return // success — budget enforced
}
}
t.Error("expected saveInboxMessage to eventually reject when over byte budget")
}

// TestSaveInboxMessage_NoByteBudget_Unbounded documents that with
// InboxMaxBytes unset (zero), old behaviour is preserved — writes are
// never rejected on byte count.
func TestSaveInboxMessage_NoByteBudget_Unbounded(t *testing.T) {
t.Parallel()
tmp := t.TempDir()
s := NewService(ServiceConfig{InboxDir: tmp}) // InboxMaxBytes=0 (unset)

for i := 0; i < 5; i++ {
frame := &Frame{Type: TypeText, Payload: []byte(strings.Repeat("y", 200))}
if err := s.saveInboxMessage(frame, protocol.Addr{Node: 3}); err != nil {
t.Fatalf("save #%d with no byte budget must not reject: %v", i+1, err)
}
}
entries, _ := os.ReadDir(tmp)
if len(entries) != 5 {
t.Errorf("got %d files, want 5", len(entries))
}
}

// TestEvictInboxOverflow_ByteBasedEvictsOldest verifies that when
// InboxMaxBytes is set, evictInboxOverflow uses total bytes (not file
// count) to decide what to delete.
func TestEvictInboxOverflow_ByteBasedEvictsOldest(t *testing.T) {
t.Parallel()
tmp := t.TempDir()
s := NewService(ServiceConfig{InboxDir: tmp, InboxMaxBytes: 200})

now := time.Now()
// Write 3 files totalling > 200 bytes. Oldest should get evicted.
files := []struct {
name string
content string
age time.Duration
}{
{"old.json", strings.Repeat("a", 100), -10 * time.Second},
{"mid.json", strings.Repeat("b", 80), -5 * time.Second},
{"new.json", strings.Repeat("c", 60), -1 * time.Second},
}
for _, f := range files {
p := filepath.Join(tmp, f.name)
if err := os.WriteFile(p, []byte(f.content), 0600); err != nil {
t.Fatalf("write: %v", err)
}
mt := now.Add(f.age)
_ = os.Chtimes(p, mt, mt)
}

s.evictInboxOverflow(tmp)

entries, _ := os.ReadDir(tmp)
names := make([]string, len(entries))
for i, e := range entries {
names[i] = e.Name()
}
t.Logf("remaining entries: %v", names)

// old.json (oldest, 100 bytes) may have been evicted to get under 200.
// At a minimum, the total remaining bytes must not exceed 200.
var total int64
for _, e := range entries {
info, _ := e.Info()
total += info.Size()
}
if total > 200 {
t.Errorf("total bytes after eviction = %d, want <= 200", total)
}
}

// Ensure capturingEvents satisfies coreapi.EventBus at compile time.
var _ coreapi.EventBus = (*capturingEvents)(nil)

Expand Down
Loading