diff --git a/service.go b/service.go index 4b7d524..a295f24 100644 --- a/service.go +++ b/service.go @@ -37,6 +37,11 @@ type ServiceConfig struct { // misbehaving peer or sustained inbound load fills the operator's // disk indefinitely. InboxMaxFiles int + // InboxMaxBytes caps the total on-disk bytes used by the inbox. + // When > 0, saveInboxMessage checks the accumulated size before + // every write and evictExpandOverflow uses bytes instead of file + // count. Zero ⇒ no byte cap (backward-compatible). + InboxMaxBytes int64 } // inboxEvictCheckEvery: only run the eviction-scan once every N saves @@ -259,6 +264,35 @@ func (s *Service) saveInboxMessage(frame *Frame, from protocol.Addr) error { return fmt.Errorf("mkdir: %w", err) } + // Byte-budget check: if InboxMaxBytes is set, confirm there is room + // BEFORE writing. Evict if over, then re-check. + if s.cfg.InboxMaxBytes > 0 { + current, _ := inboxTotalBytes(dir) + // Estimate the JSON overhead: type, from, data, bytes, received_at, + // and possibly data_b64. + estimated := int64(len(frame.Payload)) + 256 + if current+estimated > s.cfg.InboxMaxBytes { + s.evictInboxOverflowByBytes(dir) + after, _ := inboxTotalBytes(dir) + if after+estimated > s.cfg.InboxMaxBytes { + slog.Warn("inbox byte budget exceeded after eviction", + "current_bytes", after, + "max_bytes", s.cfg.InboxMaxBytes, + "frame_bytes", len(frame.Payload)) + if s.deps.Events != nil { + s.deps.Events.Publish("inbox.full", map[string]any{ + "from": from.String(), + "type": TypeName(frame.Type), + "frame_bytes": len(frame.Payload), + "max_bytes": s.cfg.InboxMaxBytes, + }) + } + return fmt.Errorf("inbox byte budget exceeded: %d + %d > %d", + after, estimated, s.cfg.InboxMaxBytes) + } + } + } + ts := time.Now() msg := map[string]interface{}{ "type": TypeName(frame.Type), @@ -298,10 +332,16 @@ func (s *Service) saveInboxMessage(frame *Frame, from protocol.Addr) error { } // evictInboxOverflow trims the inbox to at most cfg.InboxMaxFiles by -// deleting the oldest files (by mtime). Best-effort: I/O errors are -// logged and the loop continues. Called periodically from -// saveInboxMessage. +// deleting the oldest files (by mtime). When InboxMaxBytes > 0, the +// eviction target is total bytes rather than file count. Best-effort: +// I/O errors are logged and the loop continues. Called periodically +// from saveInboxMessage. func (s *Service) evictInboxOverflow(dir string) { + // Byte-based eviction when InboxMaxBytes is configured. + if s.cfg.InboxMaxBytes > 0 { + s.evictInboxOverflowByBytes(dir) + return + } maxFiles := s.cfg.InboxMaxFiles if maxFiles <= 0 { maxFiles = 10000 @@ -340,3 +380,66 @@ func (s *Service) evictInboxOverflow(dir string) { } slog.Info("inbox eviction", "dir", dir, "evicted", toEvict, "remaining", maxFiles) } + +// evictInboxOverflowByBytes trims total inbox size to InboxMaxBytes. +func (s *Service) evictInboxOverflowByBytes(dir string) { + entries, err := os.ReadDir(dir) + if err != nil { + slog.Debug("inbox evict: readdir", "dir", dir, "err", err) + return + } + type aged struct { + name string + mod time.Time + size int64 + } + files := make([]aged, 0, len(entries)) + var total int64 + for _, e := range entries { + if e.IsDir() { + continue + } + info, err := e.Info() + if err != nil { + continue + } + sz := info.Size() + files = append(files, aged{name: e.Name(), mod: info.ModTime(), size: sz}) + total += sz + } + if total <= s.cfg.InboxMaxBytes { + return + } + // Oldest first. + sort.Slice(files, func(i, j int) bool { return files[i].mod.Before(files[j].mod) }) + evicted := 0 + for i := 0; i < len(files) && total > s.cfg.InboxMaxBytes; i++ { + p := filepath.Join(dir, files[i].name) + if err := os.Remove(p); err != nil { + continue + } + total -= files[i].size + evicted++ + } + slog.Info("inbox eviction (bytes)", "dir", dir, "evicted", evicted, "total_bytes_after", total, "max_bytes", s.cfg.InboxMaxBytes) +} + +// inboxTotalBytes sums the on-disk size of all regular files in dir. +func inboxTotalBytes(dir string) (int64, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return 0, err + } + var total int64 + for _, e := range entries { + if e.IsDir() { + continue + } + info, err := e.Info() + if err != nil { + continue + } + total += info.Size() + } + return total, nil +} diff --git a/zz_service_errors_test.go b/zz_service_errors_test.go index 718d2d0..42beb64 100644 --- a/zz_service_errors_test.go +++ b/zz_service_errors_test.go @@ -424,6 +424,108 @@ func (e *capturingEvents) Subscribe(pattern string) (<-chan coreapi.Event, func( return ch, func() {} } +// ---- PILOT-276: inbox byte-budget cap ---------------------------------- + +// TestSaveInboxMessage_ByteBudgetEnforced verifies that when InboxMaxBytes is +// set, saveInboxMessage rejects writes that would exceed the budget. +func TestSaveInboxMessage_ByteBudgetEnforced(t *testing.T) { + t.Parallel() + tmp := t.TempDir() + s := NewService(ServiceConfig{InboxDir: tmp, InboxMaxBytes: 512}) + + // First message (small) must succeed. + frame := &Frame{Type: TypeText, Payload: []byte("hello")} + if err := s.saveInboxMessage(frame, protocol.Addr{Node: 1}); err != nil { + t.Fatalf("first save under budget must succeed: %v", err) + } + + // Second message — the inbox JSON overhead + data should push over 512. + // Force the test by writing a payload that fills the rest of the budget. + // After the first save, let's check how many files exist. + entries, _ := os.ReadDir(tmp) + t.Logf("entries after first save: %d", len(entries)) + + // Save enough messages to exceed 512 bytes total. + for i := 0; i < 20; i++ { + frame := &Frame{Type: TypeText, Payload: []byte(strings.Repeat("x", 50))} + err := s.saveInboxMessage(frame, protocol.Addr{Node: 2}) + if err != nil { + t.Logf("saveInboxMessage #%d failed as expected: %v", i+2, err) + return // success — budget enforced + } + } + t.Error("expected saveInboxMessage to eventually reject when over byte budget") +} + +// TestSaveInboxMessage_NoByteBudget_Unbounded documents that with +// InboxMaxBytes unset (zero), old behaviour is preserved — writes are +// never rejected on byte count. +func TestSaveInboxMessage_NoByteBudget_Unbounded(t *testing.T) { + t.Parallel() + tmp := t.TempDir() + s := NewService(ServiceConfig{InboxDir: tmp}) // InboxMaxBytes=0 (unset) + + for i := 0; i < 5; i++ { + frame := &Frame{Type: TypeText, Payload: []byte(strings.Repeat("y", 200))} + if err := s.saveInboxMessage(frame, protocol.Addr{Node: 3}); err != nil { + t.Fatalf("save #%d with no byte budget must not reject: %v", i+1, err) + } + } + entries, _ := os.ReadDir(tmp) + if len(entries) != 5 { + t.Errorf("got %d files, want 5", len(entries)) + } +} + +// TestEvictInboxOverflow_ByteBasedEvictsOldest verifies that when +// InboxMaxBytes is set, evictInboxOverflow uses total bytes (not file +// count) to decide what to delete. +func TestEvictInboxOverflow_ByteBasedEvictsOldest(t *testing.T) { + t.Parallel() + tmp := t.TempDir() + s := NewService(ServiceConfig{InboxDir: tmp, InboxMaxBytes: 200}) + + now := time.Now() + // Write 3 files totalling > 200 bytes. Oldest should get evicted. + files := []struct { + name string + content string + age time.Duration + }{ + {"old.json", strings.Repeat("a", 100), -10 * time.Second}, + {"mid.json", strings.Repeat("b", 80), -5 * time.Second}, + {"new.json", strings.Repeat("c", 60), -1 * time.Second}, + } + for _, f := range files { + p := filepath.Join(tmp, f.name) + if err := os.WriteFile(p, []byte(f.content), 0600); err != nil { + t.Fatalf("write: %v", err) + } + mt := now.Add(f.age) + _ = os.Chtimes(p, mt, mt) + } + + s.evictInboxOverflow(tmp) + + entries, _ := os.ReadDir(tmp) + names := make([]string, len(entries)) + for i, e := range entries { + names[i] = e.Name() + } + t.Logf("remaining entries: %v", names) + + // old.json (oldest, 100 bytes) may have been evicted to get under 200. + // At a minimum, the total remaining bytes must not exceed 200. + var total int64 + for _, e := range entries { + info, _ := e.Info() + total += info.Size() + } + if total > 200 { + t.Errorf("total bytes after eviction = %d, want <= 200", total) + } +} + // Ensure capturingEvents satisfies coreapi.EventBus at compile time. var _ coreapi.EventBus = (*capturingEvents)(nil)