Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *Service) Start(ctx context.Context, deps coreapi.Deps) error {
return fmt.Errorf("eventstream: listen on port %d: %w", protocol.PortEventStream, err)
}
s.listener = ln
s.broker = newBroker(deps.Events)
s.broker = newBroker(deps.Events, deps.Trust)

runCtx, cancel := context.WithCancel(ctx)
s.cancel = cancel
Expand Down Expand Up @@ -146,13 +146,15 @@ type broker struct {
subs map[string][]*subscriber
rateMu sync.Mutex
rate map[*subscriber]*publishBucket
events coreapi.EventBus // for pubsub.* observability events
events coreapi.EventBus // for pubsub.* observability events
trust coreapi.TrustChecker // optional — nil if trustedagents not loaded
}

func newBroker(events coreapi.EventBus) *broker {
func newBroker(events coreapi.EventBus, trust coreapi.TrustChecker) *broker {
return &broker{
subs: make(map[string][]*subscriber),
events: events,
trust: trust,
}
}

Expand Down Expand Up @@ -228,6 +230,26 @@ func (b *broker) handleConn(sub *subscriber) {
}
slog.Info("eventstream broker: subscribe received", "remote", sub.remote(), "topic", subEvt.Topic)
topic = subEvt.Topic

// PILOT-251: gate subscription. Any peer that completes L6 key
// exchange can connect to port 1002; without an auth gate, any
// peer can subscribe to any topic (metadata leak + content read
// for unencrypted topics). If a TrustChecker is available, only
// trusted peers may subscribe. If none is loaded (unusual),
// subscription is denied — fail-closed is the security-safe default.
if b.trust != nil {
_, ok := b.trust.IsTrusted(sub.conn.RemoteAddr().Node)
if !ok {
slog.Warn("eventstream broker: subscription rejected — peer not trusted",
"remote", sub.remote(), "topic", topic)
return
}
} else {
slog.Warn("eventstream broker: subscription rejected — no TrustChecker loaded, denying by default",
"remote", sub.remote(), "topic", topic)
return
}

b.addSub(topic, sub)
if b.events != nil {
b.events.Publish("pubsub.subscribed", map[string]any{
Expand Down
82 changes: 73 additions & 9 deletions zz_added_coverage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ func (s *stubEventBus) count(topic string) int {
return n
}

// --- stub TrustChecker -------------------------------------------------------

// stubTrustChecker is a TrustChecker that trusts (or denies) a configurable
// set of node IDs. Used by broker handleConn tests that exercise the
// subscribe/publish path — the trust gate must pass for those tests to
// reach the code under test.
type stubTrustChecker struct {
trusted map[uint32]bool
allowAll bool
}

func newAllowAllTrustChecker() *stubTrustChecker {
return &stubTrustChecker{allowAll: true}
}

func (t *stubTrustChecker) IsTrusted(nodeID uint32) (string, bool) {
if t.allowAll {
return "test-agent", true
}
if t.trusted != nil && t.trusted[nodeID] {
return "test-agent", true
}
return "", false
}

// --- net.Pipe-backed coreapi.Stream -----------------------------------------

// pipeStream adapts net.Pipe to coreapi.Stream so we can drive broker
Expand Down Expand Up @@ -100,7 +125,7 @@ func (s *pipeStream) SetWriteDeadline(time.Time) error { return nil }
func TestBroker_HandleConn_SubscribeEmitsEvent(t *testing.T) {
t.Parallel()
bus := &stubEventBus{}
b := newBroker(bus)
b := newBroker(bus, newAllowAllTrustChecker())

subStream, clientEnd := newPipeStreamPair()
sub := newSubscriber(subStream)
Expand Down Expand Up @@ -153,7 +178,7 @@ func TestBroker_HandleConn_SubscribeEmitsEvent(t *testing.T) {
func TestBroker_HandleConn_SubscribeReadFails(t *testing.T) {
t.Parallel()
bus := &stubEventBus{}
b := newBroker(bus)
b := newBroker(bus, newAllowAllTrustChecker())

subStream, clientEnd := newPipeStreamPair()
sub := newSubscriber(subStream)
Expand Down Expand Up @@ -190,7 +215,7 @@ func TestBroker_HandleConn_SubscribeReadFails(t *testing.T) {
func TestBroker_HandleConn_PublishedEventBusBranch(t *testing.T) {
t.Parallel()
bus := &stubEventBus{}
b := newBroker(bus)
b := newBroker(bus, newAllowAllTrustChecker())

subStream, clientEnd := newPipeStreamPair()
sub := newSubscriber(subStream)
Expand Down Expand Up @@ -248,7 +273,7 @@ func TestBroker_HandleConn_PublishedEventBusBranch(t *testing.T) {
func TestBroker_HandleConn_RateLimitBusBranch(t *testing.T) {
t.Parallel()
bus := &stubEventBus{}
b := newBroker(bus)
b := newBroker(bus, newAllowAllTrustChecker())

subStream, clientEnd := newPipeStreamPair()
sub := newSubscriber(subStream)
Expand Down Expand Up @@ -315,7 +340,7 @@ func TestBroker_HandleConn_RateLimitBusBranch(t *testing.T) {
// (lines 312-315 — wildcard subscriber receives non-wildcard event).
func TestPublishWith_WildcardFanout(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)

topicSub := stubSubscriber()
wildSub := stubSubscriber()
Expand Down Expand Up @@ -348,7 +373,7 @@ func TestPublishWith_WildcardFanout(t *testing.T) {
// should not receive its own publish.
func TestPublishWith_WildcardSenderSkipped(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)

wildSender := stubSubscriber()
wildOther := stubSubscriber()
Expand Down Expand Up @@ -381,7 +406,7 @@ func TestPublishWith_WildcardSenderSkipped(t *testing.T) {
// otherwise double-deliver).
func TestPublishWith_TopicIsWildcardDoesNotDoubleDeliver(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
wildSub := stubSubscriber()
b.addSub("*", wildSub)

Expand All @@ -408,7 +433,7 @@ func TestPublishWith_TopicIsWildcardDoesNotDoubleDeliver(t *testing.T) {
func TestPublishWith_BusPublishedBranch(t *testing.T) {
t.Parallel()
bus := &stubEventBus{}
b := newBroker(bus)
b := newBroker(bus, nil)
sub := stubSubscriber()
b.addSub("t", sub)

Expand Down Expand Up @@ -452,7 +477,7 @@ func TestService_StopContextCancel(t *testing.T) {
// so it must be clamped.
func TestTakeToken_RefillCappedAtBurst(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
sub := stubSubscriber()

// Seed a bucket with a lastRefill far in the past so the elapsed
Expand Down Expand Up @@ -674,3 +699,42 @@ func TestServer_Publish_TopicLiteralStar(t *testing.T) {
// expected — no second delivery.
}
}

// TestBroker_HandleConn_SubscriptionRejectedByTrustGate covers PILOT-251:
// when a TrustChecker is present and the peer is NOT trusted, handleConn
// must reject the subscription immediately without adding to subs.
func TestBroker_HandleConn_SubscriptionRejectedByTrustGate(t *testing.T) {
t.Parallel()

// Trust checker that trusts nobody.
denyAll := &stubTrustChecker{trusted: map[uint32]bool{}}
b := newBroker(nil, denyAll)

subStream, clientEnd := newPipeStreamPair()
sub := newSubscriber(subStream)

done := make(chan struct{})
go func() {
defer close(done)
b.handleConn(sub)
}()

// Write a subscribe event — should be rejected.
if err := WriteEvent(clientEnd, &Event{Topic: "secret"}); err != nil {
t.Fatalf("WriteEvent subscribe: %v", err)
}

select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("handleConn did not return after trust rejection")
}

// Verify the subscription was NOT registered.
b.mu.RLock()
n := len(b.subs["secret"])
b.mu.RUnlock()
if n != 0 {
t.Errorf("subscription should not be registered when trust check fails, got %d subs", n)
}
}
6 changes: 3 additions & 3 deletions zz_more_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestNewSubscriber_NilWrap(t *testing.T) {
// without going through handleConn.
func TestBrokerAddRemoveSub_DirectCalls(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
sub1 := stubSubscriber()
sub2 := stubSubscriber()

Expand Down Expand Up @@ -89,7 +89,7 @@ func TestBrokerAddRemoveSub_DirectCalls(t *testing.T) {
// publisher's bucket should be gone after the call.
func TestBrokerForgetPublisher(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
sub := stubSubscriber()
// Prime the bucket.
if !b.takeToken(sub) {
Expand All @@ -107,7 +107,7 @@ func TestBrokerForgetPublisher(t *testing.T) {
// TestTakeToken_BurstAndRecover exercises the rate-limit drain + refill.
func TestTakeToken_BurstAndRecover(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
sub := stubSubscriber()
// Drain the entire burst budget.
drained := 0
Expand Down
8 changes: 4 additions & 4 deletions zz_resilience_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func stubSubscriber() *subscriber {
// the retry succeeds — subscriber stays alive, counter never increments.
func TestPubsubRetryAbsorbsSingleTransientFailure(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
sub := stubSubscriber()
b.addSub("topic-a", sub)

Expand All @@ -73,7 +73,7 @@ func TestPubsubRetryAbsorbsSingleTransientFailure(t *testing.T) {
// maxConsecutivePublishFailures across publishes.
func TestPubsubKeepsSubscriberBelowFailureThreshold(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
sub := stubSubscriber()
b.addSub("topic-fail", sub)

Expand Down Expand Up @@ -107,7 +107,7 @@ func TestPubsubKeepsSubscriberBelowFailureThreshold(t *testing.T) {
// counter restarted from 0, threshold is 3).
func TestPubsubFailureCounterResetsOnSuccess(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
sub := stubSubscriber()
b.addSub("topic-mix", sub)

Expand Down Expand Up @@ -151,7 +151,7 @@ func TestPubsubFailureCounterResetsOnSuccess(t *testing.T) {
// the flaky sub drops.
func TestPubsubOneSubscribersFailureDoesNotKillOthers(t *testing.T) {
t.Parallel()
b := newBroker(nil)
b := newBroker(nil, nil)
healthy := stubSubscriber()
flaky := stubSubscriber()
b.addSub("shared", healthy)
Expand Down
2 changes: 1 addition & 1 deletion zz_service_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestService_AcceptLoopAndPubSub(t *testing.T) {
pubStream := newSinkStream(pubC2sR)

listener := newESListener(subStream, pubStream)
deps := coreapi.Deps{Streams: &esFakeStreams{ln: listener}}
deps := coreapi.Deps{Streams: &esFakeStreams{ln: listener}, Trust: newAllowAllTrustChecker()}

s := NewService()
if err := s.Start(context.Background(), deps); err != nil {
Expand Down
Loading