@@ -2,12 +2,36 @@

| Field | Value |
|---|---|
| Status | proposed |
| Status | partial — 6D-1 (doc), 6D-2 (startup guards), 6D-3 (capability fan-out helper) shipped; 6D-4, 6D-5, 6D-6 remain |
| Date | 2026-05-18 |
| Parent design | [`2026_04_29_partial_data_at_rest_encryption.md`](2026_04_29_partial_data_at_rest_encryption.md) |
| Blockers (now satisfied) | 6B (KEK plumbing), 6C-1 / 6C-2 (startup guards), 6C-2d (`ErrSidecarBehindRaftLog` wiring) |
| Bundles | 6C-3 (`ErrNodeIDCollision` + `ErrLocalEpochRollback` cluster-wide guards) |

## Shipped milestones (lifecycle: partial)

- **6D-1** (doc) — this file. Landed with the initial proposal commit.
- **6D-2** (startup guards) — `ErrNodeIDCollision` primitive
(`internal/encryption/node_id_collision.go`), `ErrLocalEpochRollback`
primitive (`internal/encryption/local_epoch_rollback.go`), and the
`probe-node-id` admin CLI subcommand. Landed in PR #788.
- **6D-3** (capability fan-out helper) — `CapabilityFanout` in
`internal/admin/capability_fanout.go` with the table-driven §9
unit tests. The §4.1 `FanoutResult` type was renamed
`CapabilityFanoutResult` to disambiguate from the unrelated
`admin.FanoutResult` already shipped in `keyviz_fanout.go`; the
contract is otherwise unchanged.
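
As a rough illustration of the dedup contract the 6D-3 helper applies (a node serving several groups is probed once), the sketch below folds members into a map using the three-way key rule. The `member`, `dedupKey`, and `dedup` names are hypothetical stand-ins and not the identifiers shipped in `capability_fanout.go`; `strconv.FormatUint` is used here purely for brevity.

```go
package main

import (
	"fmt"
	"strconv"
)

// member is a hypothetical stand-in for the helper's route-member type.
type member struct {
	FullNodeID uint64
	Address    string
}

// dedupKey applies the three-way rule: key by ID when it is known,
// by address when only the address is known, and by a unique
// synthetic ordinal when both are missing.
func dedupKey(m member, ordinal int) string {
	switch {
	case m.FullNodeID != 0:
		return "id:" + strconv.FormatUint(m.FullNodeID, 10)
	case m.Address != "":
		return "addr:" + m.Address
	default:
		return "synthetic:" + strconv.Itoa(ordinal)
	}
}

// dedup folds members into a map; the first row seen for a key wins.
// The current map size serves as the ordinal, so each fully
// malformed row receives its own key.
func dedup(members []member) map[string]member {
	out := make(map[string]member, len(members))
	for _, m := range members {
		key := dedupKey(m, len(out))
		if _, seen := out[key]; !seen {
			out[key] = m
		}
	}
	return out
}

func main() {
	targets := dedup([]member{
		{FullNodeID: 7, Address: "10.0.0.1:50051"}, // member of group 1
		{FullNodeID: 7, Address: "10.0.0.1:50051"}, // same node in group 2
		{Address: "10.0.0.2:50051"},                // no ID: keyed by address
		{},                                         // malformed row
		{},                                         // second malformed row stays distinct
	})
	fmt.Println(len(targets))
}
```

Keeping malformed rows as separate entries matters for the fail-closed posture: each one later surfaces as its own failing verdict instead of disappearing from the result.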

## Open milestones

- **6D-4** — `RotateSubEnableStorageEnvelope = 0x04` wire-format
addition + FSM apply-path dispatch + `StorageEnvelopeCutoverIndex`
sidecar field.
- **6D-5** — §6.2 storage-layer toggle (`PutAt` consults
`StorageEnvelopeActive`).
- **6D-6** — `EnableStorageEnvelope` admin RPC + CLI command +
integration test composing 6D-3 + 6D-4 + 6D-5.
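
To make the composition in 6D-6 more concrete, here is a minimal sketch of how a caller might turn fan-out verdicts into a fail-closed decision and an operator-facing refusal list. It assumes a trimmed-down `verdict` type and the hypothetical helpers `allOK` and `refusalReasons`; the real RPC handler is not part of this PR and may look quite different.

```go
package main

import (
	"errors"
	"fmt"
)

// verdict is a hypothetical, trimmed-down stand-in for one node's
// capability verdict.
type verdict struct {
	FullNodeID        uint64
	Reachable         bool
	EncryptionCapable bool
	Err               error
}

// allOK is true only when there is at least one verdict and every
// verdict is both reachable and encryption-capable. There is no
// partial-success mode.
func allOK(vs []verdict) bool {
	if len(vs) == 0 {
		return false
	}
	for _, v := range vs {
		if !v.Reachable || !v.EncryptionCapable {
			return false
		}
	}
	return true
}

// refusalReasons returns one line per failing verdict and keeps
// "unreachable" separate from "reachable but not capable".
func refusalReasons(vs []verdict) []string {
	var reasons []string
	for _, v := range vs {
		switch {
		case !v.Reachable:
			reasons = append(reasons, fmt.Sprintf("node %d unreachable: %v", v.FullNodeID, v.Err))
		case !v.EncryptionCapable:
			reasons = append(reasons, fmt.Sprintf("node %d is not encryption-capable", v.FullNodeID))
		}
	}
	return reasons
}

func main() {
	vs := []verdict{
		{FullNodeID: 1, Reachable: true, EncryptionCapable: true},
		{FullNodeID: 2, Reachable: true},
		{FullNodeID: 3, Err: errors.New("dial timeout")},
	}
	fmt.Println("ok:", allOK(vs))
	for _, r := range refusalReasons(vs) {
		fmt.Println(r)
	}
}
```

Both failure kinds refuse the cutover; separating them only affects what the operator is told.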

## 0. Why this doc exists

Stage 6D is the first cutover that actually flips a cluster-wide
352 changes: 352 additions & 0 deletions internal/admin/capability_fanout.go
@@ -0,0 +1,352 @@
package admin

import (
"context"
"errors"
"strconv"
"sync"
"time"

pb "github.com/bootjp/elastickv/proto"
pkgerrors "github.com/cockroachdb/errors"
)

// errCapabilityFanoutBadInput is the sentinel wrapped by every
// input-validation refusal in this file so callers can errors.Is()
// against it without parsing strings. The concrete failure adds a
// %w-wrapped detail.
var errCapabilityFanoutBadInput = errors.New("capability fan-out: bad input")

// errCapabilityFanoutMalformedMember is the sentinel set on the
// Err field of verdicts whose RouteMember rows arrived malformed
// (missing Address) from the caller's route-snapshot construction.
// The verdict still appears in Result.Verdicts so the cutover RPC
// fails closed (OK=false) and the operator can name the
// misconfigured member, rather than silently dropping the row and
// letting OK=true sneak through against an unprobed peer.
var errCapabilityFanoutMalformedMember = errors.New("capability fan-out: malformed route member")

// errCapabilityFanoutBadDialer is the sentinel for a DialFunc that
// violates its contract: it returns no error but also no client
// (for example (nil, nil, nil) or (nil, closer, nil)). Without this
// guard the goroutine would panic on client.GetCapability(...) and
// take down the admin RPC path instead of producing a
// Reachable=false verdict.
var errCapabilityFanoutBadDialer = errors.New("capability fan-out: dialer returned nil client without error")

// errCapabilityFanoutMismatchedResponder is the sentinel for the
// stale-routing / shared-address case: the snapshot expected one
// full_node_id at an address but the responder reports a different
// one. Accepting the response would credit the expected member as
// verified even though the intended member never actually answered.
// Fail-closed: verdict carries this error with Reachable=false.
var errCapabilityFanoutMismatchedResponder = errors.New("capability fan-out: responder full_node_id does not match expected")

// errCapabilityFanoutProbeTimeout is the sentinel pre-seeded on
// every verdict slot before the probe goroutines start. A buggy
// DialFunc/client that ignores ctx cancellation would otherwise
// hang the helper indefinitely; by pre-seeding the slot and
// returning whichever slots have been overwritten by the deadline,
// the helper honors its §4.3 "Returns within timeout" contract
// even against an uncooperative dialer.
var errCapabilityFanoutProbeTimeout = errors.New("capability fan-out: probe did not complete within timeout")

// RouteMember is one peer entry in a Raft group. The fan-out helper
// dials Address and identifies the node by FullNodeID for dedup
// across groups (a node serving multiple groups is probed once).
type RouteMember struct {
FullNodeID uint64
Address string
}

// RouteGroup is one Raft group's membership. Voters and Learners are
// kept separate at the input level so the cutover RPC handler can
// log them distinctly, but CapabilityFanout treats them identically
// per the §4.1 contract "every (voter ∪ learner) of every Raft
// group". The 6D design pins that learner unreachability is a hard
// no the same way voter unreachability is — see §8 / row "One
// learner unreachable during fan-out".
type RouteGroup struct {
GroupID uint64
Voters []RouteMember
Learners []RouteMember
}

// RouteSnapshot is the input the cutover RPC handler builds from
// the Raft engine's membership view and passes to CapabilityFanout.
// Independent of the route-catalog `distribution.CatalogSnapshot`,
// which only carries shard→group mappings, not per-group membership.
type RouteSnapshot struct {
Groups []RouteGroup
}

// CapabilityVerdict is one node's per-call outcome. Reachable=false
// means the dial or the RPC timed out / failed at transport level —
// not a "no" answer from the peer. The cutover RPC handler treats
// both Reachable=false and EncryptionCapable=false as hard refusals
// per the §8 failure-modes table, but the verdict separates them so
// the operator-facing error message can name the precise reason.
type CapabilityVerdict struct {
FullNodeID uint64
EncryptionCapable bool
BuildSHA string
SidecarPresent bool
Reachable bool
Err error
}

// CapabilityFanoutResult is the aggregated outcome. OK is true iff
// every verdict has Reachable && EncryptionCapable — there is no
// partial-success mode per §4.3.
//
// Named CapabilityFanoutResult rather than the design-doc's
// `FanoutResult` to avoid a collision with the unrelated
// `admin.FanoutResult` in `keyviz_fanout.go` (KeyViz cluster fan-out
// shipped earlier in the same package).
type CapabilityFanoutResult struct {
Verdicts []CapabilityVerdict
OK bool
}

// DialFunc opens a connection to one node's admin endpoint and
// returns an EncryptionAdmin client plus a cleanup closure. The
// helper invokes the closure exactly once per successful dial,
// regardless of how the RPC subsequently resolved.
//
// The 6D design says "DialFunc reuses the existing admin connection
// pool" — the concrete implementation will reach for whatever
// connection-pool helper the caller already holds (e.g. the
// `internal/admin.ForwardClient` connection pool for TLS-aware
// dials).
type DialFunc func(ctx context.Context, address string) (pb.EncryptionAdminClient, func(), error)

// CapabilityFanout fans GetCapability out to every unique
// (voter ∪ learner) of every group in routes. Concurrent; bounded
// by timeout regardless of how many members respond. Missing
// responses surface as Reachable=false verdicts (no partial-success
// mode — see §4.3).
//
// Dedup key: FullNodeID. A node serving multiple groups is probed
// exactly once. Members with FullNodeID=0 are treated as distinct
// dedup keys per unique address; this case appears in stub catalogs
// where the dedup-by-id contract has not been satisfied — the
// helper still completes by falling back to address-based identity
// rather than silently collapsing every zero-id row into one probe.
//
// Returns (result, nil) for every well-formed call, including an
// empty snapshot (OK=false with no verdicts). The error slot is
// reserved for input-validation failures (nil dial, non-positive
// timeout) so callers can keep their existing
// `err != nil → refuse` shape.
func CapabilityFanout(
ctx context.Context,
routes RouteSnapshot,
dial DialFunc,
timeout time.Duration,
) (CapabilityFanoutResult, error) {
if dial == nil {
return CapabilityFanoutResult{}, pkgerrors.Wrap(errCapabilityFanoutBadInput, "dial func is nil")
}
if timeout <= 0 {
return CapabilityFanoutResult{}, pkgerrors.Wrapf(errCapabilityFanoutBadInput, "timeout must be positive, got %v", timeout)
}

dedupKeys := buildCapabilityFanoutDedupSet(routes)
if len(dedupKeys) == 0 {
return CapabilityFanoutResult{Verdicts: []CapabilityVerdict{}, OK: false}, nil
}

fanCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

results := runCapabilityFanoutProbes(fanCtx, dedupKeys, dial)
return CapabilityFanoutResult{Verdicts: results, OK: capabilityFanoutAllOK(results)}, nil
}

// buildCapabilityFanoutDedupSet folds every voter ∪ learner of
// every group into the dedup map keyed by FullNodeID (or address
// for unidentified rows; a synthetic key for fully-malformed rows
// with neither FullNodeID nor Address — see capabilityFanoutDedupKey).
// Pulled out of CapabilityFanout to keep the orchestration body
// under the cyclomatic-complexity budget. The size hint is the
// upper bound on cardinality (every row distinct); it only
// pre-sizes the map, so an overestimate is harmless when dedup
// collapses entries.
func buildCapabilityFanoutDedupSet(routes RouteSnapshot) map[string]RouteMember {
upperBound := 0
for _, group := range routes.Groups {
upperBound += len(group.Voters) + len(group.Learners)
}
dedupKeys := make(map[string]RouteMember, upperBound)
for _, group := range routes.Groups {
for _, m := range group.Voters {
addCapabilityFanoutDedupTarget(dedupKeys, m)
}
for _, m := range group.Learners {
addCapabilityFanoutDedupTarget(dedupKeys, m)
}
}
return dedupKeys
}

// runCapabilityFanoutProbes dials every dedup target concurrently
// and returns the per-node verdicts in unspecified order. Bounded
// by ctx — when the deadline fires before a probe goroutine
// finishes, its pre-seeded "probe-timeout" verdict surfaces as
// Reachable=false instead of the helper waiting indefinitely for
// a buggy DialFunc/client that ignores ctx cancellation. This
// pins the §4.3 "Returns within `timeout`" contract.
//
// Note on goroutine cleanup: goroutines whose probes ignore ctx
// continue running until they unblock on their own. The helper
// does NOT promise to reclaim those goroutines — the contract is
// "the HELPER returns within timeout", not "every spawned
// goroutine returns within timeout". A dialer / client that
// ignores ctx is a bug on the caller side; the helper's job is to
// prevent that bug from hanging the admin RPC path.
func runCapabilityFanoutProbes(ctx context.Context, dedupKeys map[string]RouteMember, dial DialFunc) []CapabilityVerdict {
slots := make(map[string]*CapabilityVerdict, len(dedupKeys))
for key, member := range dedupKeys {
v := CapabilityVerdict{
FullNodeID: member.FullNodeID,
Err: pkgerrors.Wrap(errCapabilityFanoutProbeTimeout, member.Address),
}
slots[key] = &v
}

var mu sync.Mutex
var wg sync.WaitGroup
for key, m := range dedupKeys {
wg.Add(1)
go func(slotKey string, member RouteMember) {
defer wg.Done()
verdict := probeCapability(ctx, member, dial)
mu.Lock()
*slots[slotKey] = verdict
mu.Unlock()
}(key, m)
}

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
case <-ctx.Done():
}

mu.Lock()
defer mu.Unlock()
out := make([]CapabilityVerdict, 0, len(slots))
for _, v := range slots {
out = append(out, *v)
}
return out
}

func capabilityFanoutAllOK(verdicts []CapabilityVerdict) bool {
if len(verdicts) == 0 {
return false
}
for _, v := range verdicts {
if !v.Reachable || !v.EncryptionCapable {
return false
}
}
return true
}

// addCapabilityFanoutDedupTarget folds a member into the dedup map.
// Fail-closed posture: malformed rows (empty Address) are NOT
// dropped here — they would silently disappear from Result.Verdicts
// and let OK=true sneak through despite an unprobed peer. Instead
// every row is admitted; probeCapability classifies empty-Address
// members as Reachable=false with errCapabilityFanoutMalformedMember
// so the cutover RPC's caller-facing error can name the missing
// address.
func addCapabilityFanoutDedupTarget(m map[string]RouteMember, member RouteMember) {
key := capabilityFanoutDedupKey(member, len(m))
if _, exists := m[key]; exists {
return
}
m[key] = member
}

// capabilityFanoutDedupKey is the dedup key for a route member.
//
// - FullNodeID != 0 → "id:<full_node_id>" (the §4.1 dedup contract)
// - FullNodeID == 0 ∧ Address != "" → "addr:<address>"
// (stub catalogs where dedup-by-id isn't satisfied still
// dedupe by address rather than silently collapsing rows)
// - FullNodeID == 0 ∧ Address == "" → "synthetic:<ordinal>"
// (a fully-malformed row gets a unique key so two such rows
// surface as two separate Reachable=false verdicts instead
// of collapsing into one and hiding the second
// misconfiguration)
func capabilityFanoutDedupKey(member RouteMember, ordinal int) string {
if member.FullNodeID != 0 {
return "id:" + uint64ToDecimal(member.FullNodeID)
}
if member.Address != "" {
return "addr:" + member.Address
}
return "synthetic:" + strconv.Itoa(ordinal)
}

// uint64ToDecimal avoids pulling fmt for one-call hot-path
// stringification used only to build dedup keys.
func uint64ToDecimal(v uint64) string {
if v == 0 {
return "0"
}
var buf [20]byte
i := len(buf)
for v > 0 {
i--
buf[i] = byte('0' + v%10)
v /= 10
}
return string(buf[i:])
}

func probeCapability(ctx context.Context, member RouteMember, dial DialFunc) CapabilityVerdict {
verdict := CapabilityVerdict{FullNodeID: member.FullNodeID}
if member.Address == "" {
verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMalformedMember, "full_node_id=%d has empty address", member.FullNodeID)
return verdict
}
client, closer, err := dial(ctx, member.Address)
if err != nil {
verdict.Err = pkgerrors.Wrapf(err, "dial %s", member.Address)
return verdict
}
if closer != nil {
defer closer()
}
if client == nil {
verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutBadDialer, "dial %s", member.Address)
return verdict
}
report, err := client.GetCapability(ctx, &pb.Empty{})

P1: Handle nil dial client before issuing RPC

If dial returns (nil, closer, nil), this line panics on client.GetCapability(...) and can crash the admin RPC path instead of producing a Reachable=false verdict. Because DialFunc is injected and not validated beyond err, the helper should fail closed when client == nil (treating it like a dial failure) so a bad dialer implementation cannot take down cutover preflight.

if err != nil {
verdict.Err = pkgerrors.Wrapf(err, "GetCapability %s", member.Address)
return verdict
}
// Mismatched-responder guard: in a stale-routing or
// shared-address scenario, the responder might report a
// different full_node_id than the one the snapshot expected.
// Accepting the response would credit the expected member as
// verified even though that member never answered. Fail closed.
if member.FullNodeID != 0 && report.GetFullNodeId() != 0 && report.GetFullNodeId() != member.FullNodeID {
verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMismatchedResponder,
"%s: expected full_node_id=%d got %d", member.Address, member.FullNodeID, report.GetFullNodeId())
return verdict
}
verdict.Reachable = true
verdict.EncryptionCapable = report.GetEncryptionCapable()
verdict.BuildSHA = report.GetBuildSha()
verdict.SidecarPresent = report.GetSidecarPresent()
if report.GetFullNodeId() != 0 {
verdict.FullNodeID = report.GetFullNodeId()
Comment on lines +348 to +349

P1: Reject capability replies from unexpected node IDs

The probe accepts any successful GetCapability response as evidence that the target member is reachable/capable, but it never validates that the responder’s full_node_id matches the expected route member. In a stale/misrouted address scenario (or if two members accidentally share an address), one healthy node can satisfy probes for another and OK can incorrectly become true even though the intended member was never verified. This check should fail closed on ID mismatch instead of overwriting the verdict ID.

}
return verdict
}
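
The bounded-wait behaviour of `runCapabilityFanoutProbes` (pre-seed every slot with a timeout result, then return as soon as either all probes finish or the context expires) can be isolated in a small sketch. The `boundedGather` and `runDemo` names, the string-valued results, and the 50 ms / 500 ms timings are assumptions made for illustration only.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// boundedGather runs probe for every key concurrently and returns
// once all probes finish or ctx is done, whichever comes first.
// Slots start as "timeout", so a probe that ignores ctx still
// leaves a definite result behind.
func boundedGather(ctx context.Context, keys []string, probe func(context.Context, string) string) map[string]string {
	var mu sync.Mutex
	slots := make(map[string]string, len(keys))
	for _, k := range keys {
		slots[k] = "timeout"
	}

	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			res := probe(ctx, key)
			mu.Lock()
			slots[key] = res
			mu.Unlock()
		}(k)
	}

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-ctx.Done():
	}

	// Copy under the lock so late writers never touch the map the
	// caller receives.
	mu.Lock()
	defer mu.Unlock()
	out := make(map[string]string, len(slots))
	for k, v := range slots {
		out[k] = v
	}
	return out
}

// runDemo probes one fast key and one key whose probe deliberately
// ignores ctx and outlives the deadline.
func runDemo() map[string]string {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	probe := func(_ context.Context, key string) string {
		if key == "slow" {
			time.Sleep(500 * time.Millisecond)
		}
		return "ok"
	}
	return boundedGather(ctx, []string{"fast", "slow"}, probe)
}

func main() {
	fmt.Println(runDemo())
}
```

As in the helper itself, the slow goroutine is not reclaimed when the deadline fires; only the caller is unblocked, and the slow key keeps its pre-seeded result.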