// diff --git a/internal/backup/snapshot_reader.go b/internal/backup/snapshot_reader.go
// new file mode 100644
// index 00000000..a6cb0bc8
// --- /dev/null
// +++ b/internal/backup/snapshot_reader.go
// @@ -0,0 +1,432 @@

package backup

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"io"

	cockroachdberr "github.com/cockroachdb/errors"
)

// snapshot_reader.go consumes the native Pebble snapshot format
// produced by store/lsm_store.go::pebbleSnapshotMagic +
// restoreBatchLoopInto and yields each entry as a (userKey,
// userValue, tombstone, expireAt) tuple after stripping the MVCC
// encoding the live store layers on top of raw Pebble bytes.
//
// Snapshot file shape:
//
//	[8 bytes]  magic "EKVPBBL1"
//	[8 bytes]  lastCommitTS (LittleEndian uint64)
//	repeated:
//	  [8 bytes]  keyLen (LittleEndian uint64)
//	  [keyLen]   encoded key   = userKey || inverted commitTS (8 bytes, BigEndian)
//	  [8 bytes]  valLen (LittleEndian uint64)
//	  [valLen]   encoded value = flags (1 byte) || expireAt (8 bytes, LittleEndian) || body
//	             (flags bit 0 = tombstone, bits 1-2 = encryption_state)
//
// Mirrors store/lsm_store.go:1670-1697 (readRestoreEntry) and
// :336-340 (fillEncodedKey) and :419-422 (fillEncodedValue). The
// constants are duplicated here so this package stays
// adapter/store-independent (the design requires the decoder to
// run as an offline tool against a `.fsm` file with no live cluster
// libraries linked).

// Snapshot format constants — mirror store/lsm_store.go.
const (
	// PebbleSnapshotMagicLen is the byte length of the "EKVPBBL1"
	// header. Exposed so callers can sniff the first 8 bytes of a
	// file to decide whether to dispatch into ReadSnapshot or fall
	// through to another reader.
	PebbleSnapshotMagicLen = 8

	// snapshotTSSize is the size in bytes of the inverted-TS suffix
	// appended to every encoded key (`store.fillEncodedKey`).
	snapshotTSSize = 8

	// snapshotValueHeaderSize is the size in bytes of the value-header
	// prefix (1 flags byte + 8-byte expireAt) on every encoded value
	// (`store.fillEncodedValue`).
	snapshotValueHeaderSize = 9

	// snapshotTombstoneMask / snapshotEncStateMask / snapshotEncStateShift
	// mirror store.tombstoneMask / encStateMask / encStateShift. A
	// rename on the live side without an accompanying update here
	// would surface at the snapshot reader's table-driven tests.
	//
	// snapshotEncStateCleartx / snapshotEncStateEncrypt are the
	// encryption_state values as seen AFTER masking with
	// snapshotEncStateMask and shifting right by snapshotEncStateShift.
	snapshotTombstoneMask    byte = 0b0000_0001
	snapshotEncStateMask     byte = 0b0000_0110
	snapshotEncStateShift         = 1
	snapshotEncStateReserved byte = 0b1111_1000 // bits 3-7 must be zero
	snapshotEncStateCleartx  byte = 0b00
	snapshotEncStateEncrypt  byte = 0b01

	// MaxSnapshotEncodedKeySize / MaxSnapshotEncodedValueSize bound the
	// per-entry allocations made by readExact / readExactGrow. Mirrors
	// the live store's `maxPebbleEncodedKeySize` (1 MiB user-key cap +
	// 8-byte TS suffix; store/mvcc_store.go:29 + store/lsm_store.go:51)
	// and `maxSnapshotValueSize + valueHeaderSize +
	// encryption.EnvelopeOverhead` (256 MiB cleartext + 9-byte header +
	// 34-byte envelope; store/mvcc_store.go:37 +
	// store/lsm_store.go:1692). Without these guards a corrupt or
	// adversarial snapshot whose length-prefix declares a huge size
	// would either OOM the decoder via `make([]byte, n)` or, on 32-bit
	// architectures, panic when narrowing `uint64` → `int` for the
	// slice length. Codex P1 + gemini security-high (PR #792 round 1).
	//
	// Duplicated here (rather than imported from `store`) so the
	// backup package keeps the adapter-independence required by the
	// design (it must run as a standalone offline tool with no live-
	// cluster libraries linked). The values are reviewed for staleness
	// alongside the live store's constants at every PR round.

	// maxSnapshotUserKeySize mirrors store/mvcc_store.go:29
	// `maxSnapshotKeySize` = 1 MiB.
	maxSnapshotUserKeySize = 1 << maxSnapshotUserKeyShift
	// maxSnapshotUserValueSize mirrors store/mvcc_store.go:37
	// `maxSnapshotValueSize` = 256 MiB.
	maxSnapshotUserValueSize    = maxSnapshotValueMiB << maxSnapshotValueShift
	maxSnapshotUserKeyShift     = 20 // 1 << 20 == 1 MiB
	maxSnapshotValueShift       = 20 // MiB count << 20 == byte count
	maxSnapshotValueMiB         = 256
	MaxSnapshotEncodedKeySize   = maxSnapshotUserKeySize + snapshotTSSize
	MaxSnapshotEncodedValueSize = maxSnapshotUserValueSize + snapshotValueHeaderSize + envelopeMaxB

	// envelopeMaxB mirrors internal/encryption.EnvelopeOverhead (12-
	// byte nonce + 16-byte tag + 6-byte AAD-binding header = 34).
	// Duplicated here so we do not import the encryption package
	// from the backup-package's offline-tool boundary.
	envelopeMaxB = 34
)

// PebbleSnapshotMagic is the 8-byte file header that introduces a
// native Pebble snapshot. Exposed for callers that need to sniff a
// file before deciding which reader to dispatch to. Declared as an
// untyped string CONSTANT (not a `var [8]byte`) so an importer
// cannot mutate the bytes — a writable package variable would let
// any caller corrupt the header globally and break parsing for
// every consumer (coderabbit Major on PR #792 round 2).
// Callers comparing against the magic should convert it to match
// the type of their own data: most call sites convert to a byte
// slice via `[]byte(PebbleSnapshotMagic)` at the comparison point.
const PebbleSnapshotMagic = "EKVPBBL1"

// ErrSnapshotBadMagic is returned when the first 8 bytes of the
// reader do not match `EKVPBBL1`. The decoder caller should treat
// this as an immediate hard failure rather than try to skip past
// the bad header — a wrong magic almost always indicates the file
// is not actually a Pebble snapshot (an MVCC streaming snapshot,
// a tar archive, a partial truncate, etc.).
+var ErrSnapshotBadMagic = cockroachdberr.New("backup: snapshot magic header does not match \"EKVPBBL1\"") + +// ErrSnapshotTruncated is returned when the snapshot ends mid-entry +// (after a key length but before the key, or after a value length +// but before the value). A clean EOF at the start of the +// key-length field is a normal terminator and is NOT an error. +var ErrSnapshotTruncated = cockroachdberr.New("backup: snapshot truncated mid-entry") + +// ErrSnapshotEncryptedReserved is returned when a value-header +// carries reserved encryption_state bits (0b10 or 0b11). Mirrors +// store.ErrEncryptedValueReservedState — the decoder fails closed +// rather than treat the body as cleartext, matching the design's +// §7.1 fail-closed contract. +var ErrSnapshotEncryptedReserved = cockroachdberr.New("backup: value header carries reserved encryption_state; decoder cannot interpret this entry") + +// ErrSnapshotEncryptedEntry is returned when a value-header +// declares the entry is encrypted (encState=0b01). Phase 0a does +// NOT carry the decryption keyring; an encrypted snapshot must be +// decoded with a Phase 0a+keyring binary or after Stage 8 of the +// encryption rollout reverses the encryption. +var ErrSnapshotEncryptedEntry = cockroachdberr.New("backup: snapshot contains encrypted entries — Phase 0a does not link the decryption keyring") + +// ErrSnapshotShortKey is returned when an entry's encoded key is +// shorter than the 8-byte timestamp suffix that +// `store.fillEncodedKey` always appends. Indicates a corrupt +// snapshot — the live store would never emit such a key. +var ErrSnapshotShortKey = cockroachdberr.New("backup: encoded key shorter than timestamp suffix") + +// ErrSnapshotKeyTooLarge / ErrSnapshotValueTooLarge are returned +// when the on-disk length prefix declares an entry larger than the +// MaxSnapshotEncodedKeySize / MaxSnapshotEncodedValueSize budgets. 
+// Mirrors `store.ErrSnapshotKeyTooLarge` and `store.ErrValueTooLarge` +// from the live restore path so a corrupt or adversarial snapshot +// fails closed at the length-prefix layer instead of triggering an +// OOM-sized allocation (codex P1 + gemini security-high on PR #792). +var ErrSnapshotKeyTooLarge = cockroachdberr.New("backup: snapshot key length exceeds limit") +var ErrSnapshotValueTooLarge = cockroachdberr.New("backup: snapshot value length exceeds limit") + +// ErrSnapshotShortValue is returned when an entry's encoded value +// is shorter than the 9-byte value header. Indicates a corrupt +// snapshot — the live store always writes the header even for +// tombstones. +var ErrSnapshotShortValue = cockroachdberr.New("backup: encoded value shorter than value-header") + +// SnapshotEntry is one decoded entry emitted by ReadSnapshot's +// callback. Fields are the user-visible key / value bytes plus the +// MVCC metadata the decoder peeled off (commit timestamp, expiry, +// tombstone marker). Slices are owned by the snapshot reader's +// scratch buffer and may be overwritten when the callback returns — +// callers that need to retain bytes across iterations must +// `bytes.Clone` them. +type SnapshotEntry struct { + UserKey []byte + UserValue []byte + CommitTS uint64 + ExpireAt uint64 + Tombstone bool +} + +// SnapshotHeader is the decoded preamble returned to the caller +// before iteration begins so the caller can record the snapshot's +// commit-time horizon in its MANIFEST.json (per design §380-422). +type SnapshotHeader struct { + LastCommitTS uint64 +} + +// ReadSnapshot reads the EKVPBBL1 header from r, then yields every +// entry through fn. fn receives a transient SnapshotEntry whose +// byte slices are NOT safe to retain across calls (the reader +// reuses scratch buffers to keep per-entry allocations bounded for +// multi-GB snapshots). If fn returns an error, iteration stops and +// the error is returned verbatim. 
+// +// Iteration terminates cleanly on EOF at the start of an entry's +// key-length field. EOF inside an entry returns +// ErrSnapshotTruncated. +// +// Tombstone entries (flags bit 0 set) are surfaced via +// SnapshotEntry.Tombstone — callers decide whether to suppress +// them (Phase 0a's intended behavior for backup output) or include +// them (a multi-version diagnostic dump might want both). +func ReadSnapshot(r io.Reader, fn func(SnapshotHeader, SnapshotEntry) error) error { + br := bufio.NewReader(r) + header, err := readSnapshotHeader(br) + if err != nil { + return err + } + var ( + keyBuf [1 << 16]byte + valBuf []byte + ) + for { + stop, err := readOneEntry(br, header, keyBuf[:], &valBuf, fn) + if err != nil { + return err + } + if stop { + return nil + } + } +} + +// readOneEntry handles one (key, value) tuple plus the callback +// dispatch. Extracted from ReadSnapshot so the parent stays under +// the cyclop budget — the same shape every backup encoder uses +// (small fixed driver loop + extracted per-record helper). +// Returns (true, nil) on the natural inter-entry EOF terminator. +func readOneEntry( + r *bufio.Reader, + header SnapshotHeader, + keyScratch []byte, + valBuf *[]byte, + fn func(SnapshotHeader, SnapshotEntry) error, +) (bool, error) { + key, eof, err := readEntryKey(r, keyScratch) + if err != nil { + return false, err + } + if eof { + // Clean inter-entry EOF — natural terminator. + return true, nil + } + value, err := readEntryValue(r, valBuf) + if err != nil { + return false, err + } + entry, err := decodeSnapshotEntry(key, value) + if err != nil { + return false, err + } + if err := fn(header, entry); err != nil { + return false, err + } + return false, nil +} + +// readEntryKey reads the per-entry length-prefix-then-bytes for the +// key half of one entry. Returns (nil, true, nil) on the natural +// inter-entry EOF (clean stream terminator). 
Applies the +// MaxSnapshotEncodedKeySize bound BEFORE allocating the read +// buffer; without this guard a corrupt or adversarial snapshot +// whose length prefix declares a huge size would OOM the decoder +// or panic on 32-bit narrowing. Mirrors +// `store/lsm_store.go::readRestoreFieldLen`. Codex P1 + gemini +// security-high on PR #792. +func readEntryKey(r *bufio.Reader, scratch []byte) ([]byte, bool, error) { + kLen, kEof, err := readEntryLen(r) + if err != nil { + return nil, false, err + } + if kEof { + return nil, true, nil + } + if kLen > MaxSnapshotEncodedKeySize { + return nil, false, cockroachdberr.Wrapf(ErrSnapshotKeyTooLarge, + "length %d > %d", kLen, MaxSnapshotEncodedKeySize) + } + key, err := readExact(r, scratch[:0], kLen) + if err != nil { + return nil, false, cockroachdberr.WithStack(err) + } + return key, false, nil +} + +// readEntryValue reads the per-entry length-prefix-then-bytes for +// the value half of one entry. A mid-entry EOF (length prefix +// missing after the key was already consumed) surfaces as +// ErrSnapshotTruncated rather than the natural inter-entry EOF — +// gemini high finding on PR #792 (the previous code ignored +// readEntryLen's eof return value here, allowing a truncated stream +// to flow into decodeSnapshotEntry with vLen=0 and surface as the +// wrong error). Applies MaxSnapshotEncodedValueSize before allocation +// (same rationale as readEntryKey). 
+func readEntryValue(r *bufio.Reader, valBuf *[]byte) ([]byte, error) { + vLen, vEof, err := readEntryLen(r) + if err != nil { + return nil, err + } + if vEof { + return nil, cockroachdberr.WithStack(ErrSnapshotTruncated) + } + if vLen > MaxSnapshotEncodedValueSize { + return nil, cockroachdberr.Wrapf(ErrSnapshotValueTooLarge, + "length %d > %d", vLen, MaxSnapshotEncodedValueSize) + } + *valBuf, err = readExactGrow(r, (*valBuf)[:0], vLen) + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + return *valBuf, nil +} + +// readSnapshotHeader consumes the 8-byte magic and the 8-byte LE +// lastCommitTS. Returns ErrSnapshotBadMagic on header mismatch. +func readSnapshotHeader(r io.Reader) (SnapshotHeader, error) { + var magic [PebbleSnapshotMagicLen]byte + if _, err := io.ReadFull(r, magic[:]); err != nil { + return SnapshotHeader{}, cockroachdberr.WithStack(err) + } + if !bytes.Equal(magic[:], []byte(PebbleSnapshotMagic)) { + return SnapshotHeader{}, cockroachdberr.Wrapf(ErrSnapshotBadMagic, + "got %q", magic[:]) + } + var ts uint64 + if err := binary.Read(r, binary.LittleEndian, &ts); err != nil { + return SnapshotHeader{}, cockroachdberr.WithStack(err) + } + return SnapshotHeader{LastCommitTS: ts}, nil +} + +// readEntryLen reads an 8-byte LittleEndian length prefix. Returns +// (0, true, nil) on clean EOF — used to detect the natural end of +// the snapshot. Any other read error (including unexpected EOF) is +// returned verbatim. +func readEntryLen(r io.Reader) (uint64, bool, error) { + var raw [8]byte + n, err := io.ReadFull(r, raw[:]) + if err == nil { + return binary.LittleEndian.Uint64(raw[:]), false, nil + } + if cockroachdberr.Is(err, io.EOF) && n == 0 { + return 0, true, nil + } + if cockroachdberr.Is(err, io.ErrUnexpectedEOF) { + return 0, false, cockroachdberr.WithStack(ErrSnapshotTruncated) + } + return 0, false, cockroachdberr.WithStack(err) +} + +// readExact reads exactly n bytes into dst (extending it as +// needed). 
The returned slice aliases dst's underlying array — the +// caller must not retain it across loop iterations. +func readExact(r io.Reader, dst []byte, n uint64) ([]byte, error) { + if uint64(cap(dst)) < n { + // Cap fallback path: allocate a fresh slice when the + // caller's scratch buffer isn't large enough. For the + // stack-allocated keyBuf this only kicks in on + // pathologically long keys. + return readExactGrow(r, dst, n) + } + dst = dst[:n] + if _, err := io.ReadFull(r, dst); err != nil { + if cockroachdberr.Is(err, io.ErrUnexpectedEOF) || cockroachdberr.Is(err, io.EOF) { + return nil, cockroachdberr.WithStack(ErrSnapshotTruncated) + } + return nil, cockroachdberr.WithStack(err) + } + return dst, nil +} + +// readExactGrow is the heap-fallback variant of readExact. Used +// for value bodies, which can be up to several MiB and so live in +// a separately grown buffer rather than a fixed stack array. +func readExactGrow(r io.Reader, dst []byte, n uint64) ([]byte, error) { + if uint64(cap(dst)) < n { + dst = make([]byte, n) + } else { + dst = dst[:n] + } + if _, err := io.ReadFull(r, dst); err != nil { + if cockroachdberr.Is(err, io.ErrUnexpectedEOF) || cockroachdberr.Is(err, io.EOF) { + return nil, cockroachdberr.WithStack(ErrSnapshotTruncated) + } + return nil, cockroachdberr.WithStack(err) + } + return dst, nil +} + +// decodeSnapshotEntry strips the 8-byte inverted-TS key suffix and +// the 9-byte value header, surfacing the user-visible byte slices +// plus the MVCC metadata. Returns ErrSnapshotShortKey / +// ErrSnapshotShortValue on length violations and +// ErrSnapshotEncryptedReserved / ErrSnapshotEncryptedEntry on bad +// or unsupported encryption_state bits. 
+func decodeSnapshotEntry(encKey, encVal []byte) (SnapshotEntry, error) { + if len(encKey) < snapshotTSSize { + return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotShortKey, + "encoded key length %d < %d", len(encKey), snapshotTSSize) + } + if len(encVal) < snapshotValueHeaderSize { + return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotShortValue, + "encoded value length %d < %d", len(encVal), snapshotValueHeaderSize) + } + userKey := encKey[:len(encKey)-snapshotTSSize] + invTS := binary.BigEndian.Uint64(encKey[len(encKey)-snapshotTSSize:]) + commitTS := ^invTS + + flags := encVal[0] + if flags&snapshotEncStateReserved != 0 { + return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotEncryptedReserved, + "value header byte %#08b", flags) + } + encState := (flags & snapshotEncStateMask) >> snapshotEncStateShift + switch encState { + case snapshotEncStateCleartx: + // fall through + case snapshotEncStateEncrypt: + return SnapshotEntry{}, cockroachdberr.WithStack(ErrSnapshotEncryptedEntry) + default: + return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotEncryptedReserved, + "encryption_state=%#x is reserved", encState) + } + tombstone := (flags & snapshotTombstoneMask) != 0 + expireAt := binary.LittleEndian.Uint64(encVal[1:snapshotValueHeaderSize]) + userValue := encVal[snapshotValueHeaderSize:] + return SnapshotEntry{ + UserKey: userKey, + UserValue: userValue, + CommitTS: commitTS, + ExpireAt: expireAt, + Tombstone: tombstone, + }, nil +} diff --git a/internal/backup/snapshot_reader_test.go b/internal/backup/snapshot_reader_test.go new file mode 100644 index 00000000..90ad2446 --- /dev/null +++ b/internal/backup/snapshot_reader_test.go @@ -0,0 +1,518 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "io" + "math" + "testing" + + "github.com/cockroachdb/errors" +) + +// snapBuilder is a test-side mirror of the live store's native +// snapshot writer (store/lsm_store.go). 
// Each WriteEntry call appends an entry whose key is
// `userKey || ^commitTS (8 bytes, BigEndian)` and whose value is
// `flags (1 byte) || expireAt (8 bytes, LittleEndian) || body`. The
// Bytes() result is the full `EKVPBBL1`-prefixed snapshot stream a
// real .fsm file would carry.
type snapBuilder struct {
	buf bytes.Buffer
}

// newSnapBuilder returns a builder whose buffer already holds the
// snapshot preamble: the magic string followed by lastCommitTS as a
// little-endian uint64.
func newSnapBuilder(lastCommitTS uint64) *snapBuilder {
	b := &snapBuilder{}
	b.buf.WriteString(PebbleSnapshotMagic)
	// Error deliberately ignored: the destination is an in-memory buffer.
	_ = binary.Write(&b.buf, binary.LittleEndian, lastCommitTS)
	return b
}

// WriteEntry appends one (userKey, commitTS, body) tuple. Tombstone
// flag is independent of body; the live store writes the same
// header shape for both. expireAt is the absolute Unix-ms expiry
// (0 == no TTL). encState is the unshifted encryption_state value;
// it is shifted into bits 1-2 of the flags byte and masked, so bits
// outside the two-bit field are dropped.
func (b *snapBuilder) WriteEntry(userKey []byte, commitTS uint64, body []byte, tombstone bool, expireAt uint64, encState byte) {
	// key = userKey || (^commitTS)BE
	encKey := make([]byte, len(userKey)+snapshotTSSize)
	copy(encKey, userKey)
	binary.BigEndian.PutUint64(encKey[len(userKey):], ^commitTS)
	// value = flags || expireAt (LE) || body
	encVal := make([]byte, snapshotValueHeaderSize+len(body))
	var flags byte
	if tombstone {
		flags |= snapshotTombstoneMask
	}
	flags |= (encState << snapshotEncStateShift) & snapshotEncStateMask
	encVal[0] = flags
	binary.LittleEndian.PutUint64(encVal[1:snapshotValueHeaderSize], expireAt)
	copy(encVal[snapshotValueHeaderSize:], body)
	// Frame each half with its 8-byte little-endian length prefix.
	_ = binary.Write(&b.buf, binary.LittleEndian, uint64(len(encKey)))
	b.buf.Write(encKey)
	_ = binary.Write(&b.buf, binary.LittleEndian, uint64(len(encVal)))
	b.buf.Write(encVal)
}

// Bytes returns the snapshot stream accumulated so far. The slice is
// the buffer's own view, not a copy.
func (b *snapBuilder) Bytes() []byte { return b.buf.Bytes() }

// TestReadSnapshot_HeaderOnly pins that a snapshot containing only
// the magic + ts header (no entries) terminates cleanly with zero
// entry callbacks. Because the header is only delivered alongside an
// entry, the callback never observes it in this case.
func TestReadSnapshot_HeaderOnly(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0xdeadbeef)
	var sawHeader SnapshotHeader
	var count int
	err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(h SnapshotHeader, _ SnapshotEntry) error {
		sawHeader = h
		count++
		return nil
	})
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if count != 0 {
		t.Fatalf("entries = %d, want 0", count)
	}
	// Header callback never fires on an empty body — we don't
	// surface SnapshotHeader through fn until at least one entry
	// arrives. The "want 0" check above is sufficient.
	_ = sawHeader
}

// TestReadSnapshot_SingleEntryRoundTrip pins the basic single-entry
// path: header parses, key/value decode, MVCC metadata surfaces.
func TestReadSnapshot_SingleEntryRoundTrip(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(100)
	b.WriteEntry([]byte("!redis|str|hello"), 42, []byte("world"), false, 1234567, snapshotEncStateCleartx)
	var got SnapshotEntry
	var hdr SnapshotHeader
	err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(h SnapshotHeader, e SnapshotEntry) error {
		hdr = h
		got = e
		// Clone the bytes because the reader's scratch buffer
		// may be overwritten on return.
		got.UserKey = bytes.Clone(e.UserKey)
		got.UserValue = bytes.Clone(e.UserValue)
		return nil
	})
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// Every field written by the builder must come back unchanged.
	if hdr.LastCommitTS != 100 {
		t.Fatalf("hdr.LastCommitTS = %d, want 100", hdr.LastCommitTS)
	}
	if string(got.UserKey) != "!redis|str|hello" {
		t.Fatalf("UserKey = %q, want %q", got.UserKey, "!redis|str|hello")
	}
	if string(got.UserValue) != "world" {
		t.Fatalf("UserValue = %q, want %q", got.UserValue, "world")
	}
	if got.CommitTS != 42 {
		t.Fatalf("CommitTS = %d, want 42", got.CommitTS)
	}
	if got.ExpireAt != 1234567 {
		t.Fatalf("ExpireAt = %d, want 1234567", got.ExpireAt)
	}
	if got.Tombstone {
		t.Fatalf("Tombstone = true, want false")
	}
}

// TestReadSnapshot_TombstoneFlagSurfaced pins that the tombstone
// bit on the value-header is exposed via SnapshotEntry.Tombstone.
// Phase 0a callers (the dispatcher) skip tombstones so a restored
// dump matches the snapshot's logical visibility; surfacing the
// flag also lets diagnostic dumps include them.
func TestReadSnapshot_TombstoneFlagSurfaced(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(1)
	// A tombstone with no body: the value is just the 9-byte header.
	b.WriteEntry([]byte("!redis|str|gone"), 7, nil, true, 0, snapshotEncStateCleartx)
	var sawTomb bool
	err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(_ SnapshotHeader, e SnapshotEntry) error {
		sawTomb = e.Tombstone
		return nil
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if !sawTomb {
		t.Fatalf("tombstone flag did not surface")
	}
}

// TestReadSnapshot_MultipleEntriesPreserveOrder pins that the
// reader yields entries in the on-disk order. The live snapshot
// stream is sorted by encoded key (userKey||^TS), so newer
// versions of the same userKey appear FIRST. The dispatcher
// relies on this order for first-seen-wins dedup.
func TestReadSnapshot_MultipleEntriesPreserveOrder(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	keys := []string{"!redis|str|a", "!redis|str|b", "!redis|str|c"}
	for i, k := range keys {
		b.WriteEntry([]byte(k), uint64(i+1), []byte{byte(i)}, false, 0, snapshotEncStateCleartx) //nolint:gosec // i bounded by len(keys)<<63
	}
	var gotKeys []string
	err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(_ SnapshotHeader, e SnapshotEntry) error {
		// string(...) copies, so the key survives scratch-buffer reuse.
		gotKeys = append(gotKeys, string(e.UserKey))
		return nil
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if len(gotKeys) != len(keys) {
		t.Fatalf("entries = %d, want %d", len(gotKeys), len(keys))
	}
	for i, k := range keys {
		if gotKeys[i] != k {
			t.Fatalf("entries[%d] = %q, want %q", i, gotKeys[i], k)
		}
	}
}

// TestReadSnapshot_RejectsBadMagic pins that a file whose first 8
// bytes are not "EKVPBBL1" fails closed with ErrSnapshotBadMagic.
// This is the trip-wire for "operator pointed the decoder at an
// MVCC streaming snapshot, a tar archive, or a truncated file."
func TestReadSnapshot_RejectsBadMagic(t *testing.T) {
	t.Parallel()
	// 8 wrong magic bytes followed by 16 filler bytes, so the magic
	// read itself succeeds and only the comparison can fail.
	bad := []byte("MVCCSTRM" + "....16....bytes.")
	err := ReadSnapshot(bytes.NewReader(bad), func(_ SnapshotHeader, _ SnapshotEntry) error {
		t.Fatalf("callback fired on bad-magic input")
		return nil
	})
	if !errors.Is(err, ErrSnapshotBadMagic) {
		t.Fatalf("err = %v want ErrSnapshotBadMagic", err)
	}
}

// TestReadSnapshot_RejectsTruncatedHeader pins that a file shorter
// than the 16-byte header (magic + ts) returns io.EOF / unexpected
// EOF wrapped in WithStack — not ErrSnapshotBadMagic. This
// matters: a 0-byte file is "no snapshot" and a 4-byte file is "I
// tried to write a snapshot and crashed"; conflating the two would
// hide truncation incidents.
func TestReadSnapshot_RejectsTruncatedHeader(t *testing.T) {
	t.Parallel()
	// Only the first 4 bytes of the 8-byte magic are present.
	err := ReadSnapshot(bytes.NewReader([]byte("EKVP")), func(_ SnapshotHeader, _ SnapshotEntry) error {
		return nil
	})
	if err == nil {
		t.Fatalf("expected error on truncated header")
	}
	if errors.Is(err, ErrSnapshotBadMagic) {
		t.Fatalf("truncated header must NOT report bad magic, got %v", err)
	}
}

// TestReadSnapshot_RejectsTruncatedEntry pins that a snapshot
// ending after a key-length but before the key bytes surfaces as
// ErrSnapshotTruncated. A clean EOF at the start of the key-length
// field is the normal terminator and must NOT trigger this.
func TestReadSnapshot_RejectsTruncatedEntry(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// Append a key-length but no key bytes.
	var l [8]byte
	binary.LittleEndian.PutUint64(l[:], 32)
	full := append(b.Bytes(), l[:]...)
	err := ReadSnapshot(bytes.NewReader(full), func(_ SnapshotHeader, _ SnapshotEntry) error {
		return nil
	})
	if !errors.Is(err, ErrSnapshotTruncated) {
		t.Fatalf("err = %v want ErrSnapshotTruncated", err)
	}
}

// TestReadSnapshot_RejectsTruncatedBeforeValueLength pins the
// gemini-high fix on PR #792: a snapshot that ends AFTER the key
// bytes but BEFORE the value-length field must surface as
// ErrSnapshotTruncated (not the previous ErrSnapshotShortValue
// misclassification, which happened because readEntryLen's `eof`
// return value was being ignored on the second call).
func TestReadSnapshot_RejectsTruncatedBeforeValueLength(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// Build a complete key (length + bytes) but no value-length.
	encKey := make([]byte, 1+snapshotTSSize)
	encKey[0] = 'k'
	binary.BigEndian.PutUint64(encKey[1:], ^uint64(1))
	var kLen [8]byte
	binary.LittleEndian.PutUint64(kLen[:], uint64(len(encKey)))
	full := append(b.Bytes(), kLen[:]...)
	full = append(full, encKey...)
	err := ReadSnapshot(bytes.NewReader(full), func(_ SnapshotHeader, _ SnapshotEntry) error {
		return nil
	})
	if !errors.Is(err, ErrSnapshotTruncated) {
		t.Fatalf("err = %v want ErrSnapshotTruncated", err)
	}
}

// TestReadSnapshot_RejectsKeyLengthOverBudget pins the codex P1 +
// gemini security-high fix on PR #792: a corrupt or adversarial
// snapshot whose length-prefix declares a key larger than
// MaxSnapshotEncodedKeySize must fail at the length-prefix check
// BEFORE allocating `make([]byte, kLen)`. Without this guard a 4 GB
// length prefix would OOM the decoder (or panic on 32-bit when
// uint64 → int narrowing wraps).
func TestReadSnapshot_RejectsKeyLengthOverBudget(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// Length-prefix one byte over the budget.
	var l [8]byte
	binary.LittleEndian.PutUint64(l[:], uint64(MaxSnapshotEncodedKeySize)+1)
	full := append(b.Bytes(), l[:]...)
	err := ReadSnapshot(bytes.NewReader(full), func(_ SnapshotHeader, _ SnapshotEntry) error {
		t.Fatalf("callback fired on over-budget key length")
		return nil
	})
	if !errors.Is(err, ErrSnapshotKeyTooLarge) {
		t.Fatalf("err = %v want ErrSnapshotKeyTooLarge", err)
	}
}

// TestReadSnapshot_RejectsValueLengthOverBudget mirrors the
// key-length guard for the value side.
func TestReadSnapshot_RejectsValueLengthOverBudget(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// Build a valid key entry, then a value-length one byte over the
	// budget.
	encKey := make([]byte, 1+snapshotTSSize)
	encKey[0] = 'k'
	binary.BigEndian.PutUint64(encKey[1:], ^uint64(1))
	var kLen, vLen [8]byte
	binary.LittleEndian.PutUint64(kLen[:], uint64(len(encKey)))
	binary.LittleEndian.PutUint64(vLen[:], uint64(MaxSnapshotEncodedValueSize)+1)
	full := append(b.Bytes(), kLen[:]...)
	full = append(full, encKey...)
	full = append(full, vLen[:]...)
	err := ReadSnapshot(bytes.NewReader(full), func(_ SnapshotHeader, _ SnapshotEntry) error {
		t.Fatalf("callback fired on over-budget value length")
		return nil
	})
	if !errors.Is(err, ErrSnapshotValueTooLarge) {
		t.Fatalf("err = %v want ErrSnapshotValueTooLarge", err)
	}
}

// TestReadSnapshot_AcceptsKeyLengthAtBudgetBoundary pins the off-
// by-one: encoded-key length EXACTLY equal to
// MaxSnapshotEncodedKeySize must be accepted (the reader rejects
// only `> budget`, matching the live store's `readRestoreFieldLen`
// semantics). The earlier 1 KiB version of this test (coderabbit
// Major on PR #792 round 2) would not catch a `>=` vs `>` regression
// because the input was far below the budget. This version sizes
// the user key to make the encoded key (`userKey + 8-byte TS
// suffix`) land at exactly MaxSnapshotEncodedKeySize, then asserts
// successful round-trip. A regression that flipped the comparison
// to `>=` would reject this entry with ErrSnapshotKeyTooLarge.
func TestReadSnapshot_AcceptsKeyLengthAtBudgetBoundary(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// userKey sized so that encoded-key length == MaxSnapshotEncodedKeySize.
	userKey := make([]byte, MaxSnapshotEncodedKeySize-snapshotTSSize)
	// Non-constant fill so a shifted or partially copied key is detected.
	for i := range userKey {
		userKey[i] = byte(i % 256)
	}
	b.WriteEntry(userKey, 1, []byte("v"), false, 0, snapshotEncStateCleartx)
	var got SnapshotEntry
	err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(_ SnapshotHeader, e SnapshotEntry) error {
		got = e
		got.UserKey = bytes.Clone(e.UserKey)
		return nil
	})
	if err != nil {
		t.Fatalf("err = %v want nil at length == MaxSnapshotEncodedKeySize", err)
	}
	if !bytes.Equal(got.UserKey, userKey) {
		t.Fatalf("UserKey mismatch at boundary (got len=%d want len=%d)", len(got.UserKey), len(userKey))
	}
}

// TestReadSnapshot_RejectsShortKey pins the encoded-key-length
// invariant.
// Every encoded key has an 8-byte TS suffix; an entry
// with a shorter key indicates store corruption.
func TestReadSnapshot_RejectsShortKey(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// Encode an entry with a key shorter than the TS suffix by
	// hand. WriteEntry won't do this, so we splice the bytes in
	// directly.
	bodyHdr := []byte{0x00, 0, 0, 0, 0, 0, 0, 0, 0} // 9-byte header, zero flags + expireAt
	var kLen, vLen [8]byte
	binary.LittleEndian.PutUint64(kLen[:], 4) // < snapshotTSSize=8
	binary.LittleEndian.PutUint64(vLen[:], uint64(len(bodyHdr)))
	full := append(b.Bytes(), kLen[:]...)
	full = append(full, 'a', 'b', 'c', 'd')
	full = append(full, vLen[:]...)
	full = append(full, bodyHdr...)
	err := ReadSnapshot(bytes.NewReader(full), func(_ SnapshotHeader, _ SnapshotEntry) error {
		return nil
	})
	if !errors.Is(err, ErrSnapshotShortKey) {
		t.Fatalf("err = %v want ErrSnapshotShortKey", err)
	}
}

// TestReadSnapshot_RejectsShortValue pins that an entry with a
// value shorter than the 9-byte header (flags + expireAt) fails
// closed. The live store always emits the full 9 bytes even for
// tombstones.
func TestReadSnapshot_RejectsShortValue(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// Encoded key with a valid 8-byte TS suffix.
	encKey := make([]byte, 1+snapshotTSSize)
	encKey[0] = 'k'
	binary.BigEndian.PutUint64(encKey[1:], ^uint64(1))
	var kLen, vLen [8]byte
	binary.LittleEndian.PutUint64(kLen[:], uint64(len(encKey)))
	binary.LittleEndian.PutUint64(vLen[:], 4) // < snapshotValueHeaderSize=9
	full := append(b.Bytes(), kLen[:]...)
	full = append(full, encKey...)
	full = append(full, vLen[:]...)
	full = append(full, 'a', 'b', 'c', 'd')
	err := ReadSnapshot(bytes.NewReader(full), func(_ SnapshotHeader, _ SnapshotEntry) error {
		return nil
	})
	if !errors.Is(err, ErrSnapshotShortValue) {
		t.Fatalf("err = %v want ErrSnapshotShortValue", err)
	}
}

// TestReadSnapshot_RejectsEncryptedEntry pins that Phase 0a fails
// closed on encrypted entries. Stage 8 of the encryption rollout
// will add a keyring-aware variant; until then, attempting to
// decode an encrypted snapshot with this binary returns
// ErrSnapshotEncryptedEntry rather than silently emitting
// ciphertext.
func TestReadSnapshot_RejectsEncryptedEntry(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	b.WriteEntry([]byte("k"), 1, []byte("ciphertext"), false, 0, snapshotEncStateEncrypt)
	err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(_ SnapshotHeader, _ SnapshotEntry) error {
		t.Fatalf("callback must not fire on encrypted entry")
		return nil
	})
	if !errors.Is(err, ErrSnapshotEncryptedEntry) {
		t.Fatalf("err = %v want ErrSnapshotEncryptedEntry", err)
	}
}

// TestReadSnapshot_RejectsReservedEncryptionState pins the §7.1
// fail-closed contract for reserved encState bits (0b10, 0b11) and
// the high-bit reserved-zero range.
func TestReadSnapshot_RejectsReservedEncryptionState(t *testing.T) {
	t.Parallel()
	for _, tc := range []struct {
		name  string
		flags byte
	}{
		{"reserved-encstate-10", 0b0000_0100}, // encState bits = 0b10
		{"reserved-encstate-11", 0b0000_0110}, // encState bits = 0b11
		{"reserved-high-bit-3", 0b0000_1000},  // bit 3 set
		{"reserved-high-bit-7", 0b1000_0000},  // bit 7 set
	} {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			b := newSnapBuilder(0)
			// Build an entry by hand with the chosen flags byte.
			// WriteEntry masks encState into bits 1-2, so it cannot
			// produce the high-bit cases.
			encKey := make([]byte, 1+snapshotTSSize)
			encKey[0] = 'k'
			binary.BigEndian.PutUint64(encKey[1:], ^uint64(1))
			encVal := make([]byte, snapshotValueHeaderSize+1)
			encVal[0] = tc.flags
			binary.LittleEndian.PutUint64(encVal[1:snapshotValueHeaderSize], 0)
			encVal[snapshotValueHeaderSize] = 'v'
			var kLen, vLen [8]byte
			binary.LittleEndian.PutUint64(kLen[:], uint64(len(encKey)))
			binary.LittleEndian.PutUint64(vLen[:], uint64(len(encVal)))
			full := append(b.Bytes(), kLen[:]...)
			full = append(full, encKey...)
			full = append(full, vLen[:]...)
			full = append(full, encVal...)
			err := ReadSnapshot(bytes.NewReader(full), func(_ SnapshotHeader, _ SnapshotEntry) error {
				t.Fatalf("callback fired on reserved encState")
				return nil
			})
			if !errors.Is(err, ErrSnapshotEncryptedReserved) {
				t.Fatalf("err = %v want ErrSnapshotEncryptedReserved", err)
			}
		})
	}
}

// TestReadSnapshot_CallbackErrorPropagates pins that fn's error
// terminates iteration and returns the error verbatim (caller can
// distinguish their own error from a snapshot-format error via
// errors.Is).
func TestReadSnapshot_CallbackErrorPropagates(t *testing.T) {
	t.Parallel()
	b := newSnapBuilder(0)
	// Two entries, so a reader that kept going after the first
	// callback error would be caught by the call count below.
	b.WriteEntry([]byte("k1"), 1, []byte("v1"), false, 0, snapshotEncStateCleartx)
	b.WriteEntry([]byte("k2"), 2, []byte("v2"), false, 0, snapshotEncStateCleartx)
	sentinel := errors.New("test sentinel")
	var calls int
	err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(_ SnapshotHeader, _ SnapshotEntry) error {
		calls++
		return sentinel
	})
	if !errors.Is(err, sentinel) {
		t.Fatalf("err = %v want sentinel", err)
	}
	if calls != 1 {
		t.Fatalf("callback called %d times after sentinel error, want 1", calls)
	}
}

// TestReadSnapshot_CommitTSInversionRoundTrips pins that the
// inverted-TS suffix (^commitTS) decodes back to the original TS
// across the math.MaxUint64 boundary. Off-by-one errors in the
// XOR would break sort order silently.
+func TestReadSnapshot_CommitTSInversionRoundTrips(t *testing.T) { + t.Parallel() + for _, ts := range []uint64{0, 1, 1 << 32, math.MaxUint64 - 1, math.MaxUint64} { + b := newSnapBuilder(0) + b.WriteEntry([]byte("k"), ts, []byte("v"), false, 0, snapshotEncStateCleartx) + var got uint64 + err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(_ SnapshotHeader, e SnapshotEntry) error { + got = e.CommitTS + return nil + }) + if err != nil { + t.Fatalf("ts=%d: err=%v", ts, err) + } + if got != ts { + t.Fatalf("ts=%d round-tripped to %d", ts, got) + } + } +} + +// TestReadSnapshot_EmptyValueBody pins that an entry with a 9-byte +// value (header only, zero body) decodes to UserValue=[] without +// firing ErrSnapshotShortValue. This is the on-disk shape of a +// tombstone or a deliberately empty value. +func TestReadSnapshot_EmptyValueBody(t *testing.T) { + t.Parallel() + b := newSnapBuilder(0) + b.WriteEntry([]byte("k"), 1, nil, false, 0, snapshotEncStateCleartx) + var sawEmpty bool + err := ReadSnapshot(bytes.NewReader(b.Bytes()), func(_ SnapshotHeader, e SnapshotEntry) error { + sawEmpty = len(e.UserValue) == 0 + return nil + }) + if err != nil { + t.Fatalf("err: %v", err) + } + if !sawEmpty { + t.Fatalf("expected empty UserValue, got non-empty") + } +} + +// Compile-time sanity: io.EOF can flow up from the reader without +// being misclassified as a snapshot-format error. (Mostly a guard +// for future refactors that might add error wrapping.) +var _ = io.EOF