From 862e2fd3b58f499013e16e069fb8a42b13fcabb2 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 15:10:08 +0900 Subject: [PATCH 1/4] =?UTF-8?q?feat(encryption):=20Stage=206D-3=20-=20Capa?= =?UTF-8?q?bilityFanout=20helper=20(Voters=20=E2=88=AA=20Learners=20GetCap?= =?UTF-8?q?ability=20fan-out)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the §4.1 capability fan-out helper that the 6D-6 EnableStorageEnvelope cutover RPC will call to verify every Voter ∪ Learner of every Raft group is reachable and reports encryption_capable=true before the cutover entry is proposed. The helper: - Walks every (group, member) pair across voters AND learners (§4.1 / §8 row "One learner unreachable" — learners count as hard refusals the same way voters do) - Dedupes by FullNodeID — a node serving multiple groups is probed exactly once - Dials all targets concurrently with one shared timeout (default = one RaftElectionTimeout per §4.3) - Surfaces dial / RPC failure as Reachable=false verdicts so the operator-facing error can name the unreachable peer - Sets Result.OK = true iff every verdict has Reachable && EncryptionCapable (no partial-success mode per §4.3) Per §9 the unit tests are table-driven and cover the four pinned cases (every-capable, one-not-capable, one-unreachable, duplicate-nodes-deduplicated) plus three input-validation cases (nil DialFunc, non-positive timeout, empty snapshot). One deliberate deviation from the design doc: the result struct is named CapabilityFanoutResult rather than FanoutResult to avoid colliding with the unrelated admin.FanoutResult that already ships in keyviz_fanout.go (same package). The contract is otherwise unchanged. Operator-inert: this helper has no callers in-tree yet. 6D-6 wires it. Self-review (5 lenses): 1. Data loss — N/A; helper is read-only. 2. 
Concurrency / distributed failures — fan-out is goroutine-per-target with a single context.WithTimeout governing the deadline. Result aggregation guarded by sync.Mutex. The dedup phase runs before goroutines launch so the closure-loop-var case ("for _, m := range" capturing m in a goroutine) cannot race; m is passed by value as the goroutine argument. No partial-success mode — a single unreachable learner is a hard "no" per §8. 3. Performance — concurrent dials bounded by len(unique FullNodeID) per cluster; no quadratic fan-out. dedupKeys map sized at len; results slice preallocated with capacity = len(dedupKeys). The hot path (uint64ToDecimal) skips fmt to avoid one allocation per dedup-key build. 4. Data consistency — N/A; helper is a synchronous probe with no side effects. Per §4.2 it deliberately does NOT cache results — every cutover probes fresh. 5. Test coverage — table-driven §9 cases all pin. Dedup contract asserted by counting per-address dial calls (stubDial.calls). Input-validation cases pin the bad-input refusal posture. Design-doc-first: renamed `*_proposed_*` → `*_partial_*` and updated the doc to record what shipped (6D-1, 6D-2, 6D-3) and what remains (6D-4, 6D-5, 6D-6). 
--- ..._18_partial_6d_enable_storage_envelope.md} | 26 +- internal/admin/capability_fanout.go | 241 +++++++++++++++ internal/admin/capability_fanout_test.go | 292 ++++++++++++++++++ 3 files changed, 558 insertions(+), 1 deletion(-) rename docs/design/{2026_05_18_proposed_6d_enable_storage_envelope.md => 2026_05_18_partial_6d_enable_storage_envelope.md} (97%) create mode 100644 internal/admin/capability_fanout.go create mode 100644 internal/admin/capability_fanout_test.go diff --git a/docs/design/2026_05_18_proposed_6d_enable_storage_envelope.md b/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md similarity index 97% rename from docs/design/2026_05_18_proposed_6d_enable_storage_envelope.md rename to docs/design/2026_05_18_partial_6d_enable_storage_envelope.md index 9fa99b08..6b6e72e8 100644 --- a/docs/design/2026_05_18_proposed_6d_enable_storage_envelope.md +++ b/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md @@ -2,12 +2,36 @@ | Field | Value | |---|---| -| Status | proposed | +| Status | partial — 6D-1 (doc), 6D-2 (startup guards), 6D-3 (capability fan-out helper) shipped; 6D-4, 6D-5, 6D-6 remain | | Date | 2026-05-18 | | Parent design | [`2026_04_29_partial_data_at_rest_encryption.md`](2026_04_29_partial_data_at_rest_encryption.md) | | Blockers (now satisfied) | 6B (KEK plumbing), 6C-1 / 6C-2 (startup guards), 6C-2d (`ErrSidecarBehindRaftLog` wiring) | | Bundles | 6C-3 (`ErrNodeIDCollision` + `ErrLocalEpochRollback` cluster-wide guards) | +### Shipped milestones (lifecycle: partial) + +- **6D-1** (doc) — this file. Landed with the initial proposal commit. +- **6D-2** (startup guards) — `ErrNodeIDCollision` primitive + (`internal/encryption/node_id_collision.go`), `ErrLocalEpochRollback` + primitive (`internal/encryption/local_epoch_rollback.go`), and the + `probe-node-id` admin CLI subcommand. Landed in PR #788. 
+- **6D-3** (capability fan-out helper) — `CapabilityFanout` in + `internal/admin/capability_fanout.go` with the table-driven §9 + unit tests. The §4.1 `FanoutResult` type was renamed + `CapabilityFanoutResult` to disambiguate from the unrelated + `admin.FanoutResult` already shipped in `keyviz_fanout.go`; the + contract is otherwise unchanged. + +### Open milestones + +- **6D-4** — `RotateSubEnableStorageEnvelope = 0x04` wire-format + addition + FSM apply-path dispatch + `StorageEnvelopeCutoverIndex` + sidecar field. +- **6D-5** — §6.2 storage-layer toggle (`PutAt` consults + `StorageEnvelopeActive`). +- **6D-6** — `EnableStorageEnvelope` admin RPC + CLI command + + integration test composing 6D-3 + 6D-4 + 6D-5. + ## 0. Why this doc exists Stage 6D is the first cutover that actually flips a cluster-wide diff --git a/internal/admin/capability_fanout.go b/internal/admin/capability_fanout.go new file mode 100644 index 00000000..536a318c --- /dev/null +++ b/internal/admin/capability_fanout.go @@ -0,0 +1,241 @@ +package admin + +import ( + "context" + "errors" + "sync" + "time" + + pb "github.com/bootjp/elastickv/proto" + pkgerrors "github.com/cockroachdb/errors" +) + +// errCapabilityFanoutBadInput is the sentinel wrapped by every +// input-validation refusal in this file so callers can errors.Is() +// against it without parsing strings. The concrete failure adds a +// %w-wrapped detail. +var errCapabilityFanoutBadInput = errors.New("capability fan-out: bad input") + +// RouteMember is one peer entry in a Raft group. The fan-out helper +// dials Address and identifies the node by FullNodeID for dedup +// across groups (a node serving multiple groups is probed once). +type RouteMember struct { + FullNodeID uint64 + Address string +} + +// RouteGroup is one Raft group's membership. 
Voters and Learners are +// kept separate at the input level so the cutover RPC handler can +// log them distinctly, but CapabilityFanout treats them identically +// per the §4.1 contract "every (voter ∪ learner) of every Raft +// group". The 6D design pins that learner unreachability is a hard +// no the same way voter unreachability is — see §8 / row "One +// learner unreachable during fan-out". +type RouteGroup struct { + GroupID uint64 + Voters []RouteMember + Learners []RouteMember +} + +// RouteSnapshot is the input the cutover RPC handler builds from +// the Raft engine's membership view and passes to CapabilityFanout. +// Independent of the route-catalog `distribution.CatalogSnapshot`, +// which only carries shard→group mappings, not per-group membership. +type RouteSnapshot struct { + Groups []RouteGroup +} + +// CapabilityVerdict is one node's per-call outcome. Reachable=false +// means the dial or the RPC timed out / failed at transport level — +// not a "no" answer from the peer. The cutover RPC handler treats +// both Reachable=false and EncryptionCapable=false as hard refusals +// per the §8 failure-modes table, but the verdict separates them so +// the operator-facing error message can name the precise reason. +type CapabilityVerdict struct { + FullNodeID uint64 + EncryptionCapable bool + BuildSHA string + SidecarPresent bool + Reachable bool + Err error +} + +// CapabilityFanoutResult is the aggregated outcome. OK is true iff +// every verdict has Reachable && EncryptionCapable — there is no +// partial-success mode per §4.3. +// +// Named CapabilityFanoutResult rather than the design-doc's +// `FanoutResult` to avoid a collision with the unrelated +// `admin.FanoutResult` in `keyviz_fanout.go` (KeyViz cluster fan-out +// shipped earlier in the same package). 
+type CapabilityFanoutResult struct { + Verdicts []CapabilityVerdict + OK bool +} + +// DialFunc opens a connection to one node's admin endpoint and +// returns an EncryptionAdmin client plus a cleanup closure. The +// helper invokes the closure exactly once per successful dial, +// regardless of how the RPC subsequently resolved. +// +// The 6D design says "DialFunc reuses the existing admin connection +// pool" — the concrete implementation will reach for whatever +// connection-pool helper the caller already holds (e.g. the +// `internal/admin.ForwardClient` connection pool for TLS-aware +// dials). +type DialFunc func(ctx context.Context, address string) (pb.EncryptionAdminClient, func(), error) + +// CapabilityFanout fans GetCapability out to every unique +// (voter ∪ learner) of every group in routes. Concurrent; every +// probe shares one context that expires after timeout, so the call +// returns once each probe has answered or its dial / RPC has +// observed the expiry. Missing +// responses surface as Reachable=false verdicts (no partial-success +// mode — see §4.3). +// +// Dedup key: FullNodeID. A node serving multiple groups is probed +// exactly once. Members with FullNodeID=0 are treated as distinct +// dedup keys per unique address; this case appears in stub catalogs +// where the dedup-by-id contract has not been satisfied — the +// helper still completes by falling back to address-based identity +// rather than silently collapsing every zero-id row into one probe. +// +// Returns a non-nil error only for input-validation failures — a +// nil dial func or a non-positive timeout — each wrapping +// errCapabilityFanoutBadInput, so callers can keep their existing +// `err != nil → refuse` shape. Every other input returns +// (result, nil); a snapshot with no members yields zero verdicts +// and OK=false rather than an error.
+func CapabilityFanout( + ctx context.Context, + routes RouteSnapshot, + dial DialFunc, + timeout time.Duration, +) (CapabilityFanoutResult, error) { + if dial == nil { + return CapabilityFanoutResult{}, pkgerrors.Wrap(errCapabilityFanoutBadInput, "dial func is nil") + } + if timeout <= 0 { + return CapabilityFanoutResult{}, pkgerrors.Wrapf(errCapabilityFanoutBadInput, "timeout must be positive, got %v", timeout) + } + + dedupKeys := buildCapabilityFanoutDedupSet(routes) + if len(dedupKeys) == 0 { + return CapabilityFanoutResult{Verdicts: []CapabilityVerdict{}, OK: false}, nil + } + + fanCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + results := runCapabilityFanoutProbes(fanCtx, dedupKeys, dial) + return CapabilityFanoutResult{Verdicts: results, OK: capabilityFanoutAllOK(results)}, nil +} + +// buildCapabilityFanoutDedupSet folds every voter ∪ learner of +// every group into the dedup map keyed by FullNodeID (or address +// for unidentified rows). Pulled out of CapabilityFanout to keep +// the orchestration body under the cyclomatic-complexity budget. +func buildCapabilityFanoutDedupSet(routes RouteSnapshot) map[string]RouteMember { + dedupKeys := make(map[string]RouteMember) + for _, group := range routes.Groups { + for _, m := range group.Voters { + addCapabilityFanoutDedupTarget(dedupKeys, m) + } + for _, m := range group.Learners { + addCapabilityFanoutDedupTarget(dedupKeys, m) + } + } + return dedupKeys +} + +// runCapabilityFanoutProbes dials every dedup target concurrently +// and returns the per-node verdicts in unspecified order. Bounded +// by ctx — missing responses surface as Reachable=false. 
+func runCapabilityFanoutProbes(ctx context.Context, dedupKeys map[string]RouteMember, dial DialFunc) []CapabilityVerdict { + results := make([]CapabilityVerdict, 0, len(dedupKeys)) + var mu sync.Mutex + var wg sync.WaitGroup + for _, m := range dedupKeys { + wg.Add(1) + go func(member RouteMember) { + defer wg.Done() + verdict := probeCapability(ctx, member, dial) + mu.Lock() + results = append(results, verdict) + mu.Unlock() + }(m) + } + wg.Wait() + return results +} + +func capabilityFanoutAllOK(verdicts []CapabilityVerdict) bool { + if len(verdicts) == 0 { + return false + } + for _, v := range verdicts { + if !v.Reachable || !v.EncryptionCapable { + return false + } + } + return true +} + +// addCapabilityFanoutDedupTarget folds a member into the dedup map. +// Key choice: FullNodeID stringified when non-zero, address-prefixed +// otherwise. A zero FullNodeID is treated as "not yet identified" +// and falls back to address identity so two distinct stub rows +// don't collapse into one probe. +func addCapabilityFanoutDedupTarget(m map[string]RouteMember, member RouteMember) { + if member.Address == "" { + return + } + key := capabilityFanoutDedupKey(member) + if _, exists := m[key]; exists { + return + } + m[key] = member +} + +func capabilityFanoutDedupKey(member RouteMember) string { + if member.FullNodeID != 0 { + return "id:" + uint64ToDecimal(member.FullNodeID) + } + return "addr:" + member.Address +} + +// uint64ToDecimal avoids pulling fmt for one-call hot-path +// stringification used only to build dedup keys. 
+func uint64ToDecimal(v uint64) string { + if v == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + for v > 0 { + i-- + buf[i] = byte('0' + v%10) + v /= 10 + } + return string(buf[i:]) +} + +func probeCapability(ctx context.Context, member RouteMember, dial DialFunc) CapabilityVerdict { + verdict := CapabilityVerdict{FullNodeID: member.FullNodeID} + client, closer, err := dial(ctx, member.Address) + if err != nil { + verdict.Err = pkgerrors.Wrapf(err, "dial %s", member.Address) + return verdict + } + if closer != nil { + defer closer() + } + report, err := client.GetCapability(ctx, &pb.Empty{}) + if err != nil { + verdict.Err = pkgerrors.Wrapf(err, "GetCapability %s", member.Address) + return verdict + } + verdict.Reachable = true + verdict.EncryptionCapable = report.GetEncryptionCapable() + verdict.BuildSHA = report.GetBuildSha() + verdict.SidecarPresent = report.GetSidecarPresent() + if report.GetFullNodeId() != 0 { + verdict.FullNodeID = report.GetFullNodeId() + } + return verdict +} diff --git a/internal/admin/capability_fanout_test.go b/internal/admin/capability_fanout_test.go new file mode 100644 index 00000000..6c0c9e22 --- /dev/null +++ b/internal/admin/capability_fanout_test.go @@ -0,0 +1,292 @@ +package admin + +import ( + "context" + "errors" + "sort" + "sync/atomic" + "testing" + "time" + + pb "github.com/bootjp/elastickv/proto" + "google.golang.org/grpc" +) + +// stubEncryptionAdminClient is a minimal in-test client that +// implements only the methods CapabilityFanout actually invokes +// (GetCapability). Every other method panics so that an accidental +// future use is loud. 
+type stubEncryptionAdminClient struct { + pb.EncryptionAdminClient + report *pb.CapabilityReport + err error +} + +func (s *stubEncryptionAdminClient) GetCapability(ctx context.Context, _ *pb.Empty, _ ...grpc.CallOption) (*pb.CapabilityReport, error) { + if s.err != nil { + return nil, s.err + } + return s.report, nil +} + +// stubDial maps addresses to canned (client, dialErr) pairs and +// records how many times each address was dialled so the +// dedup-by-full_node_id contract is asserted exactly. +type stubDial struct { + clients map[string]*stubEncryptionAdminClient + dialErrs map[string]error + dialed atomic.Int64 + calls map[string]*atomic.Int64 +} + +func newStubDial() *stubDial { + return &stubDial{ + clients: map[string]*stubEncryptionAdminClient{}, + dialErrs: map[string]error{}, + calls: map[string]*atomic.Int64{}, + } +} + +func (s *stubDial) addOK(addr string, report *pb.CapabilityReport) { + s.clients[addr] = &stubEncryptionAdminClient{report: report} + var c atomic.Int64 + s.calls[addr] = &c +} + +func (s *stubDial) addDialErr(addr string, err error) { + s.dialErrs[addr] = err + var c atomic.Int64 + s.calls[addr] = &c +} + +func (s *stubDial) dial(_ context.Context, addr string) (pb.EncryptionAdminClient, func(), error) { + s.dialed.Add(1) + if c := s.calls[addr]; c != nil { + c.Add(1) + } + if err, ok := s.dialErrs[addr]; ok { + return nil, nil, err + } + client, ok := s.clients[addr] + if !ok { + return nil, nil, errors.New("stubDial: no client registered for " + addr) + } + return client, func() {}, nil +} + +// TestCapabilityFanout_AllCapable pins the §9 happy-path: every +// node returns encryption_capable=true → Result.OK=true, every +// verdict reachable. 
+func TestCapabilityFanout_AllCapable(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true, BuildSha: "abc"}) + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 2, EncryptionCapable: true, SidecarPresent: true, BuildSha: "abc"}) + stub.addOK("n3:9000", &pb.CapabilityReport{FullNodeId: 3, EncryptionCapable: true, SidecarPresent: true, BuildSha: "abc"}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + }, + Learners: []RouteMember{{FullNodeID: 3, Address: "n3:9000"}}, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !res.OK { + t.Fatalf("expected OK=true with all-capable verdicts, got %+v", res) + } + if len(res.Verdicts) != 3 { + t.Fatalf("expected 3 verdicts, got %d: %+v", len(res.Verdicts), res.Verdicts) + } + for _, v := range res.Verdicts { + if !v.Reachable || !v.EncryptionCapable { + t.Errorf("verdict %+v should be Reachable && EncryptionCapable", v) + } + } +} + +// TestCapabilityFanout_OneNotCapable pins the §9 case where every +// member dials successfully but one reports +// encryption_capable=false → Result.OK=false. The verdict list +// still includes every node so the operator-facing error can name +// the refuser. 
+func TestCapabilityFanout_OneNotCapable(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 2, EncryptionCapable: false, SidecarPresent: false}) + stub.addOK("n3:9000", &pb.CapabilityReport{FullNodeId: 3, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + {FullNodeID: 3, Address: "n3:9000"}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when one verdict has EncryptionCapable=false, got OK=true; verdicts=%+v", res.Verdicts) + } + var refuser *CapabilityVerdict + for i := range res.Verdicts { + if res.Verdicts[i].FullNodeID == 2 { + refuser = &res.Verdicts[i] + } + } + if refuser == nil { + t.Fatal("expected verdict for node 2 to be present so operator can name the refuser") + } + if refuser.Reachable != true || refuser.EncryptionCapable != false { + t.Errorf("refuser verdict shape wrong: %+v (want Reachable=true, EncryptionCapable=false)", refuser) + } +} + +// TestCapabilityFanout_OneUnreachable pins the §9 case where one +// member's dial fails → Reachable=false on that verdict and +// Result.OK=false. The verdict list still includes the unreachable +// node so the operator-facing error can name it. 
+func TestCapabilityFanout_OneUnreachable(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + stub.addDialErr("n2:9000", errors.New("simulated dial timeout")) + stub.addOK("n3:9000", &pb.CapabilityReport{FullNodeId: 3, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + {FullNodeID: 3, Address: "n3:9000"}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when one node is unreachable, got OK=true; verdicts=%+v", res.Verdicts) + } + var unreachable *CapabilityVerdict + for i := range res.Verdicts { + if res.Verdicts[i].FullNodeID == 2 { + unreachable = &res.Verdicts[i] + } + } + if unreachable == nil { + t.Fatal("expected verdict for node 2 to be present") + } + if unreachable.Reachable { + t.Errorf("expected Reachable=false for dial-failed node, got %+v", unreachable) + } + if unreachable.Err == nil { + t.Errorf("expected Err to be set for dial-failed verdict, got nil") + } +} + +// TestCapabilityFanout_DeduplicatesByFullNodeID pins the §4.1 +// contract: a node serving multiple groups is probed exactly once. +// The same FullNodeID appears under group 1 (as voter) and group 2 +// (as voter) and group 3 (as learner) — expect exactly one dial +// per unique FullNodeID, exactly one verdict per unique FullNodeID. 
+func TestCapabilityFanout_DeduplicatesByFullNodeID(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 2, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{ + {GroupID: 1, Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + }}, + {GroupID: 2, Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + }}, + {GroupID: 3, Learners: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + }}, + }} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !res.OK { + t.Fatalf("expected OK=true, got %+v", res) + } + if len(res.Verdicts) != 2 { + t.Fatalf("expected 2 deduplicated verdicts, got %d: %+v", len(res.Verdicts), res.Verdicts) + } + ids := []uint64{res.Verdicts[0].FullNodeID, res.Verdicts[1].FullNodeID} + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + if ids[0] != 1 || ids[1] != 2 { + t.Errorf("expected verdicts for {1, 2}, got %v", ids) + } + if got := stub.dialed.Load(); got != 2 { + t.Errorf("expected exactly 2 dials (one per unique FullNodeID), got %d", got) + } + for addr, c := range stub.calls { + if c.Load() != 1 { + t.Errorf("address %q dialed %d times, expected 1", addr, c.Load()) + } + } +} + +// TestCapabilityFanout_NilDialRejected pins the input-validation +// contract: a nil DialFunc is a programmer error and returns +// (zero-value, err) so callers fail loudly instead of silently +// completing with zero verdicts. 
+func TestCapabilityFanout_NilDialRejected(t *testing.T) { + t.Parallel() + _, err := CapabilityFanout(context.Background(), RouteSnapshot{}, nil, time.Second) + if err == nil { + t.Fatal("expected error for nil dial func") + } +} + +// TestCapabilityFanout_NonPositiveTimeoutRejected pins the input +// validation: a zero or negative timeout would hand +// context.WithTimeout an already-expired deadline, so every probe +// would fail before it started; that is a misuse worth refusing. +func TestCapabilityFanout_NonPositiveTimeoutRejected(t *testing.T) { + t.Parallel() + _, err := CapabilityFanout(context.Background(), RouteSnapshot{}, func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return nil, nil, errors.New("never called") + }, 0) + if err == nil { + t.Fatal("expected error for zero timeout") + } +} + +// TestCapabilityFanout_EmptySnapshot pins the empty-input case: +// returns OK=false with zero verdicts but no error (the cutover RPC +// can decide its own semantics for "no members" — typically refuse +// because the catalog snapshot is bad).
+func TestCapabilityFanout_EmptySnapshot(t *testing.T) { + t.Parallel() + res, err := CapabilityFanout(context.Background(), RouteSnapshot{}, func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return nil, nil, errors.New("never called") + }, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Errorf("expected OK=false on empty snapshot, got OK=true") + } + if len(res.Verdicts) != 0 { + t.Errorf("expected zero verdicts, got %d", len(res.Verdicts)) + } +} From b46d1f5e2499d9dccab93b4bb6bbc79c5386cf3e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 15:21:12 +0900 Subject: [PATCH 2/4] fix(encryption): PR793 round-1 - codex P1 + gemini high + coderabbit major (fail-closed on malformed members) + minor doc + medium preallocation Three reviewer findings from round-1 of PR #793, all addressed: ## codex P1 / gemini high / coderabbit major - fail-closed on empty address Pre-fix: addCapabilityFanoutDedupTarget early-returned when member.Address == "", silently dropping malformed catalog rows from Result.Verdicts. The cutover RPC would then see fewer verdicts than members, and OK=true could sneak through against an unprobed peer (the section 4.1 contract requires every voter union learner to be reached). Fix: admit every row into the dedup map; classify empty-Address members in probeCapability as Reachable=false with a new errCapabilityFanoutMalformedMember sentinel so the cutover RPC caller-facing error can name the misconfigured member. Result.OK collapses to false the same way an unreachable real peer would. Sibling case (fully-malformed row: FullNodeID=0 AND Address=""): use a synthetic ordinal-based dedup key so two malformed rows surface as two separate verdicts rather than collapsing into one (which would hide one of the two misconfigurations). 
Regression tests per CLAUDE.md test-then-fix convention: - TestCapabilityFanout_EmptyAddressFailsClosed - pins empty-Address row appears as Reachable=false in Result.Verdicts with Err set, OK=false. - TestCapabilityFanout_FullyMalformedRowsNotCollapsed - pins two malformed rows yield two separate verdicts. Both tests confirmed failing on the pre-fix code; both pass on the fix. ## caller audit (semantic change) addCapabilityFanoutDedupTarget and probeCapability are both file-private. Sole callers (buildCapabilityFanoutDedupSet, runCapabilityFanoutProbes) are also in this file. No external propagation. The public-facing semantic change is "Result.Verdicts now includes malformed-row entries as Reachable=false" - this is strictly safer than the pre-fix silent drop and the OK computation in capabilityFanoutAllOK already requires Reachable=true so no caller-side change is needed. ## coderabbit minor - MD001 heading-level skip Promoted "### Shipped milestones" and "### Open milestones" to H2 in the partial design doc. The pre-fix doc went directly from H1 to H3 with no H2 in between, tripping MD001. ## gemini medium - preallocate dedup map buildCapabilityFanoutDedupSet now pre-sizes the dedup map with sum(len(group.Voters) + len(group.Learners)) as the upper bound. The map shrinks at no runtime cost when dedup collapses entries. ## test/lint evidence - go test -run TestCapabilityFanout ./internal/admin/... - all 9 tests pass (7 pre-existing + 2 new regressions) - golangci-lint run internal/admin/... 
- 0 issues --- ...5_18_partial_6d_enable_storage_envelope.md | 4 +- internal/admin/capability_fanout.go | 62 +++++++++++--- internal/admin/capability_fanout_test.go | 82 +++++++++++++++++++ 3 files changed, 133 insertions(+), 15 deletions(-) diff --git a/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md b/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md index 6b6e72e8..10672c9e 100644 --- a/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md +++ b/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md @@ -8,7 +8,7 @@ | Blockers (now satisfied) | 6B (KEK plumbing), 6C-1 / 6C-2 (startup guards), 6C-2d (`ErrSidecarBehindRaftLog` wiring) | | Bundles | 6C-3 (`ErrNodeIDCollision` + `ErrLocalEpochRollback` cluster-wide guards) | -### Shipped milestones (lifecycle: partial) +## Shipped milestones (lifecycle: partial) - **6D-1** (doc) — this file. Landed with the initial proposal commit. - **6D-2** (startup guards) — `ErrNodeIDCollision` primitive @@ -22,7 +22,7 @@ `admin.FanoutResult` already shipped in `keyviz_fanout.go`; the contract is otherwise unchanged. -### Open milestones +## Open milestones - **6D-4** — `RotateSubEnableStorageEnvelope = 0x04` wire-format addition + FSM apply-path dispatch + `StorageEnvelopeCutoverIndex` diff --git a/internal/admin/capability_fanout.go b/internal/admin/capability_fanout.go index 536a318c..736ade89 100644 --- a/internal/admin/capability_fanout.go +++ b/internal/admin/capability_fanout.go @@ -3,6 +3,7 @@ package admin import ( "context" "errors" + "strconv" "sync" "time" @@ -16,6 +17,15 @@ import ( // %w-wrapped detail. var errCapabilityFanoutBadInput = errors.New("capability fan-out: bad input") +// errCapabilityFanoutMalformedMember is the sentinel set on the +// Err field of verdicts whose RouteMember rows arrived malformed +// (missing Address) from the caller's route-snapshot construction. 
+// The verdict still appears in Result.Verdicts so the cutover RPC +// fails closed (OK=false) and the operator can name the +// misconfigured member, rather than silently dropping the row and +// letting OK=true sneak through against an unprobed peer. +var errCapabilityFanoutMalformedMember = errors.New("capability fan-out: malformed route member") + // RouteMember is one peer entry in a Raft group. The fan-out helper // dials Address and identifies the node by FullNodeID for dedup // across groups (a node serving multiple groups is probed once). @@ -128,10 +138,18 @@ func CapabilityFanout( // buildCapabilityFanoutDedupSet folds every voter ∪ learner of // every group into the dedup map keyed by FullNodeID (or address -// for unidentified rows). Pulled out of CapabilityFanout to keep -// the orchestration body under the cyclomatic-complexity budget. +// for unidentified rows; a synthetic key for fully-malformed rows +// with neither FullNodeID nor Address — see capabilityFanoutDedupKey). +// Pulled out of CapabilityFanout to keep the orchestration body +// under the cyclomatic-complexity budget. The size hint is the +// upper bound on cardinality (every row distinct); the map shrinks +// at no cost when dedup collapses entries. func buildCapabilityFanoutDedupSet(routes RouteSnapshot) map[string]RouteMember { - dedupKeys := make(map[string]RouteMember) + upperBound := 0 + for _, group := range routes.Groups { + upperBound += len(group.Voters) + len(group.Learners) + } + dedupKeys := make(map[string]RouteMember, upperBound) for _, group := range routes.Groups { for _, m := range group.Voters { addCapabilityFanoutDedupTarget(dedupKeys, m) @@ -177,26 +195,40 @@ func capabilityFanoutAllOK(verdicts []CapabilityVerdict) bool { } // addCapabilityFanoutDedupTarget folds a member into the dedup map. -// Key choice: FullNodeID stringified when non-zero, address-prefixed -// otherwise. 
A zero FullNodeID is treated as "not yet identified" -// and falls back to address identity so two distinct stub rows -// don't collapse into one probe. +// Fail-closed posture: malformed rows (empty Address) are NOT +// dropped here — they would silently disappear from Result.Verdicts +// and let OK=true sneak through despite an unprobed peer. Instead +// every row is admitted; probeCapability classifies empty-Address +// members as Reachable=false with errCapabilityFanoutMalformedMember +// so the cutover RPC's caller-facing error can name the missing +// address. func addCapabilityFanoutDedupTarget(m map[string]RouteMember, member RouteMember) { - if member.Address == "" { - return - } - key := capabilityFanoutDedupKey(member) + key := capabilityFanoutDedupKey(member, len(m)) if _, exists := m[key]; exists { return } m[key] = member } -func capabilityFanoutDedupKey(member RouteMember) string { +// capabilityFanoutDedupKey is the dedup key for a route member. +// +// - FullNodeID != 0 → "id:<decimal>" (the §4.1 dedup contract) +// - FullNodeID == 0 ∧ Address != "" → "addr:<address>" +// (stub catalogs where dedup-by-id isn't satisfied still +// dedupe by address rather than silently collapsing rows) +// - FullNodeID == 0 ∧ Address == "" → "synthetic:<ordinal>" +// (a fully-malformed row gets a unique key so two such rows +// surface as two separate Reachable=false verdicts instead +// of collapsing into one and hiding the second +// misconfiguration) +func capabilityFanoutDedupKey(member RouteMember, ordinal int) string { if member.FullNodeID != 0 { return "id:" + uint64ToDecimal(member.FullNodeID) } - return "addr:" + member.Address + if member.Address != "" { + return "addr:" + member.Address + } + return "synthetic:" + strconv.Itoa(ordinal) } // uint64ToDecimal avoids pulling fmt for one-call hot-path @@ -217,6 +249,10 @@ func uint64ToDecimal(v uint64) string { func probeCapability(ctx context.Context, member RouteMember, dial DialFunc) CapabilityVerdict { verdict := CapabilityVerdict{FullNodeID: member.FullNodeID} + if member.Address == "" { + verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMalformedMember, "full_node_id=%d has empty address", member.FullNodeID) + return verdict + } client, closer, err := dial(ctx, member.Address) if err != nil { verdict.Err = pkgerrors.Wrapf(err, "dial %s", member.Address) diff --git a/internal/admin/capability_fanout_test.go b/internal/admin/capability_fanout_test.go index 6c0c9e22..8862d8d4 100644 --- a/internal/admin/capability_fanout_test.go +++ b/internal/admin/capability_fanout_test.go @@ -271,6 +271,88 @@ func TestCapabilityFanout_NonPositiveTimeoutRejected(t *testing.T) { } } +// TestCapabilityFanout_EmptyAddressFailsClosed pins the fail-closed +// posture for a malformed route snapshot where a member has no +// Address. The pre-review code silently dropped such rows from the +// dedup set, which would let CapabilityFanout return OK=true even +// though that member was never probed — the cutover would then +// proceed against an un-verified peer.
Codex r1 P1 / gemini high / +// coderabbit major all flagged the same defect (PR #793). +// +// Contract pinned here: +// - The empty-address row is NOT dropped; it appears in +// Result.Verdicts with Reachable=false. +// - Result.OK is false because not every verdict is Reachable. +// - The malformed row's Err field is set so the operator can +// name the misconfiguration. +func TestCapabilityFanout_EmptyAddressFailsClosed(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: ""}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when a member has empty address, got OK=true (verdicts=%+v)", res.Verdicts) + } + if len(res.Verdicts) != 2 { + t.Fatalf("expected 2 verdicts (incl. empty-address malformed row), got %d: %+v", len(res.Verdicts), res.Verdicts) + } + var malformed *CapabilityVerdict + for i := range res.Verdicts { + if res.Verdicts[i].FullNodeID == 2 { + malformed = &res.Verdicts[i] + } + } + if malformed == nil { + t.Fatal("expected verdict for empty-address member to be present so operator can name it") + } + if malformed.Reachable { + t.Errorf("expected Reachable=false for empty-address verdict, got %+v", malformed) + } + if malformed.Err == nil { + t.Errorf("expected Err to be set on malformed verdict so operator can diagnose, got nil") + } +} + +// TestCapabilityFanout_FullyMalformedRowsNotCollapsed pins the +// sibling fail-closed case: two members with BOTH FullNodeID=0 AND +// Address="" must NOT collapse into one verdict (which would silently +// hide one of the two misconfigured rows). 
Each gets its own +// Reachable=false verdict via a synthetic dedup key. +func TestCapabilityFanout_FullyMalformedRowsNotCollapsed(t *testing.T) { + t.Parallel() + stub := newStubDial() + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 0, Address: ""}, + {FullNodeID: 0, Address: ""}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false for fully-malformed snapshot, got OK=true") + } + if len(res.Verdicts) != 2 { + t.Errorf("expected 2 separate verdicts for two fully-malformed rows, got %d (collapsing would hide one)", len(res.Verdicts)) + } +} + // TestCapabilityFanout_EmptySnapshot pins the empty-input case: // returns OK=false with zero verdicts but no error (the cutover RPC // can decide its own semantics for "no members" — typically refuse From f2bbdaa37639161c8f8a6976a42f2e6f4ed45b69 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 15:28:47 +0900 Subject: [PATCH 3/4] fix(encryption): PR793 round-2 - codex r2 P1 (nil-client guard against buggy DialFunc) Codex r2 P1: if a DialFunc returns (nil, nil, nil) - no error but also no client - probeCapability would panic on client.GetCapability(...) and crash the admin RPC path. The fix is a defensive nil-client check that converts the contract violation into a Reachable=false verdict with a new errCapabilityFanoutBadDialer sentinel. Regression test per CLAUDE.md test-then-fix convention: - TestCapabilityFanout_NilClientFromDialFailsClosed - injects a buggy dialer that returns (nil, nil, nil); confirmed panicking on the pre-fix code (goroutine traceback through probeCapability:264); passes on the fix. Caller audit: probeCapability is file-private, sole caller is runCapabilityFanoutProbes also in this file. No external propagation. 
Semantic change is "nil client from a buggy dialer now produces a Reachable=false verdict instead of a panic" - strictly safer than the pre-fix crash. Lint clean. All 10 TestCapabilityFanout_* tests pass. --- internal/admin/capability_fanout.go | 11 +++++++ internal/admin/capability_fanout_test.go | 39 ++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/internal/admin/capability_fanout.go b/internal/admin/capability_fanout.go index 736ade89..7670d7dc 100644 --- a/internal/admin/capability_fanout.go +++ b/internal/admin/capability_fanout.go @@ -26,6 +26,13 @@ var errCapabilityFanoutBadInput = errors.New("capability fan-out: bad input") // letting OK=true sneak through against an unprobed peer. var errCapabilityFanoutMalformedMember = errors.New("capability fan-out: malformed route member") +// errCapabilityFanoutBadDialer is the sentinel for a DialFunc that +// violates its contract — returns no error but also no client +// (nil, nil, nil). Without this guard the goroutine would panic on +// client.GetCapability(...) and take down the admin RPC path +// instead of producing a Reachable=false verdict. +var errCapabilityFanoutBadDialer = errors.New("capability fan-out: dialer returned nil client without error") + // RouteMember is one peer entry in a Raft group. The fan-out helper // dials Address and identifies the node by FullNodeID for dedup // across groups (a node serving multiple groups is probed once). 
@@ -261,6 +268,10 @@ func probeCapability(ctx context.Context, member RouteMember, dial DialFunc) Cap if closer != nil { defer closer() } + if client == nil { + verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutBadDialer, "dial %s", member.Address) + return verdict + } report, err := client.GetCapability(ctx, &pb.Empty{}) if err != nil { verdict.Err = pkgerrors.Wrapf(err, "GetCapability %s", member.Address) diff --git a/internal/admin/capability_fanout_test.go b/internal/admin/capability_fanout_test.go index 8862d8d4..eefd9291 100644 --- a/internal/admin/capability_fanout_test.go +++ b/internal/admin/capability_fanout_test.go @@ -353,6 +353,45 @@ func TestCapabilityFanout_FullyMalformedRowsNotCollapsed(t *testing.T) { } } +// TestCapabilityFanout_NilClientFromDialFailsClosed pins the +// defensive-programming refusal for a buggy DialFunc that returns +// (nil, nil, nil) — no error but also no client. Codex r2 P1 on +// PR #793 flagged that without this guard the helper would panic on +// client.GetCapability(...) and take down the admin RPC path +// instead of producing a Reachable=false verdict. +// +// Contract pinned here: a nil client is treated as a dial-level +// failure (Reachable=false with Err set) so a misbehaving dialer +// implementation cannot crash the cutover RPC. 
+func TestCapabilityFanout_NilClientFromDialFailsClosed(t *testing.T) { + t.Parallel() + badDial := func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return nil, nil, nil + } + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{{FullNodeID: 1, Address: "n1:9000"}}, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, badDial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when dial returns nil client without error, got OK=true") + } + if len(res.Verdicts) != 1 { + t.Fatalf("expected 1 verdict, got %d", len(res.Verdicts)) + } + v := res.Verdicts[0] + if v.Reachable { + t.Errorf("expected Reachable=false for nil-client verdict, got %+v", v) + } + if v.Err == nil { + t.Errorf("expected Err to be set on nil-client verdict, got nil") + } +} + // TestCapabilityFanout_EmptySnapshot pins the empty-input case: // returns OK=false with zero verdicts but no error (the cutover RPC // can decide its own semantics for "no members" — typically refuse From 438482f36741a9c809aff752d448b22146a4d924 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 15:40:58 +0900 Subject: [PATCH 4/4] fix(encryption): PR793 round-3 - codex r3 P1+P2 (mismatched-responder guard + timeout-bounded helper return) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two codex r3 findings, both addressed: ## codex r3 P1 - reject capability replies from unexpected node IDs Pre-fix: probeCapability accepted any successful GetCapability response as evidence that the expected route member was reachable, even when the responder reported a different full_node_id (stale routing, two members accidentally sharing an address, etc.). Result: one healthy node could satisfy probes for another, and Result.OK could become true even though the intended member was never verified. 
Fix: when both the snapshot's expected FullNodeID and the response's full_node_id are non-zero AND they disagree, the verdict is Reachable=false with a new errCapabilityFanoutMismatchedResponder sentinel. The cutover RPC refuses with OK=false rather than silently passing. Regression test: TestCapabilityFanout_MismatchedResponderIDFailsClosed - snapshot says n2:9000 has FullNodeID=2, responder reports FullNodeID=99, helper now produces Reachable=false instead of the pre-fix Reachable=true. ## codex r3 P2 - honor timeout even when a probe ignores ctx Pre-fix: runCapabilityFanoutProbes did wg.Wait() unconditionally. A DialFunc/client that ignored ctx cancellation would hang the helper indefinitely, violating the §4.3 contract "Returns within `timeout`". Fix: pre-seed every dedup-key slot with a Reachable=false verdict carrying errCapabilityFanoutProbeTimeout. Each goroutine overwrites its slot when its probe finishes. After launching goroutines, the orchestrator waits on min(done, ctx.Done()) and then returns whatever slot values are present. Probes that didn't finish by the deadline surface as Reachable=false with the pre-seeded timeout error. Note on goroutine cleanup: goroutines whose probes ignore ctx continue running until they unblock on their own. The helper documents that explicitly: contract is "the HELPER returns within timeout", not "every spawned goroutine returns". A dialer/client that ignores ctx is a bug on the caller side; the helper's job is to prevent that bug from hanging the admin RPC path. Regression test: TestCapabilityFanout_TimeoutBoundsHelperReturn - injects a hanging client that ignores ctx; the pre-fix code timed out the entire 30s test process (goroutine traceback through probeCapability); the fix returns within 500ms (5x the 100ms budget; generous for CI). ## caller audit (semantic change) probeCapability and runCapabilityFanoutProbes are both file-private. 
The new sentinels (errCapabilityFanoutMismatchedResponder, errCapabilityFanoutProbeTimeout) are also file-private. Sole caller of probeCapability is runCapabilityFanoutProbes; sole caller of runCapabilityFanoutProbes is CapabilityFanout. The public-facing semantic change is: 1. Stale-routing scenarios that previously slipped through as OK=true now produce OK=false. Strictly safer. 2. Hung probes that previously hung the helper now produce Reachable=false verdicts. Strictly safer. capabilityFanoutAllOK already requires Reachable=true so no caller-side change is needed. ## test/lint evidence - go test -race -run TestCapabilityFanout -timeout 60s ./internal/admin/... - all 12 tests pass (10 pre-existing + 2 new regressions) under -race - golangci-lint run internal/admin/... - 0 issues --- internal/admin/capability_fanout.go | 80 ++++++++++++++++-- internal/admin/capability_fanout_test.go | 100 +++++++++++++++++++++++ 2 files changed, 172 insertions(+), 8 deletions(-) diff --git a/internal/admin/capability_fanout.go b/internal/admin/capability_fanout.go index 7670d7dc..921b7c5c 100644 --- a/internal/admin/capability_fanout.go +++ b/internal/admin/capability_fanout.go @@ -33,6 +33,23 @@ var errCapabilityFanoutMalformedMember = errors.New("capability fan-out: malform // instead of producing a Reachable=false verdict. var errCapabilityFanoutBadDialer = errors.New("capability fan-out: dialer returned nil client without error") +// errCapabilityFanoutMismatchedResponder is the sentinel for the +// stale-routing / shared-address case: the snapshot expected one +// full_node_id at an address but the responder reports a different +// one. Accepting the response would credit the expected member as +// verified even though the intended member never actually answered. +// Fail-closed: verdict carries this error with Reachable=false. 
+var errCapabilityFanoutMismatchedResponder = errors.New("capability fan-out: responder full_node_id does not match expected") + +// errCapabilityFanoutProbeTimeout is the sentinel pre-seeded on +// every verdict slot before the probe goroutines start. A buggy +// DialFunc/client that ignores ctx cancellation would otherwise +// hang the helper indefinitely; by pre-seeding the slot and +// returning whichever slots have been overwritten by the deadline, +// the helper honors its §4.3 "Returns within timeout" contract +// even against an uncooperative dialer. +var errCapabilityFanoutProbeTimeout = errors.New("capability fan-out: probe did not complete within timeout") + // RouteMember is one peer entry in a Raft group. The fan-out helper // dials Address and identifies the node by FullNodeID for dedup // across groups (a node serving multiple groups is probed once). @@ -170,23 +187,60 @@ func buildCapabilityFanoutDedupSet(routes RouteSnapshot) map[string]RouteMember // runCapabilityFanoutProbes dials every dedup target concurrently // and returns the per-node verdicts in unspecified order. Bounded -// by ctx — missing responses surface as Reachable=false. +// by ctx — when the deadline fires before a probe goroutine +// finishes, its pre-seeded "probe-timeout" verdict surfaces as +// Reachable=false instead of the helper waiting indefinitely for +// a buggy DialFunc/client that ignores ctx cancellation. This +// pins the §4.3 "Returns within `timeout`" contract. +// +// Note on goroutine cleanup: goroutines whose probes ignore ctx +// continue running until they unblock on their own. The helper +// does NOT promise to reclaim those goroutines — the contract is +// "the HELPER returns within timeout", not "every spawned +// goroutine returns within timeout". A dialer / client that +// ignores ctx is a bug on the caller side; the helper's job is to +// prevent that bug from hanging the admin RPC path. 
func runCapabilityFanoutProbes(ctx context.Context, dedupKeys map[string]RouteMember, dial DialFunc) []CapabilityVerdict { - results := make([]CapabilityVerdict, 0, len(dedupKeys)) + slots := make(map[string]*CapabilityVerdict, len(dedupKeys)) + for key, member := range dedupKeys { + v := CapabilityVerdict{ + FullNodeID: member.FullNodeID, + Err: pkgerrors.Wrap(errCapabilityFanoutProbeTimeout, member.Address), + } + slots[key] = &v + } + var mu sync.Mutex var wg sync.WaitGroup - for _, m := range dedupKeys { + for key, m := range dedupKeys { wg.Add(1) - go func(member RouteMember) { + go func(slotKey string, member RouteMember) { defer wg.Done() verdict := probeCapability(ctx, member, dial) mu.Lock() - results = append(results, verdict) + *slots[slotKey] = verdict mu.Unlock() - }(m) + }(key, m) } - wg.Wait() - return results + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-ctx.Done(): + } + + mu.Lock() + defer mu.Unlock() + out := make([]CapabilityVerdict, 0, len(slots)) + for _, v := range slots { + out = append(out, *v) + } + return out } func capabilityFanoutAllOK(verdicts []CapabilityVerdict) bool { @@ -277,6 +331,16 @@ func probeCapability(ctx context.Context, member RouteMember, dial DialFunc) Cap verdict.Err = pkgerrors.Wrapf(err, "GetCapability %s", member.Address) return verdict } + // Mismatched-responder guard: in a stale-routing or + // shared-address scenario, the responder might report a + // different full_node_id than the one the snapshot expected. + // Accepting the response would credit the expected member as + // verified despite no member-X ever answering. Fail closed. 
+ if member.FullNodeID != 0 && report.GetFullNodeId() != 0 && report.GetFullNodeId() != member.FullNodeID { + verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMismatchedResponder, + "%s: expected full_node_id=%d got %d", member.Address, member.FullNodeID, report.GetFullNodeId()) + return verdict + } verdict.Reachable = true verdict.EncryptionCapable = report.GetEncryptionCapable() verdict.BuildSHA = report.GetBuildSha() diff --git a/internal/admin/capability_fanout_test.go b/internal/admin/capability_fanout_test.go index eefd9291..a5204e9a 100644 --- a/internal/admin/capability_fanout_test.go +++ b/internal/admin/capability_fanout_test.go @@ -392,6 +392,106 @@ func TestCapabilityFanout_NilClientFromDialFailsClosed(t *testing.T) { } } +// TestCapabilityFanout_MismatchedResponderIDFailsClosed pins the +// fail-closed posture for stale-routing / shared-address scenarios: +// when the snapshot says n2:9000 has FullNodeID=2 but the responder +// reports FullNodeID=99, accepting the response would credit +// member 2 as verified despite member 2 never actually answering. +// Codex r3 P1 on PR #793. +// +// Contract pinned here: when both the snapshot and the response +// carry non-zero FullNodeIDs and they disagree, the verdict is +// Reachable=false with an explanatory Err so the cutover RPC +// refuses with OK=false rather than silently passing. 
+func TestCapabilityFanout_MismatchedResponderIDFailsClosed(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 99, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{{FullNodeID: 2, Address: "n2:9000"}}, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when responder full_node_id (99) differs from expected (2), got OK=true") + } + if len(res.Verdicts) != 1 { + t.Fatalf("expected 1 verdict, got %d", len(res.Verdicts)) + } + v := res.Verdicts[0] + if v.Reachable { + t.Errorf("expected Reachable=false for mismatched-id verdict, got %+v", v) + } + if v.Err == nil { + t.Errorf("expected Err to be set on mismatched-id verdict, got nil") + } +} + +// hangingEncryptionAdminClient is a test-only client whose +// GetCapability blocks on a channel and deliberately ignores ctx +// cancellation, simulating a buggy gRPC client that fails to honor +// the helper's timeout contract. +type hangingEncryptionAdminClient struct { + pb.EncryptionAdminClient + hangCh chan struct{} +} + +func (h *hangingEncryptionAdminClient) GetCapability(_ context.Context, _ *pb.Empty, _ ...grpc.CallOption) (*pb.CapabilityReport, error) { + <-h.hangCh + return nil, errors.New("hangingEncryptionAdminClient: unblocked") +} + +// TestCapabilityFanout_TimeoutBoundsHelperReturn pins the §4.3 +// timeout contract: "Returns within `timeout` regardless of how +// many members responded". A DialFunc / client that ignores ctx +// cancellation must NOT be able to hang the admin RPC path +// indefinitely — the helper synthesizes Reachable=false verdicts +// for any probe that hasn't reported by the deadline. Codex r3 P2 +// on PR #793. 
+func TestCapabilityFanout_TimeoutBoundsHelperReturn(t *testing.T) { + t.Parallel() + hangCh := make(chan struct{}) + defer close(hangCh) + + hang := &hangingEncryptionAdminClient{hangCh: hangCh} + dial := func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return hang, func() {}, nil + } + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{{FullNodeID: 1, Address: "n1:9000"}}, + }}} + + start := time.Now() + res, err := CapabilityFanout(context.Background(), snapshot, dial, 100*time.Millisecond) + elapsed := time.Since(start) + + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + // Allow a generous margin (5x the timeout) for slow CI; the + // pre-fix code would have hung indefinitely so any bounded + // return time is a pass. + if elapsed > 500*time.Millisecond { + t.Fatalf("helper hung for %v despite 100ms timeout — contract violated", elapsed) + } + if res.OK { + t.Fatalf("expected OK=false on timeout, got OK=true") + } + if len(res.Verdicts) != 1 { + t.Fatalf("expected 1 verdict (synthesized for hung probe), got %d", len(res.Verdicts)) + } + if res.Verdicts[0].Reachable { + t.Errorf("expected Reachable=false for hung probe, got %+v", res.Verdicts[0]) + } +} + // TestCapabilityFanout_EmptySnapshot pins the empty-input case: // returns OK=false with zero verdicts but no error (the cutover RPC // can decide its own semantics for "no members" — typically refuse