diff --git a/docs/design/2026_05_18_proposed_6d_enable_storage_envelope.md b/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md similarity index 97% rename from docs/design/2026_05_18_proposed_6d_enable_storage_envelope.md rename to docs/design/2026_05_18_partial_6d_enable_storage_envelope.md index 9fa99b08..10672c9e 100644 --- a/docs/design/2026_05_18_proposed_6d_enable_storage_envelope.md +++ b/docs/design/2026_05_18_partial_6d_enable_storage_envelope.md @@ -2,12 +2,36 @@ | Field | Value | |---|---| -| Status | proposed | +| Status | partial — 6D-1 (doc), 6D-2 (startup guards), 6D-3 (capability fan-out helper) shipped; 6D-4, 6D-5, 6D-6 remain | | Date | 2026-05-18 | | Parent design | [`2026_04_29_partial_data_at_rest_encryption.md`](2026_04_29_partial_data_at_rest_encryption.md) | | Blockers (now satisfied) | 6B (KEK plumbing), 6C-1 / 6C-2 (startup guards), 6C-2d (`ErrSidecarBehindRaftLog` wiring) | | Bundles | 6C-3 (`ErrNodeIDCollision` + `ErrLocalEpochRollback` cluster-wide guards) | +## Shipped milestones (lifecycle: partial) + +- **6D-1** (doc) — this file. Landed with the initial proposal commit. +- **6D-2** (startup guards) — `ErrNodeIDCollision` primitive + (`internal/encryption/node_id_collision.go`), `ErrLocalEpochRollback` + primitive (`internal/encryption/local_epoch_rollback.go`), and the + `probe-node-id` admin CLI subcommand. Landed in PR #788. +- **6D-3** (capability fan-out helper) — `CapabilityFanout` in + `internal/admin/capability_fanout.go` with the table-driven §9 + unit tests. The §4.1 `FanoutResult` type was renamed + `CapabilityFanoutResult` to disambiguate from the unrelated + `admin.FanoutResult` already shipped in `keyviz_fanout.go`; the + contract is otherwise unchanged. + +## Open milestones + +- **6D-4** — `RotateSubEnableStorageEnvelope = 0x04` wire-format + addition + FSM apply-path dispatch + `StorageEnvelopeCutoverIndex` + sidecar field. +- **6D-5** — §6.2 storage-layer toggle (`PutAt` consults + `StorageEnvelopeActive`). +- **6D-6** — `EnableStorageEnvelope` admin RPC + CLI command + + integration test composing 6D-3 + 6D-4 + 6D-5. + ## 0. Why this doc exists Stage 6D is the first cutover that actually flips a cluster-wide diff --git a/internal/admin/capability_fanout.go b/internal/admin/capability_fanout.go new file mode 100644 index 00000000..921b7c5c --- /dev/null +++ b/internal/admin/capability_fanout.go @@ -0,0 +1,352 @@ +package admin + +import ( + "context" + "errors" + "strconv" + "sync" + "time" + + pb "github.com/bootjp/elastickv/proto" + pkgerrors "github.com/cockroachdb/errors" +) + +// errCapabilityFanoutBadInput is the sentinel wrapped by every +// input-validation refusal in this file so callers can errors.Is() +// against it without parsing strings. The concrete failure adds a +// %w-wrapped detail. +var errCapabilityFanoutBadInput = errors.New("capability fan-out: bad input") + +// errCapabilityFanoutMalformedMember is the sentinel set on the +// Err field of verdicts whose RouteMember rows arrived malformed +// (missing Address) from the caller's route-snapshot construction. +// The verdict still appears in Result.Verdicts so the cutover RPC +// fails closed (OK=false) and the operator can name the +// misconfigured member, rather than silently dropping the row and +// letting OK=true sneak through against an unprobed peer. +var errCapabilityFanoutMalformedMember = errors.New("capability fan-out: malformed route member") + +// errCapabilityFanoutBadDialer is the sentinel for a DialFunc that +// violates its contract — returns no error but also no client +// (nil, nil, nil). Without this guard the goroutine would panic on +// client.GetCapability(...) and take down the admin RPC path +// instead of producing a Reachable=false verdict. +var errCapabilityFanoutBadDialer = errors.New("capability fan-out: dialer returned nil client without error") + +// errCapabilityFanoutMismatchedResponder is the sentinel for the +// stale-routing / shared-address case: the snapshot expected one +// full_node_id at an address but the responder reports a different +// one. Accepting the response would credit the expected member as +// verified even though the intended member never actually answered. +// Fail-closed: verdict carries this error with Reachable=false. +var errCapabilityFanoutMismatchedResponder = errors.New("capability fan-out: responder full_node_id does not match expected") + +// errCapabilityFanoutProbeTimeout is the sentinel pre-seeded on +// every verdict slot before the probe goroutines start. A buggy +// DialFunc/client that ignores ctx cancellation would otherwise +// hang the helper indefinitely; by pre-seeding the slot and +// returning whichever slots have been overwritten by the deadline, +// the helper honors its §4.3 "Returns within timeout" contract +// even against an uncooperative dialer. +var errCapabilityFanoutProbeTimeout = errors.New("capability fan-out: probe did not complete within timeout") + +// RouteMember is one peer entry in a Raft group. The fan-out helper +// dials Address and identifies the node by FullNodeID for dedup +// across groups (a node serving multiple groups is probed once). +type RouteMember struct { + FullNodeID uint64 + Address string +} + +// RouteGroup is one Raft group's membership. Voters and Learners are +// kept separate at the input level so the cutover RPC handler can +// log them distinctly, but CapabilityFanout treats them identically +// per the §4.1 contract "every (voter ∪ learner) of every Raft +// group". The 6D design pins that learner unreachability is a hard +// no the same way voter unreachability is — see §8 / row "One +// learner unreachable during fan-out". +type RouteGroup struct { + GroupID uint64 + Voters []RouteMember + Learners []RouteMember +} + +// RouteSnapshot is the input the cutover RPC handler builds from +// the Raft engine's membership view and passes to CapabilityFanout. +// Independent of the route-catalog `distribution.CatalogSnapshot`, +// which only carries shard→group mappings, not per-group membership. +type RouteSnapshot struct { + Groups []RouteGroup +} + +// CapabilityVerdict is one node's per-call outcome. Reachable=false +// means the dial or the RPC timed out / failed at transport level — +// not a "no" answer from the peer. The cutover RPC handler treats +// both Reachable=false and EncryptionCapable=false as hard refusals +// per the §8 failure-modes table, but the verdict separates them so +// the operator-facing error message can name the precise reason. +type CapabilityVerdict struct { + FullNodeID uint64 + EncryptionCapable bool + BuildSHA string + SidecarPresent bool + Reachable bool + Err error +} + +// CapabilityFanoutResult is the aggregated outcome. OK is true iff +// every verdict has Reachable && EncryptionCapable — there is no +// partial-success mode per §4.3. +// +// Named CapabilityFanoutResult rather than the design-doc's +// `FanoutResult` to avoid a collision with the unrelated +// `admin.FanoutResult` in `keyviz_fanout.go` (KeyViz cluster fan-out +// shipped earlier in the same package). +type CapabilityFanoutResult struct { + Verdicts []CapabilityVerdict + OK bool +} + +// DialFunc opens a connection to one node's admin endpoint and +// returns an EncryptionAdmin client plus a cleanup closure. The +// helper invokes the closure exactly once per successful dial, +// regardless of how the RPC subsequently resolved. +// +// The 6D design says "DialFunc reuses the existing admin connection +// pool" — the concrete implementation will reach for whatever +// connection-pool helper the caller already holds (e.g. the +// `internal/admin.ForwardClient` connection pool for TLS-aware +// dials). +type DialFunc func(ctx context.Context, address string) (pb.EncryptionAdminClient, func(), error) + +// CapabilityFanout fans GetCapability out to every unique +// (voter ∪ learner) of every group in routes. Concurrent; bounded +// by timeout regardless of how many members respond. Missing +// responses surface as Reachable=false verdicts (no partial-success +// mode — see §4.3). +// +// Dedup key: FullNodeID. A node serving multiple groups is probed +// exactly once. Members with FullNodeID=0 are treated as distinct +// dedup keys per unique address; this case appears in stub catalogs +// where the dedup-by-id contract has not been satisfied — the +// helper still completes by falling back to address-based identity +// rather than silently collapsing every zero-id row into one probe. +// +// Returns (result, nil) on every input. The error slot is reserved +// for input validation failures (zero-member snapshot, etc.) so +// callers can keep their existing `err != nil → refuse` shape. +func CapabilityFanout( + ctx context.Context, + routes RouteSnapshot, + dial DialFunc, + timeout time.Duration, +) (CapabilityFanoutResult, error) { + if dial == nil { + return CapabilityFanoutResult{}, pkgerrors.Wrap(errCapabilityFanoutBadInput, "dial func is nil") + } + if timeout <= 0 { + return CapabilityFanoutResult{}, pkgerrors.Wrapf(errCapabilityFanoutBadInput, "timeout must be positive, got %v", timeout) + } + + dedupKeys := buildCapabilityFanoutDedupSet(routes) + if len(dedupKeys) == 0 { + return CapabilityFanoutResult{Verdicts: []CapabilityVerdict{}, OK: false}, nil + } + + fanCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + results := runCapabilityFanoutProbes(fanCtx, dedupKeys, dial) + return CapabilityFanoutResult{Verdicts: results, OK: capabilityFanoutAllOK(results)}, nil +} + +// buildCapabilityFanoutDedupSet folds every voter ∪ learner of +// every group into the dedup map keyed by FullNodeID (or address +// for unidentified rows; a synthetic key for fully-malformed rows +// with neither FullNodeID nor Address — see capabilityFanoutDedupKey). +// Pulled out of CapabilityFanout to keep the orchestration body +// under the cyclomatic-complexity budget. The size hint is the +// upper bound on cardinality (every row distinct); the map shrinks +// at no cost when dedup collapses entries. +func buildCapabilityFanoutDedupSet(routes RouteSnapshot) map[string]RouteMember { + upperBound := 0 + for _, group := range routes.Groups { + upperBound += len(group.Voters) + len(group.Learners) + } + dedupKeys := make(map[string]RouteMember, upperBound) + for _, group := range routes.Groups { + for _, m := range group.Voters { + addCapabilityFanoutDedupTarget(dedupKeys, m) + } + for _, m := range group.Learners { + addCapabilityFanoutDedupTarget(dedupKeys, m) + } + } + return dedupKeys +} + +// runCapabilityFanoutProbes dials every dedup target concurrently +// and returns the per-node verdicts in unspecified order. Bounded +// by ctx — when the deadline fires before a probe goroutine +// finishes, its pre-seeded "probe-timeout" verdict surfaces as +// Reachable=false instead of the helper waiting indefinitely for +// a buggy DialFunc/client that ignores ctx cancellation. This +// pins the §4.3 "Returns within `timeout`" contract. +// +// Note on goroutine cleanup: goroutines whose probes ignore ctx +// continue running until they unblock on their own. The helper +// does NOT promise to reclaim those goroutines — the contract is +// "the HELPER returns within timeout", not "every spawned +// goroutine returns within timeout". A dialer / client that +// ignores ctx is a bug on the caller side; the helper's job is to +// prevent that bug from hanging the admin RPC path. +func runCapabilityFanoutProbes(ctx context.Context, dedupKeys map[string]RouteMember, dial DialFunc) []CapabilityVerdict { + slots := make(map[string]*CapabilityVerdict, len(dedupKeys)) + for key, member := range dedupKeys { + v := CapabilityVerdict{ + FullNodeID: member.FullNodeID, + Err: pkgerrors.Wrap(errCapabilityFanoutProbeTimeout, member.Address), + } + slots[key] = &v + } + + var mu sync.Mutex + var wg sync.WaitGroup + for key, m := range dedupKeys { + wg.Add(1) + go func(slotKey string, member RouteMember) { + defer wg.Done() + verdict := probeCapability(ctx, member, dial) + mu.Lock() + *slots[slotKey] = verdict + mu.Unlock() + }(key, m) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-ctx.Done(): + } + + mu.Lock() + defer mu.Unlock() + out := make([]CapabilityVerdict, 0, len(slots)) + for _, v := range slots { + out = append(out, *v) + } + return out +} + +func capabilityFanoutAllOK(verdicts []CapabilityVerdict) bool { + if len(verdicts) == 0 { + return false + } + for _, v := range verdicts { + if !v.Reachable || !v.EncryptionCapable { + return false + } + } + return true +} + +// addCapabilityFanoutDedupTarget folds a member into the dedup map. +// Fail-closed posture: malformed rows (empty Address) are NOT +// dropped here — they would silently disappear from Result.Verdicts +// and let OK=true sneak through despite an unprobed peer. Instead +// every row is admitted; probeCapability classifies empty-Address +// members as Reachable=false with errCapabilityFanoutMalformedMember +// so the cutover RPC's caller-facing error can name the missing +// address. +func addCapabilityFanoutDedupTarget(m map[string]RouteMember, member RouteMember) { + key := capabilityFanoutDedupKey(member, len(m)) + if _, exists := m[key]; exists { + return + } + m[key] = member +} + +// capabilityFanoutDedupKey is the dedup key for a route member. +// +// - FullNodeID != 0 → "id:" (the §4.1 dedup contract) +// - FullNodeID == 0 ∧ Address != "" → "addr:
" +// (stub catalogs where dedup-by-id isn't satisfied still +// dedupe by address rather than silently collapsing rows) +// - FullNodeID == 0 ∧ Address == "" → "synthetic:" +// (a fully-malformed row gets a unique key so two such rows +// surface as two separate Reachable=false verdicts instead +// of collapsing into one and hiding the second +// misconfiguration) +func capabilityFanoutDedupKey(member RouteMember, ordinal int) string { + if member.FullNodeID != 0 { + return "id:" + uint64ToDecimal(member.FullNodeID) + } + if member.Address != "" { + return "addr:" + member.Address + } + return "synthetic:" + strconv.Itoa(ordinal) +} + +// uint64ToDecimal avoids pulling fmt for one-call hot-path +// stringification used only to build dedup keys. +func uint64ToDecimal(v uint64) string { + if v == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + for v > 0 { + i-- + buf[i] = byte('0' + v%10) + v /= 10 + } + return string(buf[i:]) +} + +func probeCapability(ctx context.Context, member RouteMember, dial DialFunc) CapabilityVerdict { + verdict := CapabilityVerdict{FullNodeID: member.FullNodeID} + if member.Address == "" { + verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMalformedMember, "full_node_id=%d has empty address", member.FullNodeID) + return verdict + } + client, closer, err := dial(ctx, member.Address) + if err != nil { + verdict.Err = pkgerrors.Wrapf(err, "dial %s", member.Address) + return verdict + } + if closer != nil { + defer closer() + } + if client == nil { + verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutBadDialer, "dial %s", member.Address) + return verdict + } + report, err := client.GetCapability(ctx, &pb.Empty{}) + if err != nil { + verdict.Err = pkgerrors.Wrapf(err, "GetCapability %s", member.Address) + return verdict + } + // Mismatched-responder guard: in a stale-routing or + // shared-address scenario, the responder might report a + // different full_node_id than the one the snapshot expected. + // Accepting the response would credit the expected member as + // verified despite no member-X ever answering. Fail closed. + if member.FullNodeID != 0 && report.GetFullNodeId() != 0 && report.GetFullNodeId() != member.FullNodeID { + verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMismatchedResponder, + "%s: expected full_node_id=%d got %d", member.Address, member.FullNodeID, report.GetFullNodeId()) + return verdict + } + verdict.Reachable = true + verdict.EncryptionCapable = report.GetEncryptionCapable() + verdict.BuildSHA = report.GetBuildSha() + verdict.SidecarPresent = report.GetSidecarPresent() + if report.GetFullNodeId() != 0 { + verdict.FullNodeID = report.GetFullNodeId() + } + return verdict +} diff --git a/internal/admin/capability_fanout_test.go b/internal/admin/capability_fanout_test.go new file mode 100644 index 00000000..a5204e9a --- /dev/null +++ b/internal/admin/capability_fanout_test.go @@ -0,0 +1,513 @@ +package admin + +import ( + "context" + "errors" + "sort" + "sync/atomic" + "testing" + "time" + + pb "github.com/bootjp/elastickv/proto" + "google.golang.org/grpc" +) + +// stubEncryptionAdminClient is a minimal in-test client that +// implements only the methods CapabilityFanout actually invokes +// (GetCapability). Every other method panics so that an accidental +// future use is loud. +type stubEncryptionAdminClient struct { + pb.EncryptionAdminClient + report *pb.CapabilityReport + err error +} + +func (s *stubEncryptionAdminClient) GetCapability(ctx context.Context, _ *pb.Empty, _ ...grpc.CallOption) (*pb.CapabilityReport, error) { + if s.err != nil { + return nil, s.err + } + return s.report, nil +} + +// stubDial maps addresses to canned (client, dialErr) pairs and +// records how many times each address was dialled so the +// dedup-by-full_node_id contract is asserted exactly. +type stubDial struct { + clients map[string]*stubEncryptionAdminClient + dialErrs map[string]error + dialed atomic.Int64 + calls map[string]*atomic.Int64 +} + +func newStubDial() *stubDial { + return &stubDial{ + clients: map[string]*stubEncryptionAdminClient{}, + dialErrs: map[string]error{}, + calls: map[string]*atomic.Int64{}, + } +} + +func (s *stubDial) addOK(addr string, report *pb.CapabilityReport) { + s.clients[addr] = &stubEncryptionAdminClient{report: report} + var c atomic.Int64 + s.calls[addr] = &c +} + +func (s *stubDial) addDialErr(addr string, err error) { + s.dialErrs[addr] = err + var c atomic.Int64 + s.calls[addr] = &c +} + +func (s *stubDial) dial(_ context.Context, addr string) (pb.EncryptionAdminClient, func(), error) { + s.dialed.Add(1) + if c := s.calls[addr]; c != nil { + c.Add(1) + } + if err, ok := s.dialErrs[addr]; ok { + return nil, nil, err + } + client, ok := s.clients[addr] + if !ok { + return nil, nil, errors.New("stubDial: no client registered for " + addr) + } + return client, func() {}, nil +} + +// TestCapabilityFanout_AllCapable pins the §9 happy-path: every +// node returns encryption_capable=true → Result.OK=true, every +// verdict reachable. +func TestCapabilityFanout_AllCapable(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true, BuildSha: "abc"}) + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 2, EncryptionCapable: true, SidecarPresent: true, BuildSha: "abc"}) + stub.addOK("n3:9000", &pb.CapabilityReport{FullNodeId: 3, EncryptionCapable: true, SidecarPresent: true, BuildSha: "abc"}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + }, + Learners: []RouteMember{{FullNodeID: 3, Address: "n3:9000"}}, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !res.OK { + t.Fatalf("expected OK=true with all-capable verdicts, got %+v", res) + } + if len(res.Verdicts) != 3 { + t.Fatalf("expected 3 verdicts, got %d: %+v", len(res.Verdicts), res.Verdicts) + } + for _, v := range res.Verdicts { + if !v.Reachable || !v.EncryptionCapable { + t.Errorf("verdict %+v should be Reachable && EncryptionCapable", v) + } + } +} + +// TestCapabilityFanout_OneNotCapable pins the §9 case where every +// member dials successfully but one reports +// encryption_capable=false → Result.OK=false. The verdict list +// still includes every node so the operator-facing error can name +// the refuser. +func TestCapabilityFanout_OneNotCapable(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 2, EncryptionCapable: false, SidecarPresent: false}) + stub.addOK("n3:9000", &pb.CapabilityReport{FullNodeId: 3, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + {FullNodeID: 3, Address: "n3:9000"}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when one verdict has EncryptionCapable=false, got OK=true; verdicts=%+v", res.Verdicts) + } + var refuser *CapabilityVerdict + for i := range res.Verdicts { + if res.Verdicts[i].FullNodeID == 2 { + refuser = &res.Verdicts[i] + } + } + if refuser == nil { + t.Fatal("expected verdict for node 2 to be present so operator can name the refuser") + } + if refuser.Reachable != true || refuser.EncryptionCapable != false { + t.Errorf("refuser verdict shape wrong: %+v (want Reachable=true, EncryptionCapable=false)", refuser) + } +} + +// TestCapabilityFanout_OneUnreachable pins the §9 case where one +// member's dial fails → Reachable=false on that verdict and +// Result.OK=false. The verdict list still includes the unreachable +// node so the operator-facing error can name it. +func TestCapabilityFanout_OneUnreachable(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + stub.addDialErr("n2:9000", errors.New("simulated dial timeout")) + stub.addOK("n3:9000", &pb.CapabilityReport{FullNodeId: 3, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + {FullNodeID: 3, Address: "n3:9000"}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when one node is unreachable, got OK=true; verdicts=%+v", res.Verdicts) + } + var unreachable *CapabilityVerdict + for i := range res.Verdicts { + if res.Verdicts[i].FullNodeID == 2 { + unreachable = &res.Verdicts[i] + } + } + if unreachable == nil { + t.Fatal("expected verdict for node 2 to be present") + } + if unreachable.Reachable { + t.Errorf("expected Reachable=false for dial-failed node, got %+v", unreachable) + } + if unreachable.Err == nil { + t.Errorf("expected Err to be set for dial-failed verdict, got nil") + } +} + +// TestCapabilityFanout_DeduplicatesByFullNodeID pins the §4.1 +// contract: a node serving multiple groups is probed exactly once. +// The same FullNodeID appears under group 1 (as voter) and group 2 +// (as voter) and group 3 (as learner) — expect exactly one dial +// per unique FullNodeID, exactly one verdict per unique FullNodeID. +func TestCapabilityFanout_DeduplicatesByFullNodeID(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 2, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{ + {GroupID: 1, Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + }}, + {GroupID: 2, Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: "n2:9000"}, + }}, + {GroupID: 3, Learners: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + }}, + }} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !res.OK { + t.Fatalf("expected OK=true, got %+v", res) + } + if len(res.Verdicts) != 2 { + t.Fatalf("expected 2 deduplicated verdicts, got %d: %+v", len(res.Verdicts), res.Verdicts) + } + ids := []uint64{res.Verdicts[0].FullNodeID, res.Verdicts[1].FullNodeID} + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + if ids[0] != 1 || ids[1] != 2 { + t.Errorf("expected verdicts for {1, 2}, got %v", ids) + } + if got := stub.dialed.Load(); got != 2 { + t.Errorf("expected exactly 2 dials (one per unique FullNodeID), got %d", got) + } + for addr, c := range stub.calls { + if c.Load() != 1 { + t.Errorf("address %q dialed %d times, expected 1", addr, c.Load()) + } + } +} + +// TestCapabilityFanout_NilDialRejected pins the input-validation +// contract: a nil DialFunc is a programmer error and returns +// (zero-value, err) so callers fail loudly instead of silently +// completing with zero verdicts. +func TestCapabilityFanout_NilDialRejected(t *testing.T) { + t.Parallel() + _, err := CapabilityFanout(context.Background(), RouteSnapshot{}, nil, time.Second) + if err == nil { + t.Fatal("expected error for nil dial func") + } +} + +// TestCapabilityFanout_NonPositiveTimeoutRejected pins the input +// validation: a zero or negative timeout would behave like no +// timeout and is a misuse worth refusing. +func TestCapabilityFanout_NonPositiveTimeoutRejected(t *testing.T) { + t.Parallel() + _, err := CapabilityFanout(context.Background(), RouteSnapshot{}, func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return nil, nil, errors.New("never called") + }, 0) + if err == nil { + t.Fatal("expected error for zero timeout") + } +} + +// TestCapabilityFanout_EmptyAddressFailsClosed pins the fail-closed +// posture for a malformed route snapshot where a member has no +// Address. The pre-review code silently dropped such rows from the +// dedup set, which would let CapabilityFanout return OK=true even +// though that member was never probed — the cutover would then +// proceed against an un-verified peer. Codex r1 P1 / gemini high / +// coderabbit major all flagged the same defect (PR #793). +// +// Contract pinned here: +// - The empty-address row is NOT dropped; it appears in +// Result.Verdicts with Reachable=false. +// - Result.OK is false because not every verdict is Reachable. +// - The malformed row's Err field is set so the operator can +// name the misconfiguration. +func TestCapabilityFanout_EmptyAddressFailsClosed(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n1:9000", &pb.CapabilityReport{FullNodeId: 1, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 1, Address: "n1:9000"}, + {FullNodeID: 2, Address: ""}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when a member has empty address, got OK=true (verdicts=%+v)", res.Verdicts) + } + if len(res.Verdicts) != 2 { + t.Fatalf("expected 2 verdicts (incl. empty-address malformed row), got %d: %+v", len(res.Verdicts), res.Verdicts) + } + var malformed *CapabilityVerdict + for i := range res.Verdicts { + if res.Verdicts[i].FullNodeID == 2 { + malformed = &res.Verdicts[i] + } + } + if malformed == nil { + t.Fatal("expected verdict for empty-address member to be present so operator can name it") + } + if malformed.Reachable { + t.Errorf("expected Reachable=false for empty-address verdict, got %+v", malformed) + } + if malformed.Err == nil { + t.Errorf("expected Err to be set on malformed verdict so operator can diagnose, got nil") + } +} + +// TestCapabilityFanout_FullyMalformedRowsNotCollapsed pins the +// sibling fail-closed case: two members with BOTH FullNodeID=0 AND +// Address="" must NOT collapse into one verdict (which would silently +// hide one of the two misconfigured rows). Each gets its own +// Reachable=false verdict via a synthetic dedup key. +func TestCapabilityFanout_FullyMalformedRowsNotCollapsed(t *testing.T) { + t.Parallel() + stub := newStubDial() + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{ + {FullNodeID: 0, Address: ""}, + {FullNodeID: 0, Address: ""}, + }, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false for fully-malformed snapshot, got OK=true") + } + if len(res.Verdicts) != 2 { + t.Errorf("expected 2 separate verdicts for two fully-malformed rows, got %d (collapsing would hide one)", len(res.Verdicts)) + } +} + +// TestCapabilityFanout_NilClientFromDialFailsClosed pins the +// defensive-programming refusal for a buggy DialFunc that returns +// (nil, nil, nil) — no error but also no client. Codex r2 P1 on +// PR #793 flagged that without this guard the helper would panic on +// client.GetCapability(...) and take down the admin RPC path +// instead of producing a Reachable=false verdict. +// +// Contract pinned here: a nil client is treated as a dial-level +// failure (Reachable=false with Err set) so a misbehaving dialer +// implementation cannot crash the cutover RPC. +func TestCapabilityFanout_NilClientFromDialFailsClosed(t *testing.T) { + t.Parallel() + badDial := func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return nil, nil, nil + } + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{{FullNodeID: 1, Address: "n1:9000"}}, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, badDial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when dial returns nil client without error, got OK=true") + } + if len(res.Verdicts) != 1 { + t.Fatalf("expected 1 verdict, got %d", len(res.Verdicts)) + } + v := res.Verdicts[0] + if v.Reachable { + t.Errorf("expected Reachable=false for nil-client verdict, got %+v", v) + } + if v.Err == nil { + t.Errorf("expected Err to be set on nil-client verdict, got nil") + } +} + +// TestCapabilityFanout_MismatchedResponderIDFailsClosed pins the +// fail-closed posture for stale-routing / shared-address scenarios: +// when the snapshot says n2:9000 has FullNodeID=2 but the responder +// reports FullNodeID=99, accepting the response would credit +// member 2 as verified despite member 2 never actually answering. +// Codex r3 P1 on PR #793. +// +// Contract pinned here: when both the snapshot and the response +// carry non-zero FullNodeIDs and they disagree, the verdict is +// Reachable=false with an explanatory Err so the cutover RPC +// refuses with OK=false rather than silently passing. +func TestCapabilityFanout_MismatchedResponderIDFailsClosed(t *testing.T) { + t.Parallel() + stub := newStubDial() + stub.addOK("n2:9000", &pb.CapabilityReport{FullNodeId: 99, EncryptionCapable: true, SidecarPresent: true}) + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{{FullNodeID: 2, Address: "n2:9000"}}, + }}} + + res, err := CapabilityFanout(context.Background(), snapshot, stub.dial, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Fatalf("expected OK=false when responder full_node_id (99) differs from expected (2), got OK=true") + } + if len(res.Verdicts) != 1 { + t.Fatalf("expected 1 verdict, got %d", len(res.Verdicts)) + } + v := res.Verdicts[0] + if v.Reachable { + t.Errorf("expected Reachable=false for mismatched-id verdict, got %+v", v) + } + if v.Err == nil { + t.Errorf("expected Err to be set on mismatched-id verdict, got nil") + } +} + +// hangingEncryptionAdminClient is a test-only client whose +// GetCapability blocks on a channel and deliberately ignores ctx +// cancellation, simulating a buggy gRPC client that fails to honor +// the helper's timeout contract. +type hangingEncryptionAdminClient struct { + pb.EncryptionAdminClient + hangCh chan struct{} +} + +func (h *hangingEncryptionAdminClient) GetCapability(_ context.Context, _ *pb.Empty, _ ...grpc.CallOption) (*pb.CapabilityReport, error) { + <-h.hangCh + return nil, errors.New("hangingEncryptionAdminClient: unblocked") +} + +// TestCapabilityFanout_TimeoutBoundsHelperReturn pins the §4.3 +// timeout contract: "Returns within `timeout` regardless of how +// many members responded". A DialFunc / client that ignores ctx +// cancellation must NOT be able to hang the admin RPC path +// indefinitely — the helper synthesizes Reachable=false verdicts +// for any probe that hasn't reported by the deadline. Codex r3 P2 +// on PR #793. +func TestCapabilityFanout_TimeoutBoundsHelperReturn(t *testing.T) { + t.Parallel() + hangCh := make(chan struct{}) + defer close(hangCh) + + hang := &hangingEncryptionAdminClient{hangCh: hangCh} + dial := func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return hang, func() {}, nil + } + + snapshot := RouteSnapshot{Groups: []RouteGroup{{ + GroupID: 1, + Voters: []RouteMember{{FullNodeID: 1, Address: "n1:9000"}}, + }}} + + start := time.Now() + res, err := CapabilityFanout(context.Background(), snapshot, dial, 100*time.Millisecond) + elapsed := time.Since(start) + + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + // Allow a generous margin (5x the timeout) for slow CI; the + // pre-fix code would have hung indefinitely so any bounded + // return time is a pass. + if elapsed > 500*time.Millisecond { + t.Fatalf("helper hung for %v despite 100ms timeout — contract violated", elapsed) + } + if res.OK { + t.Fatalf("expected OK=false on timeout, got OK=true") + } + if len(res.Verdicts) != 1 { + t.Fatalf("expected 1 verdict (synthesized for hung probe), got %d", len(res.Verdicts)) + } + if res.Verdicts[0].Reachable { + t.Errorf("expected Reachable=false for hung probe, got %+v", res.Verdicts[0]) + } +} + +// TestCapabilityFanout_EmptySnapshot pins the empty-input case: +// returns OK=false with zero verdicts but no error (the cutover RPC +// can decide its own semantics for "no members" — typically refuse +// because the catalog snapshot is bad). +func TestCapabilityFanout_EmptySnapshot(t *testing.T) { + t.Parallel() + res, err := CapabilityFanout(context.Background(), RouteSnapshot{}, func(context.Context, string) (pb.EncryptionAdminClient, func(), error) { + return nil, nil, errors.New("never called") + }, time.Second) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if res.OK { + t.Errorf("expected OK=false on empty snapshot, got OK=true") + } + if len(res.Verdicts) != 0 { + t.Errorf("expected zero verdicts, got %d", len(res.Verdicts)) + } +}