diff --git a/service.go b/service.go index a346de8..95132c0 100644 --- a/service.go +++ b/service.go @@ -37,6 +37,12 @@ const ( maxConsecutivePublishFailures = 3 ) +// maxSubsPerTopic caps the number of subscribers per topic to prevent +// memory-DoS via unlimited b.subs[topic] growth. A peer with many keys +// can otherwise open connections subscribing to the same topic without +// bound, growing the subscriber slice linearly. +const maxSubsPerTopic = 1000 + // Service is the L11 plugin adapter. cmd/daemon/main.go (L12) and // cmd/pilotctl _daemon-run construct it via NewService and register // via daemon.RegisterPlugin. @@ -227,8 +233,15 @@ func (b *broker) handleConn(sub *subscriber) { return } slog.Info("eventstream broker: subscribe received", "remote", sub.remote(), "topic", subEvt.Topic) - topic = subEvt.Topic - b.addSub(topic, sub) + reqTopic := subEvt.Topic + if !b.addSub(reqTopic, sub) { + slog.Warn("eventstream broker: subscriber rejected, topic at cap", + "remote", sub.remote(), + "topic", reqTopic, + "cap", maxSubsPerTopic) + return + } + topic = reqTopic if b.events != nil { b.events.Publish("pubsub.subscribed", map[string]any{ "topic": topic, "remote": sub.remote(), @@ -258,10 +271,14 @@ func (b *broker) handleConn(sub *subscriber) { } } -func (b *broker) addSub(topic string, sub *subscriber) { +func (b *broker) addSub(topic string, sub *subscriber) bool { b.mu.Lock() + defer b.mu.Unlock() + if len(b.subs[topic]) >= maxSubsPerTopic { + return false + } b.subs[topic] = append(b.subs[topic], sub) - b.mu.Unlock() + return true } func (b *broker) removeSub(sub *subscriber) { diff --git a/zz_added_coverage_test.go b/zz_added_coverage_test.go index 3ab3a7f..13b0bb9 100644 --- a/zz_added_coverage_test.go +++ b/zz_added_coverage_test.go @@ -447,6 +447,41 @@ func TestService_StopContextCancel(t *testing.T) { // --- takeToken cap branch ---------------------------------------------------- +// TestBroker_AddSub_RejectsAtCap verifies that addSub returns false when +// the per-topic subscriber cap is reached (memory-DoS protection). +func TestBroker_AddSub_RejectsAtCap(t *testing.T) { + t.Parallel() + b := newBroker(nil) + // Seed the topic with exactly maxSubsPerTopic subscribers. + for i := 0; i < maxSubsPerTopic; i++ { + sub := stubSubscriber() + if !b.addSub("full-topic", sub) { + t.Fatalf("addSub #%d failed before cap", i) + } + } + // The next subscriber must be rejected. + extra := stubSubscriber() + if b.addSub("full-topic", extra) { + t.Fatal("addSub should have rejected subscriber over cap") + } + // A different topic should still accept subscribers (no cross-topic bleed). + other := stubSubscriber() + if !b.addSub("other-topic", other) { + t.Fatal("addSub rejected subscriber on a topic well below cap") + } + // Verify counts. + b.mu.RLock() + nFull := len(b.subs["full-topic"]) + nOther := len(b.subs["other-topic"]) + b.mu.RUnlock() + if nFull != maxSubsPerTopic { + t.Errorf("full-topic subscribers = %d, want %d", nFull, maxSubsPerTopic) + } + if nOther != 1 { + t.Errorf("other-topic subscribers = %d, want 1", nOther) + } +} + // TestTakeToken_RefillCappedAtBurst exercises the refill cap (line // 179-180): after a long sleep the refill would exceed the burst budget, // so it must be clamped.