Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ const (
maxConsecutivePublishFailures = 3
)

// maxSubsPerTopic caps the number of subscribers per topic to prevent
// memory-DoS via unlimited b.subs[topic] growth. A peer with many keys
// can otherwise open connections subscribing to the same topic without
// bound, growing the subscriber slice linearly.
const maxSubsPerTopic = 1000

// Service is the L11 plugin adapter. cmd/daemon/main.go (L12) and
// cmd/pilotctl _daemon-run construct it via NewService and register
// via daemon.RegisterPlugin.
Expand Down Expand Up @@ -227,8 +233,15 @@ func (b *broker) handleConn(sub *subscriber) {
return
}
slog.Info("eventstream broker: subscribe received", "remote", sub.remote(), "topic", subEvt.Topic)
topic = subEvt.Topic
b.addSub(topic, sub)
reqTopic := subEvt.Topic
if !b.addSub(reqTopic, sub) {
slog.Warn("eventstream broker: subscriber rejected, topic at cap",
"remote", sub.remote(),
"topic", reqTopic,
"cap", maxSubsPerTopic)
return
}
topic = reqTopic
if b.events != nil {
b.events.Publish("pubsub.subscribed", map[string]any{
"topic": topic, "remote": sub.remote(),
Expand Down Expand Up @@ -258,10 +271,14 @@ func (b *broker) handleConn(sub *subscriber) {
}
}

func (b *broker) addSub(topic string, sub *subscriber) {
func (b *broker) addSub(topic string, sub *subscriber) bool {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.subs[topic]) >= maxSubsPerTopic {
return false
}
b.subs[topic] = append(b.subs[topic], sub)
b.mu.Unlock()
return true
}

func (b *broker) removeSub(sub *subscriber) {
Expand Down
35 changes: 35 additions & 0 deletions zz_added_coverage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,41 @@ func TestService_StopContextCancel(t *testing.T) {

// --- takeToken cap branch ----------------------------------------------------

// TestBroker_AddSub_RejectsAtCap verifies that addSub returns false when
// the per-topic subscriber cap is reached (memory-DoS protection).
func TestBroker_AddSub_RejectsAtCap(t *testing.T) {
t.Parallel()
b := newBroker(nil)
// Seed the topic with exactly maxSubsPerTopic subscribers.
for i := 0; i < maxSubsPerTopic; i++ {
sub := stubSubscriber()
if !b.addSub("full-topic", sub) {
t.Fatalf("addSub #%d failed before cap", i)
}
}
// The next subscriber must be rejected.
extra := stubSubscriber()
if b.addSub("full-topic", extra) {
t.Fatal("addSub should have rejected subscriber over cap")
}
// A different topic should still accept subscribers (no cross-topic bleed).
other := stubSubscriber()
if !b.addSub("other-topic", other) {
t.Fatal("addSub rejected subscriber on a topic well below cap")
}
// Verify counts.
b.mu.RLock()
nFull := len(b.subs["full-topic"])
nOther := len(b.subs["other-topic"])
b.mu.RUnlock()
if nFull != maxSubsPerTopic {
t.Errorf("full-topic subscribers = %d, want %d", nFull, maxSubsPerTopic)
}
if nOther != 1 {
t.Errorf("other-topic subscribers = %d, want 1", nOther)
}
}

// TestTakeToken_RefillCappedAtBurst exercises the refill cap (line
// 179-180): after a long sleep the refill would exceed the burst budget,
// so it must be clamped.
Expand Down
Loading