From 1d53f8648fd1bdb7557428b31828af41e6a928aa Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Fri, 29 May 2026 22:10:59 +0000 Subject: [PATCH] fix(eventstream): gate subscription on TrustChecker (PILOT-251) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit handleConn previously added any connecting peer to the requested topic map without authorization. Any peer that completed L6 key exchange (no pre-trust required) could subscribe to any topic, leaking topic metadata and the content of unencrypted topics. Fix: after reading the subscribe envelope, check the peer against the TrustChecker passed in via Deps (the field may be nil). If the checker is present and the peer is not trusted, reject the subscription. If no TrustChecker is loaded, deny by default (fail-closed) — a missing trust subsystem is an unusual state and the security-safe default is to reject. This mirrors the registryBound gate pattern from the handshake plugin (PILOT-228, handshake/handshake.go:603/630): trust must be established before automatic access grants. 
Closes PILOT-251 --- service.go | 28 +++++++++++-- zz_added_coverage_test.go | 82 ++++++++++++++++++++++++++++++++++----- zz_more_test.go | 6 +-- zz_resilience_test.go | 8 ++-- zz_service_e2e_test.go | 2 +- 5 files changed, 106 insertions(+), 20 deletions(-) diff --git a/service.go b/service.go index a346de8..305a6b2 100644 --- a/service.go +++ b/service.go @@ -66,7 +66,7 @@ func (s *Service) Start(ctx context.Context, deps coreapi.Deps) error { return fmt.Errorf("eventstream: listen on port %d: %w", protocol.PortEventStream, err) } s.listener = ln - s.broker = newBroker(deps.Events) + s.broker = newBroker(deps.Events, deps.Trust) runCtx, cancel := context.WithCancel(ctx) s.cancel = cancel @@ -146,13 +146,15 @@ type broker struct { subs map[string][]*subscriber rateMu sync.Mutex rate map[*subscriber]*publishBucket - events coreapi.EventBus // for pubsub.* observability events + events coreapi.EventBus // for pubsub.* observability events + trust coreapi.TrustChecker // optional — nil if trustedagents not loaded } -func newBroker(events coreapi.EventBus) *broker { +func newBroker(events coreapi.EventBus, trust coreapi.TrustChecker) *broker { return &broker{ subs: make(map[string][]*subscriber), events: events, + trust: trust, } } @@ -228,6 +230,26 @@ func (b *broker) handleConn(sub *subscriber) { } slog.Info("eventstream broker: subscribe received", "remote", sub.remote(), "topic", subEvt.Topic) topic = subEvt.Topic + + // PILOT-251: gate subscription. Any peer that completes L6 key + // exchange can connect to port 1002; without an auth gate, any + // peer can subscribe to any topic (metadata leak + content read + // for unencrypted topics). If a TrustChecker is available, only + // trusted peers may subscribe. If none is loaded (unusual), + // subscription is denied — fail-closed is the security-safe default. 
+ if b.trust != nil { + _, ok := b.trust.IsTrusted(sub.conn.RemoteAddr().Node) + if !ok { + slog.Warn("eventstream broker: subscription rejected — peer not trusted", + "remote", sub.remote(), "topic", topic) + return + } + } else { + slog.Warn("eventstream broker: subscription rejected — no TrustChecker loaded, denying by default", + "remote", sub.remote(), "topic", topic) + return + } + b.addSub(topic, sub) if b.events != nil { b.events.Publish("pubsub.subscribed", map[string]any{ diff --git a/zz_added_coverage_test.go b/zz_added_coverage_test.go index 3ab3a7f..30b01bf 100644 --- a/zz_added_coverage_test.go +++ b/zz_added_coverage_test.go @@ -63,6 +63,31 @@ func (s *stubEventBus) count(topic string) int { return n } +// --- stub TrustChecker ------------------------------------------------------- + +// stubTrustChecker is a TrustChecker that trusts (or denies) a configurable +// set of node IDs. Used by broker handleConn tests that exercise the +// subscribe/publish path — the trust gate must pass for those tests to +// reach the code under test. 
+type stubTrustChecker struct { + trusted map[uint32]bool + allowAll bool +} + +func newAllowAllTrustChecker() *stubTrustChecker { + return &stubTrustChecker{allowAll: true} +} + +func (t *stubTrustChecker) IsTrusted(nodeID uint32) (string, bool) { + if t.allowAll { + return "test-agent", true + } + if t.trusted != nil && t.trusted[nodeID] { + return "test-agent", true + } + return "", false +} + // --- net.Pipe-backed coreapi.Stream ----------------------------------------- // pipeStream adapts net.Pipe to coreapi.Stream so we can drive broker @@ -100,7 +125,7 @@ func (s *pipeStream) SetWriteDeadline(time.Time) error { return nil } func TestBroker_HandleConn_SubscribeEmitsEvent(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -153,7 +178,7 @@ func TestBroker_HandleConn_SubscribeEmitsEvent(t *testing.T) { func TestBroker_HandleConn_SubscribeReadFails(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -190,7 +215,7 @@ func TestBroker_HandleConn_SubscribeReadFails(t *testing.T) { func TestBroker_HandleConn_PublishedEventBusBranch(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -248,7 +273,7 @@ func TestBroker_HandleConn_PublishedEventBusBranch(t *testing.T) { func TestBroker_HandleConn_RateLimitBusBranch(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, newAllowAllTrustChecker()) subStream, clientEnd := newPipeStreamPair() sub := newSubscriber(subStream) @@ -315,7 +340,7 @@ func TestBroker_HandleConn_RateLimitBusBranch(t *testing.T) { // (lines 312-315 — wildcard 
subscriber receives non-wildcard event). func TestPublishWith_WildcardFanout(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) topicSub := stubSubscriber() wildSub := stubSubscriber() @@ -348,7 +373,7 @@ func TestPublishWith_WildcardFanout(t *testing.T) { // should not receive its own publish. func TestPublishWith_WildcardSenderSkipped(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) wildSender := stubSubscriber() wildOther := stubSubscriber() @@ -381,7 +406,7 @@ func TestPublishWith_WildcardSenderSkipped(t *testing.T) { // otherwise double-deliver). func TestPublishWith_TopicIsWildcardDoesNotDoubleDeliver(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) wildSub := stubSubscriber() b.addSub("*", wildSub) @@ -408,7 +433,7 @@ func TestPublishWith_TopicIsWildcardDoesNotDoubleDeliver(t *testing.T) { func TestPublishWith_BusPublishedBranch(t *testing.T) { t.Parallel() bus := &stubEventBus{} - b := newBroker(bus) + b := newBroker(bus, nil) sub := stubSubscriber() b.addSub("t", sub) @@ -452,7 +477,7 @@ func TestService_StopContextCancel(t *testing.T) { // so it must be clamped. func TestTakeToken_RefillCappedAtBurst(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() // Seed a bucket with a lastRefill far in the past so the elapsed @@ -674,3 +699,42 @@ func TestServer_Publish_TopicLiteralStar(t *testing.T) { // expected — no second delivery. } } + +// TestBroker_HandleConn_SubscriptionRejectedByTrustGate covers PILOT-251: +// when a TrustChecker is present and the peer is NOT trusted, handleConn +// must reject the subscription immediately without adding to subs. +func TestBroker_HandleConn_SubscriptionRejectedByTrustGate(t *testing.T) { + t.Parallel() + + // Trust checker that trusts nobody. 
+ denyAll := &stubTrustChecker{trusted: map[uint32]bool{}} + b := newBroker(nil, denyAll) + + subStream, clientEnd := newPipeStreamPair() + sub := newSubscriber(subStream) + + done := make(chan struct{}) + go func() { + defer close(done) + b.handleConn(sub) + }() + + // Write a subscribe event — should be rejected. + if err := WriteEvent(clientEnd, &Event{Topic: "secret"}); err != nil { + t.Fatalf("WriteEvent subscribe: %v", err) + } + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("handleConn did not return after trust rejection") + } + + // Verify the subscription was NOT registered. + b.mu.RLock() + n := len(b.subs["secret"]) + b.mu.RUnlock() + if n != 0 { + t.Errorf("subscription should not be registered when trust check fails, got %d subs", n) + } +} diff --git a/zz_more_test.go b/zz_more_test.go index 9ec3ffe..b919391 100644 --- a/zz_more_test.go +++ b/zz_more_test.go @@ -55,7 +55,7 @@ func TestNewSubscriber_NilWrap(t *testing.T) { // without going through handleConn. func TestBrokerAddRemoveSub_DirectCalls(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub1 := stubSubscriber() sub2 := stubSubscriber() @@ -89,7 +89,7 @@ func TestBrokerAddRemoveSub_DirectCalls(t *testing.T) { // publisher's bucket should be gone after the call. func TestBrokerForgetPublisher(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() // Prime the bucket. if !b.takeToken(sub) { @@ -107,7 +107,7 @@ func TestBrokerForgetPublisher(t *testing.T) { // TestTakeToken_BurstAndRecover exercises the rate-limit drain + refill. func TestTakeToken_BurstAndRecover(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() // Drain the entire burst budget. 
drained := 0 diff --git a/zz_resilience_test.go b/zz_resilience_test.go index bdb1ee5..5ebf4d3 100644 --- a/zz_resilience_test.go +++ b/zz_resilience_test.go @@ -47,7 +47,7 @@ func stubSubscriber() *subscriber { // the retry succeeds — subscriber stays alive, counter never increments. func TestPubsubRetryAbsorbsSingleTransientFailure(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() b.addSub("topic-a", sub) @@ -73,7 +73,7 @@ func TestPubsubRetryAbsorbsSingleTransientFailure(t *testing.T) { // maxConsecutivePublishFailures across publishes. func TestPubsubKeepsSubscriberBelowFailureThreshold(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() b.addSub("topic-fail", sub) @@ -107,7 +107,7 @@ func TestPubsubKeepsSubscriberBelowFailureThreshold(t *testing.T) { // counter restarted from 0, threshold is 3). func TestPubsubFailureCounterResetsOnSuccess(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) sub := stubSubscriber() b.addSub("topic-mix", sub) @@ -151,7 +151,7 @@ func TestPubsubFailureCounterResetsOnSuccess(t *testing.T) { // the flaky sub drops. func TestPubsubOneSubscribersFailureDoesNotKillOthers(t *testing.T) { t.Parallel() - b := newBroker(nil) + b := newBroker(nil, nil) healthy := stubSubscriber() flaky := stubSubscriber() b.addSub("shared", healthy) diff --git a/zz_service_e2e_test.go b/zz_service_e2e_test.go index 82aad0b..fed03f7 100644 --- a/zz_service_e2e_test.go +++ b/zz_service_e2e_test.go @@ -115,7 +115,7 @@ func TestService_AcceptLoopAndPubSub(t *testing.T) { pubStream := newSinkStream(pubC2sR) listener := newESListener(subStream, pubStream) - deps := coreapi.Deps{Streams: &esFakeStreams{ln: listener}} + deps := coreapi.Deps{Streams: &esFakeStreams{ln: listener}, Trust: newAllowAllTrustChecker()} s := NewService() if err := s.Start(context.Background(), deps); err != nil {