diff --git a/service.go b/service.go index a346de8..305a6b2 100644 --- a/service.go +++ b/service.go @@ -66,7 +66,7 @@ func (s *Service) Start(ctx context.Context, deps coreapi.Deps) error { return fmt.Errorf("eventstream: listen on port %d: %w", protocol.PortEventStream, err) } s.listener = ln - s.broker = newBroker(deps.Events) + s.broker = newBroker(deps.Events, deps.Trust) runCtx, cancel := context.WithCancel(ctx) s.cancel = cancel @@ -146,13 +146,15 @@ type broker struct { subs map[string][]*subscriber rateMu sync.Mutex rate map[*subscriber]*publishBucket - events coreapi.EventBus // for pubsub.* observability events + events coreapi.EventBus // for pubsub.* observability events + trust coreapi.TrustChecker // optional — nil if trustedagents not loaded } -func newBroker(events coreapi.EventBus) *broker { +func newBroker(events coreapi.EventBus, trust coreapi.TrustChecker) *broker { return &broker{ subs: make(map[string][]*subscriber), events: events, + trust: trust, } } @@ -228,6 +230,26 @@ func (b *broker) handleConn(sub *subscriber) { } slog.Info("eventstream broker: subscribe received", "remote", sub.remote(), "topic", subEvt.Topic) topic = subEvt.Topic + + // PILOT-251: gate subscription. Any peer that completes L6 key + // exchange can connect to port 1002; without an auth gate, any + // peer can subscribe to any topic (metadata leak + content read + // for unencrypted topics). If a TrustChecker is available, only + // trusted peers may subscribe. If none is loaded (unusual), + // subscription is denied — fail-closed is the security-safe default. + if b.trust != nil { + _, ok := b.trust.IsTrusted(sub.conn.RemoteAddr().Node) + if !ok { + slog.Warn("eventstream broker: subscription rejected — peer not trusted", + "remote", sub.remote(), "topic", topic) + return + } + } else { + slog.Warn("eventstream broker: subscription rejected — no TrustChecker loaded, denying by default", + "remote", sub.remote(), "topic", topic) + return + } + b.addSub(topic, sub) if b.events != nil { b.events.Publish("pubsub.subscribed", map[string]any{ diff --git a/zz_added_coverage_test.go b/zz_added_coverage_test.go index 3ab3a7f..30b01bf 100644 --- a/zz_added_coverage_test.go +++ b/zz_added_coverage_test.go @@ -63,6 +63,31 @@ func (s *stubEventBus) count(topic string) int { return n } +// --- stub TrustChecker ------------------------------------------------------- + +// stubTrustChecker is a TrustChecker that trusts (or denies) a configurable +// set of node IDs. Used by broker handleConn tests that exercise the +// subscribe/publish path — the trust gate must pass for those tests to +// reach the code under test. +type stubTrustChecker struct { + trusted map[uint32]bool + allowAll bool +} + +func newAllowAllTrustChecker() *stubTrustChecker { + return &stubTrustChecker{allowAll: true} +} + +func (t *stubTrustChecker) IsTrusted(nodeID uint32) (string, bool) { + if t.allowAll { + return "test-agent", true + } + if t.trusted != nil && t.trusted[nodeID] { + return "test-agent", true + } + return "", false +} + // --- net.Pipe-backed coreapi.Stream ----------------------------------------- // pipeStream adapts net.Pipe to coreapi.Stream so we can drive broker @@ -100,7 +125,7 @@ func (s *pipeStream) SetWriteDeadline(time.Time) error { return nil } func TestBroker_HandleConn_SubscribeEmitsEvent(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -153,7 +178,7 @@ func TestBroker_HandleConn_SubscribeEmitsEvent(t *testing.T) { func TestBroker_HandleConn_SubscribeReadFails(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -190,7 +215,7 @@ func TestBroker_HandleConn_SubscribeReadFails(t *testing.T) { func TestBroker_HandleConn_PublishedEventBusBranch(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -248,7 +273,7 @@ func TestBroker_HandleConn_PublishedEventBusBranch(t *testing.T) { func TestBroker_HandleConn_RateLimitBusBranch(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -315,7 +340,7 @@ func TestBroker_HandleConn_RateLimitBusBranch(t *testing.T) { // (lines 312-315 — wildcard subscriber receives non-wildcard event). func TestPublishWith_WildcardFanout(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) topicSub := stubSubscriber() wildSub := stubSubscriber() @@ -348,7 +373,7 @@ func TestPublishWith_WildcardFanout(t *testing.T) { // should not receive its own publish. func TestPublishWith_WildcardSenderSkipped(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) wildSender := stubSubscriber() wildOther := stubSubscriber() @@ -381,7 +406,7 @@ func TestPublishWith_WildcardSenderSkipped(t *testing.T) { // otherwise double-deliver). func TestPublishWith_TopicIsWildcardDoesNotDoubleDeliver(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) wildSub := stubSubscriber() b.addSub("*", wildSub) @@ -408,7 +433,7 @@ func TestPublishWith_TopicIsWildcardDoesNotDoubleDeliver(t *testing.T) { func TestPublishWith_BusPublishedBranch(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, nil) sub := stubSubscriber() b.addSub("t", sub) @@ -452,7 +477,7 @@ func TestService_StopContextCancel(t *testing.T) { // so it must be clamped. func TestTakeToken_RefillCappedAtBurst(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() // Seed a bucket with a lastRefill far in the past so the elapsed @@ -674,3 +699,42 @@ func TestServer_Publish_TopicLiteralStar(t *testing.T) { // expected — no second delivery. } } + +// TestBroker_HandleConn_SubscriptionRejectedByTrustGate covers PILOT-251: +// when a TrustChecker is present and the peer is NOT trusted, handleConn +// must reject the subscription immediately without adding to subs. +func TestBroker_HandleConn_SubscriptionRejectedByTrustGate(t *testing.T) { + t.Parallel() + + // Trust checker that trusts nobody. + denyAll := &stubTrustChecker{trusted: map[uint32]bool{}} + b := newBroker(nil, denyAll) + + subStream, clientEnd := newPipeStreamPair() + sub := newSubscriber(subStream) + + done := make(chan struct{}) + go func() { + defer close(done) + b.handleConn(sub) + }() + + // Write a subscribe event — should be rejected. + if err := WriteEvent(clientEnd, &Event{Topic: "secret"}); err != nil { + t.Fatalf("WriteEvent subscribe: %v", err) + } + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("handleConn did not return after trust rejection") + } + + // Verify the subscription was NOT registered. + b.mu.RLock() + n := len(b.subs["secret"]) + b.mu.RUnlock() + if n != 0 { + t.Errorf("subscription should not be registered when trust check fails, got %d subs", n) + } +} diff --git a/zz_more_test.go b/zz_more_test.go index 9ec3ffe..b919391 100644 --- a/zz_more_test.go +++ b/zz_more_test.go @@ -55,7 +55,7 @@ func TestNewSubscriber_NilWrap(t *testing.T) { // without going through handleConn. func TestBrokerAddRemoveSub_DirectCalls(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub1 := stubSubscriber() sub2 := stubSubscriber() @@ -89,7 +89,7 @@ func TestBrokerAddRemoveSub_DirectCalls(t *testing.T) { // publisher's bucket should be gone after the call. func TestBrokerForgetPublisher(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() // Prime the bucket. if !b.takeToken(sub) { @@ -107,7 +107,7 @@ func TestBrokerForgetPublisher(t *testing.T) { // TestTakeToken_BurstAndRecover exercises the rate-limit drain + refill. func TestTakeToken_BurstAndRecover(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() // Drain the entire burst budget. drained := 0 diff --git a/zz_resilience_test.go b/zz_resilience_test.go index bdb1ee5..5ebf4d3 100644 --- a/zz_resilience_test.go +++ b/zz_resilience_test.go @@ -47,7 +47,7 @@ func stubSubscriber() *subscriber { // the retry succeeds — subscriber stays alive, counter never increments. func TestPubsubRetryAbsorbsSingleTransientFailure(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() b.addSub("topic-a", sub) @@ -73,7 +73,7 @@ func TestPubsubRetryAbsorbsSingleTransientFailure(t *testing.T) { // maxConsecutivePublishFailures across publishes. func TestPubsubKeepsSubscriberBelowFailureThreshold(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() b.addSub("topic-fail", sub) @@ -107,7 +107,7 @@ func TestPubsubKeepsSubscriberBelowFailureThreshold(t *testing.T) { // counter restarted from 0, threshold is 3). func TestPubsubFailureCounterResetsOnSuccess(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() b.addSub("topic-mix", sub) @@ -151,7 +151,7 @@ func TestPubsubFailureCounterResetsOnSuccess(t *testing.T) { // the flaky sub drops. func TestPubsubOneSubscribersFailureDoesNotKillOthers(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) healthy := stubSubscriber() flaky := stubSubscriber() b.addSub("shared", healthy) diff --git a/zz_service_e2e_test.go b/zz_service_e2e_test.go index 82aad0b..fed03f7 100644 --- a/zz_service_e2e_test.go +++ b/zz_service_e2e_test.go @@ -115,7 +115,7 @@ func TestService_AcceptLoopAndPubSub(t *testing.T) { pubStream := newSinkStream(pubC2sR) listener := newESListener(subStream, pubStream) - deps := coreapi.Deps{Streams: &esFakeStreams{ln: listener}} + deps := coreapi.Deps{Streams: &esFakeStreams{ln: listener}, Trust: newAllowAllTrustChecker()} s := NewService() if err := s.Start(context.Background(), deps); err != nil {