diff --git a/sei-db/common/iterators/domain_iterator.go b/sei-db/common/iterators/domain_iterator.go new file mode 100644 index 0000000000..304867b82f --- /dev/null +++ b/sei-db/common/iterators/domain_iterator.go @@ -0,0 +1,33 @@ +package iterators + +import ( + "fmt" + + dbm "github.com/tendermint/tm-db" +) + +var _ dbm.Iterator = (*domainIterator)(nil) + +// domainIterator wraps a parent iterator and overrides Domain() to report a +// caller-supplied [start, end) range. It is useful when the parent is built +// over a physical/translated keyspace (so its own Domain() reflects physical +// bounds) but callers expect the logical bounds they requested, as required by +// the dbm.Iterator contract. All other methods are inherited from the parent. +type domainIterator struct { + dbm.Iterator + start []byte + end []byte +} + +// NewDomainIterator returns an iterator that behaves exactly like parent except +// that Domain() reports [start, end). The parent must be non-nil. +func NewDomainIterator(parent dbm.Iterator, start, end []byte) (dbm.Iterator, error) { + if parent == nil { + return nil, fmt.Errorf("nil parent iterator") + } + return &domainIterator{Iterator: parent, start: start, end: end}, nil +} + +func (d *domainIterator) Domain() ([]byte, []byte) { + return d.start, d.end +} diff --git a/sei-db/common/iterators/domain_iterator_test.go b/sei-db/common/iterators/domain_iterator_test.go new file mode 100644 index 0000000000..32bdcaf05a --- /dev/null +++ b/sei-db/common/iterators/domain_iterator_test.go @@ -0,0 +1,63 @@ +package iterators_test + +import ( + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/common/iterators" + "github.com/stretchr/testify/require" +) + +func TestNewDomainIterator_NilParent(t *testing.T) { + it, err := iterators.NewDomainIterator(nil, []byte("a"), []byte("z")) + require.Error(t, err) + require.Nil(t, it) +} + +func TestNewDomainIterator_OverridesDomain(t *testing.T) { + data := map[string][]byte{ + "b": []byte("vb"), + "a": []byte("va"), + "c": []byte("vc"), + } + parent, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer, data) + require.NoError(t, err) + + // Sanity check: the parent reports nil bounds before wrapping. + pStart, pEnd := parent.Domain() + require.Nil(t, pStart) + require.Nil(t, pEnd) + + start, end := []byte("a"), []byte("d") + it, err := iterators.NewDomainIterator(parent, start, end) + require.NoError(t, err) + defer it.Close() + + gotStart, gotEnd := it.Domain() + require.Equal(t, start, gotStart) + require.Equal(t, end, gotEnd) +} + +func TestNewDomainIterator_DelegatesIteration(t *testing.T) { + data := map[string][]byte{ + "a": []byte("va"), + "b": []byte("vb"), + "c": []byte("vc"), + } + parent, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer, data) + require.NoError(t, err) + + it, err := iterators.NewDomainIterator(parent, []byte("a"), []byte("d")) + require.NoError(t, err) + defer it.Close() + + var got [][2][]byte + for ; it.Valid(); it.Next() { + got = append(got, [2][]byte{it.Key(), it.Value()}) + } + require.NoError(t, it.Error()) + require.Equal(t, [][2][]byte{ + {[]byte("a"), []byte("va")}, + {[]byte("b"), []byte("vb")}, + {[]byte("c"), []byte("vc")}, + }, got) +} diff --git a/sei-db/common/iterators/map_iterator.go b/sei-db/common/iterators/map_iterator.go new file mode 100644 index 0000000000..55f6d6416b --- /dev/null +++ b/sei-db/common/iterators/map_iterator.go @@ -0,0 +1,160 @@ +package iterators + +import ( + "bytes" + "fmt" + "sort" + + "github.com/sei-protocol/sei-chain/sei-db/common/utils" + dbm "github.com/tendermint/tm-db" +) + +var _ dbm.Iterator = (*mapIterator[any])(nil) + +// Iterates over a map of key/value pairs. +type mapIterator[T any] struct { + kvPairs []kvPair + currentIndex int + start []byte + end []byte +} + +type kvPair struct { + key []byte + value []byte +} + +// BytesSerializer is a pass-through serializer for map[string][]byte. +func BytesSerializer(v []byte) ([]byte, error) { + return v, nil +} + +// NewMapIterator returns an iterator over the union of maps in lexicographic order +// (or reverse lex order when ascending is false). start is inclusive; end is +// exclusive. nil start or end means unbounded on that side. Duplicate keys across +// maps are rejected. Values are serialized with serializer before iteration. +func NewMapIterator[T any]( + start []byte, + end []byte, + ascending bool, + serializer func(T) ([]byte, error), + maps ...map[string]T, +) (dbm.Iterator, error) { + if serializer == nil { + return nil, fmt.Errorf("nil serializer") + } + pairs, err := buildMapPairs(start, end, ascending, serializer, maps...) + if err != nil { + return nil, err + } + return &mapIterator[T]{ + kvPairs: pairs, + start: start, + end: end, + }, nil +} + +func buildMapPairs[T any]( + start, end []byte, + ascending bool, + serializer func(T) ([]byte, error), + maps ...map[string]T, +) ([]kvPair, error) { + if start != nil && end != nil && bytes.Compare(start, end) > 0 { + return nil, nil + } + + total := 0 + for _, data := range maps { + total += len(data) + } + if total == 0 { + return nil, nil + } + + seen := make(map[string]struct{}, total) + pairs := make([]kvPair, 0, total) + for _, data := range maps { + if data == nil { + continue + } + for k, v := range data { + if _, dup := seen[k]; dup { + return nil, fmt.Errorf("duplicate key %q", k) + } + seen[k] = struct{}{} + + key := []byte(k) + if !keyInRange(key, start, end) { + continue + } + + serialized, err := serializer(v) + if err != nil { + return nil, fmt.Errorf("serialize key %q: %w", k, err) + } + pairs = append(pairs, kvPair{ + key: utils.Clone(key), + value: utils.Clone(serialized), + }) + } + } + + sort.Slice(pairs, func(i, j int) bool { + cmp := bytes.Compare(pairs[i].key, pairs[j].key) + if ascending { + return cmp < 0 + } + return cmp > 0 + }) + return pairs, nil +} + +func keyInRange(key, start, end []byte) bool { + if start != nil && bytes.Compare(key, start) < 0 { + return false + } + if end != nil && bytes.Compare(key, end) >= 0 { + return false + } + return true +} + +func (m *mapIterator[T]) Close() error { + m.kvPairs = nil + m.currentIndex = 0 + return nil +} + +func (m *mapIterator[T]) Domain() ([]byte, []byte) { + return m.start, m.end +} + +func (m *mapIterator[T]) Error() error { + return nil +} + +func (m *mapIterator[T]) Key() []byte { + if !m.Valid() { + return nil + } + return m.kvPairs[m.currentIndex].key +} + +func (m *mapIterator[T]) Next() { + if !m.Valid() { + return + } + m.currentIndex++ +} + +func (m *mapIterator[T]) Valid() bool { + return m.currentIndex >= 0 && m.currentIndex < len(m.kvPairs) +} + +func (m *mapIterator[T]) Value() []byte { + if !m.Valid() { + return nil + } + return m.kvPairs[m.currentIndex].value +} diff --git a/sei-db/common/iterators/map_iterator_test.go b/sei-db/common/iterators/map_iterator_test.go new file mode 100644 index 0000000000..095f3a75f8 --- /dev/null +++ b/sei-db/common/iterators/map_iterator_test.go @@ -0,0 +1,167 @@ +package iterators_test + +import ( + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/common/iterators" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" +) + +func TestNewMapIterator_Empty(t *testing.T) { + it, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer) + require.NoError(t, err) + require.False(t, it.Valid()) + require.Nil(t, it.Key()) + require.Nil(t, it.Value()) + require.NoError(t, it.Error()) + start, end := it.Domain() + require.Nil(t, start) + require.Nil(t, end) + require.NoError(t, it.Close()) + require.False(t, it.Valid()) +} + +func TestNewMapIterator_Ascending(t *testing.T) { + data := map[string][]byte{ + "c": []byte("vc"), + "a": []byte("va"), + "b": []byte("vb"), + } + it, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer, data) + require.NoError(t, err) + defer it.Close() + + got := collectMapIterPairs(t, it) + require.Equal(t, [][2][]byte{ + {[]byte("a"), []byte("va")}, + {[]byte("b"), []byte("vb")}, + {[]byte("c"), []byte("vc")}, + }, got) +} + +func TestNewMapIterator_Descending(t *testing.T) { + data := map[string][]byte{ + "c": []byte("vc"), + "a": []byte("va"), + "b": []byte("vb"), + } + it, err := iterators.NewMapIterator(nil, nil, false, iterators.BytesSerializer, data) + require.NoError(t, err) + defer it.Close() + + got := collectMapIterPairs(t, it) + require.Equal(t, [][2][]byte{ + {[]byte("c"), []byte("vc")}, + {[]byte("b"), []byte("vb")}, + {[]byte("a"), []byte("va")}, + }, got) +} + +func TestNewMapIterator_CombinesMaps(t *testing.T) { + left := map[string][]byte{"a": []byte("1"), "c": []byte("3")} + right := map[string][]byte{"b": []byte("2")} + it, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer, left, right) + require.NoError(t, err) + defer it.Close() + + got := collectMapIterPairs(t, it) + require.Equal(t, [][2][]byte{ + {[]byte("a"), []byte("1")}, + {[]byte("b"), []byte("2")}, + {[]byte("c"), []byte("3")}, + }, got) +} + +func TestNewMapIterator_DuplicateKey(t *testing.T) { + left := map[string][]byte{"k": []byte("v0")} + right := map[string][]byte{"k": []byte("v1")} + _, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer, left, right) + require.Error(t, err) + require.Contains(t, err.Error(), `duplicate key "k"`) +} + +func TestNewMapIterator_Domain(t *testing.T) { + data := map[string][]byte{ + "a": []byte("1"), + "b": []byte("2"), + "c": []byte("3"), + "d": []byte("4"), + } + start := []byte("b") + end := []byte("d") + it, err := iterators.NewMapIterator(start, end, true, iterators.BytesSerializer, data) + require.NoError(t, err) + defer it.Close() + + got := collectMapIterPairs(t, it) + require.Equal(t, [][2][]byte{ + {[]byte("b"), []byte("2")}, + {[]byte("c"), []byte("3")}, + }, got) + + domainStart, domainEnd := it.Domain() + require.Equal(t, start, domainStart) + require.Equal(t, end, domainEnd) +} + +func TestNewMapIterator_StartInclusiveEndExclusive(t *testing.T) { + data := map[string][]byte{ + "k1": []byte("v1"), + "k2": []byte("v2"), + } + it, err := iterators.NewMapIterator([]byte("k1"), []byte("k1"), true, iterators.BytesSerializer, data) + require.NoError(t, err) + require.False(t, it.Valid()) + require.NoError(t, it.Close()) + + it, err = iterators.NewMapIterator([]byte("k1"), []byte("k2"), true, iterators.BytesSerializer, data) + require.NoError(t, err) + got := collectMapIterPairs(t, it) + require.Equal(t, [][2][]byte{{[]byte("k1"), []byte("v1")}}, got) +} + +func TestNewMapIterator_InvalidRange(t *testing.T) { + data := map[string][]byte{"a": []byte("1")} + it, err := iterators.NewMapIterator([]byte("z"), []byte("a"), true, iterators.BytesSerializer, data) + require.NoError(t, err) + require.False(t, it.Valid()) + require.NoError(t, it.Close()) +} + +func TestNewMapIterator_IsolatedFromMapMutations(t *testing.T) { + data := map[string][]byte{"k": []byte("v")} + it, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer, data) + require.NoError(t, err) + require.True(t, it.Valid()) + + data["k"] = []byte("mutated") + delete(data, "k") + + require.Equal(t, []byte("k"), it.Key()) + require.Equal(t, []byte("v"), it.Value()) + require.NoError(t, it.Close()) +} + +func TestMapIterator_NextAfterExhausted(t *testing.T) { + it, err := iterators.NewMapIterator(nil, nil, true, iterators.BytesSerializer, map[string][]byte{"a": []byte("1")}) + require.NoError(t, err) + require.True(t, it.Valid()) + it.Next() + require.False(t, it.Valid()) + require.Nil(t, it.Key()) + require.Nil(t, it.Value()) + it.Next() // no-op + require.False(t, it.Valid()) + require.NoError(t, it.Close()) +} + +func collectMapIterPairs(t *testing.T, it dbm.Iterator) [][2][]byte { + t.Helper() + var got [][2][]byte + for ; it.Valid(); it.Next() { + got = append(got, [2][]byte{it.Key(), it.Value()}) + } + require.NoError(t, it.Error()) + return got +} diff --git a/sei-db/common/iterators/merging_iterator.go b/sei-db/common/iterators/merging_iterator.go index 37947f6a75..297df3f481 100644 --- a/sei-db/common/iterators/merging_iterator.go +++ b/sei-db/common/iterators/merging_iterator.go @@ -30,22 +30,27 @@ type mergingIterator struct { // the index of the iterator that should next emit a value nextIteratorIndex int + // ascending is true when children iterate in ascending key order. + ascending bool + // the error encountered by the iterator, if any err error } // NewMergingIterator combines iterators into a single iterator. // -// Each child must be in ascending lexicographic order without duplicate keys; -// otherwise behavior is undefined. Output is in global lex order. Duplicate -// keys across children are emitted once; the last child wins. +// Each child must iterate in the same direction as ascending (lex ascending +// when true, lex descending when false) without duplicate keys; otherwise +// behavior is undefined. Duplicate keys across children are emitted once; the +// last child wins. // // Intended for a small number of iterators (on the order of half a dozen). May // not be performant for combining large numbers of iterators. -func NewMergingIterator(iterators ...dbm.Iterator) (dbm.Iterator, error) { +func NewMergingIterator(ascending bool, iterators ...dbm.Iterator) (dbm.Iterator, error) { m := &mergingIterator{ iterators: make([]dbm.Iterator, len(iterators)), nextIteratorIndex: -1, + ascending: ascending, } copy(m.iterators, iterators) @@ -61,10 +66,18 @@ func NewMergingIterator(iterators ...dbm.Iterator) (dbm.Iterator, error) { } m.start, m.end = mergeDomain(m.iterators) - m.findMin() + m.findNext() return m, nil } +func (m *mergingIterator) findNext() { + if m.ascending { + m.findMin() + } else { + m.findMax() + } +} + // findMin sets nextIteratorIndex to the valid child with the smallest current // key, breaking ties toward the highest index. Child errors are checked here // and cached on the merge iterator via fail. @@ -99,6 +112,39 @@ func (m *mergingIterator) findMin() { } } +// findMax sets nextIteratorIndex to the valid child with the largest current +// key, breaking ties toward the highest index. +func (m *mergingIterator) findMax() { + if m.err != nil { + return + } + m.nextIteratorIndex = -1 + var largestKey []byte + for i, child := range m.iterators { + if child == nil { + continue + } + if err := child.Error(); err != nil { + m.fail(err) + return + } + if !child.Valid() { + continue + } + childKey := child.Key() + if m.nextIteratorIndex < 0 { + m.nextIteratorIndex = i + largestKey = bytes.Clone(childKey) + continue + } + cmp := bytes.Compare(childKey, largestKey) + if cmp > 0 || (cmp == 0 && i > m.nextIteratorIndex) { + m.nextIteratorIndex = i + largestKey = bytes.Clone(childKey) + } + } +} + // advanceChildrenAtKey advances every child positioned at key past that key. func (m *mergingIterator) advanceChildrenAtKey(key []byte) { for _, child := range m.iterators { @@ -223,7 +269,7 @@ func (m *mergingIterator) Next() { if m.err != nil { return } - m.findMin() + m.findNext() } func (m *mergingIterator) Valid() bool { diff --git a/sei-db/common/iterators/merging_iterator_test.go b/sei-db/common/iterators/merging_iterator_test.go index 2a1a73e364..47688849b7 100644 --- a/sei-db/common/iterators/merging_iterator_test.go +++ b/sei-db/common/iterators/merging_iterator_test.go @@ -48,12 +48,12 @@ func collect(t *testing.T, it dbm.Iterator) [][2][]byte { } func TestNewMergingIterator_NilIterator(t *testing.T) { - _, err := iterators.NewMergingIterator(memIter(t, []byte("a")), nil) + _, err := iterators.NewMergingIterator(true, memIter(t, []byte("a")), nil) require.Error(t, err) } func TestNewMergingIterator_Empty(t *testing.T) { - it, err := iterators.NewMergingIterator() + it, err := iterators.NewMergingIterator(true) require.NoError(t, err) require.False(t, it.Valid()) require.Nil(t, it.Key()) @@ -63,7 +63,7 @@ func TestNewMergingIterator_Empty(t *testing.T) { func TestMergingIterator_Single(t *testing.T) { child := memIter(t, []byte("b"), []byte("c")) - it, err := iterators.NewMergingIterator(child) + it, err := iterators.NewMergingIterator(true, child) require.NoError(t, err) defer it.Close() @@ -77,7 +77,7 @@ func TestMergingIterator_Single(t *testing.T) { func TestMergingIterator_LexOrder(t *testing.T) { a := memIter(t, []byte("a"), []byte("d")) b := memIter(t, []byte("b"), []byte("c"), []byte("e")) - it, err := iterators.NewMergingIterator(a, b) + it, err := iterators.NewMergingIterator(true, a, b) require.NoError(t, err) defer it.Close() @@ -93,7 +93,7 @@ func TestMergingIterator_LexOrder(t *testing.T) { func TestMergingIterator_DuplicateKeys(t *testing.T) { left := memIterKV(t, [2][]byte{[]byte("k"), []byte("v0")}, [2][]byte{[]byte("z"), []byte("z0")}) right := memIterKV(t, [2][]byte{[]byte("k"), []byte("v1")}, [2][]byte{[]byte("m"), []byte("m1")}) - it, err := iterators.NewMergingIterator(left, right) + it, err := iterators.NewMergingIterator(true, left, right) require.NoError(t, err) defer it.Close() @@ -109,7 +109,7 @@ func TestMergingIterator_RightmostWinsOnDuplicateKey(t *testing.T) { child0 := memIterKV(t, [2][]byte{[]byte("k"), []byte("v0")}, [2][]byte{[]byte("a"), []byte("a0")}) child1 := memIter(t, []byte("b")) child2 := memIterKV(t, [2][]byte{[]byte("k"), []byte("v2")}, [2][]byte{[]byte("c"), []byte("c0")}) - it, err := iterators.NewMergingIterator(child0, child1, child2) + it, err := iterators.NewMergingIterator(true, child0, child1, child2) require.NoError(t, err) defer it.Close() @@ -129,7 +129,7 @@ func TestMergingIterator_Domain(t *testing.T) { it2, err := db.Iterator([]byte("a"), nil) require.NoError(t, err) - merged, err := iterators.NewMergingIterator(it1, it2) + merged, err := iterators.NewMergingIterator(true, it1, it2) require.NoError(t, err) defer merged.Close() @@ -174,7 +174,7 @@ func (child *errOnSecondNextIterator) Close() error { func TestMergingIterator_CachesChildError(t *testing.T) { ok := memIter(t, []byte("a"), []byte("c")) bad := &errOnSecondNextIterator{Iterator: memIter(t, []byte("b"), []byte("d"))} - merged, err := iterators.NewMergingIterator(ok, bad) + merged, err := iterators.NewMergingIterator(true, ok, bad) require.NoError(t, err) require.True(t, merged.Valid()) @@ -244,7 +244,7 @@ func TestMergingIterator_DuplicateKeys_SharedKeyBuffer(t *testing.T) { keys: [][]byte{[]byte("k"), []byte("m")}, values: [][]byte{[]byte("v1"), []byte("m1")}, } - it, err := iterators.NewMergingIterator(left, right) + it, err := iterators.NewMergingIterator(true, left, right) require.NoError(t, err) defer it.Close() @@ -270,7 +270,7 @@ func TestMergingIterator_SharedKeyBuffer_DifferentInitialKeys(t *testing.T) { keys: [][]byte{[]byte("z")}, values: [][]byte{[]byte("z1")}, } - it, err := iterators.NewMergingIterator(left, right) + it, err := iterators.NewMergingIterator(true, left, right) require.NoError(t, err) defer it.Close() @@ -283,7 +283,7 @@ func TestMergingIterator_SharedKeyBuffer_DifferentInitialKeys(t *testing.T) { func TestMergingIterator_ClosesChildren(t *testing.T) { child := &closeTrackingIterator{Iterator: memIter(t, []byte("x"))} - it, err := iterators.NewMergingIterator(child) + it, err := iterators.NewMergingIterator(true, child) require.NoError(t, err) require.NoError(t, it.Close()) require.True(t, child.closed) diff --git a/sei-db/common/iterators/mapping_iterator.go b/sei-db/common/iterators/transforming_iterator.go similarity index 56% rename from sei-db/common/iterators/mapping_iterator.go rename to sei-db/common/iterators/transforming_iterator.go index a3415bfbaf..7021220b3e 100644 --- a/sei-db/common/iterators/mapping_iterator.go +++ b/sei-db/common/iterators/transforming_iterator.go @@ -6,10 +6,10 @@ import ( dbm "github.com/tendermint/tm-db" ) -var _ dbm.Iterator = (*mappingIterator)(nil) +var _ dbm.Iterator = (*transformingIterator)(nil) -// A function used to remap key/value pairs returned by an iterator. -type IteratorRemapper func( +// A function used to transform key/value pairs returned by an iterator. +type IteratorTransform func( // The input key. inputKey []byte, // The input value. @@ -22,42 +22,46 @@ type IteratorRemapper func( // Whether to skip the current key/value pair. If true, the iterator will // not emit this key/value pair. skip bool, - // An error to return if the remapping fails (e.g. parsing failure) + // An error to return if the transform fails (e.g. parsing failure) err error, ) -// mappingIterator applies a remapper to each key/value pair from a parent +// transformingIterator applies a transform to each key/value pair from a parent // iterator, optionally skipping entries. -type mappingIterator struct { - // The parent iterator to remap. +type transformingIterator struct { + // The parent iterator to transform. parent dbm.Iterator - // The function used to remap key/value pairs. - remapper IteratorRemapper + // The function used to transform key/value pairs. + transform IteratorTransform // The next key/value pair to emit. key []byte // The next value to emit. value []byte + // valid reports whether key/value hold a current entry to emit. Tracked + // explicitly rather than inferred from key != nil so a transform may + // legitimately emit a nil/empty key without terminating iteration. + valid bool // The error encountered by the iterator, if any. err error } -// NewMappingIterator returns an iterator that emits remapped key/value pairs -// from parent, skipping pairs for which remapper returns skip=true. -func NewMappingIterator(parent dbm.Iterator, remapper IteratorRemapper) (dbm.Iterator, error) { +// NewTransformingIterator returns an iterator that emits transformed key/value pairs +// from parent, skipping pairs for which transform returns skip=true. +func NewTransformingIterator(parent dbm.Iterator, transform IteratorTransform) (dbm.Iterator, error) { if parent == nil { return nil, fmt.Errorf("nil parent iterator") } - if remapper == nil { + if transform == nil { _ = parent.Close() - return nil, fmt.Errorf("nil remapper") + return nil, fmt.Errorf("nil transform") } if err := parent.Error(); err != nil { _ = parent.Close() return nil, fmt.Errorf("parent iterator error: %w", err) } - m := &mappingIterator{ - parent: parent, - remapper: remapper, + m := &transformingIterator{ + parent: parent, + transform: transform, } m.advance() if err := m.Error(); err != nil { @@ -69,9 +73,10 @@ func NewMappingIterator(parent dbm.Iterator, remapper IteratorRemapper) (dbm.Ite // advance moves to the next non-skipped parent entry, or clears the position if // none remain. -func (m *mappingIterator) advance() { +func (m *transformingIterator) advance() { m.key = nil m.value = nil + m.valid = false if m.parent == nil { return } @@ -82,7 +87,7 @@ func (m *mappingIterator) advance() { } inputKey := m.parent.Key() inputValue := m.parent.Value() - outputKey, outputValue, skip, err := m.remapper(inputKey, inputValue) + outputKey, outputValue, skip, err := m.transform(inputKey, inputValue) if err != nil { m.fail(err) return @@ -90,6 +95,7 @@ func (m *mappingIterator) advance() { if !skip { m.key = outputKey m.value = outputValue + m.valid = true return } m.parent.Next() @@ -101,20 +107,21 @@ func (m *mappingIterator) advance() { // fail records the first error, closes the parent, and clears it so no further // parent methods are invoked. -func (m *mappingIterator) fail(err error) { +func (m *transformingIterator) fail(err error) { if m.err != nil { return } m.err = err m.key = nil m.value = nil + m.valid = false if m.parent != nil { _ = m.parent.Close() m.parent = nil } } -func (m *mappingIterator) Close() error { +func (m *transformingIterator) Close() error { if m.parent == nil { return nil } @@ -122,28 +129,29 @@ func (m *mappingIterator) Close() error { m.parent = nil m.key = nil m.value = nil + m.valid = false return err } -func (m *mappingIterator) Domain() ([]byte, []byte) { +func (m *transformingIterator) Domain() ([]byte, []byte) { if m.parent == nil { return nil, nil } return m.parent.Domain() } -func (m *mappingIterator) Error() error { +func (m *transformingIterator) Error() error { return m.err } -func (m *mappingIterator) Key() []byte { +func (m *transformingIterator) Key() []byte { if !m.Valid() { return nil } return m.key } -func (m *mappingIterator) Next() { +func (m *transformingIterator) Next() { if !m.Valid() { return } @@ -155,11 +163,11 @@ func (m *mappingIterator) Next() { m.advance() } -func (m *mappingIterator) Valid() bool { - return m.err == nil && m.key != nil +func (m *transformingIterator) Valid() bool { + return m.err == nil && m.valid } -func (m *mappingIterator) Value() []byte { +func (m *transformingIterator) Value() []byte { if !m.Valid() { return nil } diff --git a/sei-db/common/iterators/mapping_iterator_test.go b/sei-db/common/iterators/transforming_iterator_test.go similarity index 54% rename from sei-db/common/iterators/mapping_iterator_test.go rename to sei-db/common/iterators/transforming_iterator_test.go index 6be609392f..aad9f1c9b5 100644 --- a/sei-db/common/iterators/mapping_iterator_test.go +++ b/sei-db/common/iterators/transforming_iterator_test.go @@ -10,11 +10,11 @@ import ( dbm "github.com/tendermint/tm-db" ) -var errRemap = errors.New("remap failed") +var errTransform = errors.New("transform failed") -func TestMappingIterator_SkipsKeys(t *testing.T) { +func TestTransformingIterator_SkipsKeys(t *testing.T) { parent := memIter(t, []byte("a"), []byte("b"), []byte("c")) - mapIter, err := iterators.NewMappingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { + transformIter, err := iterators.NewTransformingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { if key[0] == 'b' { return nil, nil, true, nil } @@ -22,51 +22,51 @@ func TestMappingIterator_SkipsKeys(t *testing.T) { }) require.NoError(t, err) - got := collect(t, mapIter) + got := collect(t, transformIter) require.Equal(t, [][2][]byte{ {[]byte("a"), []byte("a")}, {[]byte("c"), []byte("c")}, }, got) } -func TestMappingIterator_RemapsKeyValue(t *testing.T) { +func TestTransformingIterator_TransformsKeyValue(t *testing.T) { parent := memIter(t, []byte("k")) - mapIter, err := iterators.NewMappingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { + transformIter, err := iterators.NewTransformingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { return append([]byte("x"), key...), append([]byte("y"), value...), false, nil }) require.NoError(t, err) - require.True(t, mapIter.Valid()) - require.Equal(t, []byte("xk"), mapIter.Key()) - require.Equal(t, []byte("ya"), mapIter.Value()) - require.NoError(t, mapIter.Close()) + require.True(t, transformIter.Valid()) + require.Equal(t, []byte("xk"), transformIter.Key()) + require.Equal(t, []byte("ya"), transformIter.Value()) + require.NoError(t, transformIter.Close()) } -func TestMappingIterator_RemapperError(t *testing.T) { +func TestTransformingIterator_TransformError(t *testing.T) { parent := memIter(t, []byte("a"), []byte("b")) - mapIter, err := iterators.NewMappingIterator(parent, func(key, _ []byte) ([]byte, []byte, bool, error) { + transformIter, err := iterators.NewTransformingIterator(parent, func(key, _ []byte) ([]byte, []byte, bool, error) { if key[0] == 'b' { - return nil, nil, false, errRemap + return nil, nil, false, errTransform } return key, key, false, nil }) require.NoError(t, err) - require.True(t, mapIter.Valid()) - require.Equal(t, []byte("a"), mapIter.Key()) - mapIter.Next() - require.False(t, mapIter.Valid()) - require.ErrorIs(t, mapIter.Error(), errRemap) + require.True(t, transformIter.Valid()) + require.Equal(t, []byte("a"), transformIter.Key()) + transformIter.Next() + require.False(t, transformIter.Valid()) + require.ErrorIs(t, transformIter.Error(), errTransform) } -func TestMappingIterator_EmptyParent(t *testing.T) { +func TestTransformingIterator_EmptyParent(t *testing.T) { parent := memIter(t) - mapIter, err := iterators.NewMappingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { + transformIter, err := iterators.NewTransformingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { return key, value, false, nil }) require.NoError(t, err) - require.False(t, mapIter.Valid()) - require.NoError(t, mapIter.Error()) + require.False(t, transformIter.Valid()) + require.NoError(t, transformIter.Error()) } var errSkipNext = errors.New("skip next failed") @@ -97,25 +97,25 @@ func (child *invalidAfterFirstNextIterator) Error() error { return child.Iterator.Error() } -func TestNewMappingIterator_ParentErrorAfterSkipNext(t *testing.T) { +func TestNewTransformingIterator_ParentErrorAfterSkipNext(t *testing.T) { // Keys must sort with the skipped key first (memDB iterates in lex order). parent := &invalidAfterFirstNextIterator{Iterator: memIter(t, []byte("_meta"), []byte("user"))} - _, err := iterators.NewMappingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { + _, err := iterators.NewTransformingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { return key, value, bytes.HasPrefix(key, []byte("_meta")), nil }) require.ErrorIs(t, err, errSkipNext) } -func TestNewMappingIterator_NilParent(t *testing.T) { - _, err := iterators.NewMappingIterator(nil, func([]byte, []byte) ([]byte, []byte, bool, error) { +func TestNewTransformingIterator_NilParent(t *testing.T) { + _, err := iterators.NewTransformingIterator(nil, func([]byte, []byte) ([]byte, []byte, bool, error) { return nil, nil, false, nil }) require.Error(t, err) } -func TestNewMappingIterator_NilRemapper(t *testing.T) { +func TestNewTransformingIterator_NilTransform(t *testing.T) { parent := memIter(t, []byte("k")) - _, err := iterators.NewMappingIterator(parent, nil) + _, err := iterators.NewTransformingIterator(parent, nil) require.Error(t, err) } @@ -130,9 +130,9 @@ func (child *errAtConstructionIterator) Error() error { return errConstruction } -func TestNewMappingIterator_ParentError(t *testing.T) { +func TestNewTransformingIterator_ParentError(t *testing.T) { parent := &errAtConstructionIterator{Iterator: memIter(t, []byte("k"))} - _, err := iterators.NewMappingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { + _, err := iterators.NewTransformingIterator(parent, func(key, value []byte) ([]byte, []byte, bool, error) { return key, value, false, nil }) require.ErrorIs(t, err, errConstruction) diff --git a/sei-db/db_engine/dbcache/unwrap.go b/sei-db/db_engine/dbcache/unwrap.go new file mode 100644 index 0000000000..9d5296fd0a --- /dev/null +++ b/sei-db/db_engine/dbcache/unwrap.go @@ -0,0 +1,14 @@ +package dbcache + +import "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + +// Unwrap returns the innermost KeyValueDB, stripping cached wrappers. +func Unwrap(db types.KeyValueDB) types.KeyValueDB { + for { + c, ok := db.(*cachedKeyValueDB) + if !ok { + return db + } + db = c.db + } +} diff --git a/sei-db/db_engine/pebbledb/iterator.go b/sei-db/db_engine/pebbledb/iterator.go index 5f2687896f..2be785639a 100644 --- a/sei-db/db_engine/pebbledb/iterator.go +++ b/sei-db/db_engine/pebbledb/iterator.go @@ -15,6 +15,7 @@ type pebbleIterator struct { it *pebble.Iterator lowerBound []byte upperBound []byte + reverse bool } func newPebbleIterator(it *pebble.Iterator, opts *types.IterOptions) *pebbleIterator { @@ -22,8 +23,13 @@ func newPebbleIterator(it *pebble.Iterator, opts *types.IterOptions) *pebbleIter if opts != nil { pi.lowerBound = opts.LowerBound pi.upperBound = opts.UpperBound + pi.reverse = opts.Reverse + } + if pi.reverse { + pi.it.Last() + } else { + pi.it.First() } - pi.it.First() return pi } @@ -39,7 +45,11 @@ func (pi *pebbleIterator) Next() { if !pi.Valid() { return } - pi.it.Next() + if pi.reverse { + pi.it.Prev() + } else { + pi.it.Next() + } } func (pi *pebbleIterator) Key() []byte { diff --git a/sei-db/db_engine/pebbledb/table_iters.go b/sei-db/db_engine/pebbledb/table_iters.go new file mode 100644 index 0000000000..1347f7c4e0 --- /dev/null +++ b/sei-db/db_engine/pebbledb/table_iters.go @@ -0,0 +1,19 @@ +package pebbledb + +import ( + "fmt" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/dbcache" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" +) + +// TableIters returns the number of open SSTable iterators for db. +// db may be wrapped in one or more cachedKeyValueDB layers. +func TableIters(db types.KeyValueDB) (int64, error) { + inner := dbcache.Unwrap(db) + p, ok := inner.(*pebbleDB) + if !ok { + return 0, fmt.Errorf("expected pebbleDB, got %T", inner) + } + return p.db.Metrics().TableIters, nil +} diff --git a/sei-db/db_engine/types/types.go b/sei-db/db_engine/types/types.go index 23cacb31f8..69abbdd976 100644 --- a/sei-db/db_engine/types/types.go +++ b/sei-db/db_engine/types/types.go @@ -16,9 +16,11 @@ type WriteOptions struct { // IterOptions controls iterator bounds. // - LowerBound is inclusive. // - UpperBound is exclusive. +// - Reverse iterates in descending key order when true (default false). type IterOptions struct { LowerBound []byte UpperBound []byte + Reverse bool } // BatchGetResult describes the result of a single key lookup within a BatchGet call. diff --git a/sei-db/state_db/sc/composite/store_test.go b/sei-db/state_db/sc/composite/store_test.go index 0c98bb5bb2..511d6d40fe 100644 --- a/sei-db/state_db/sc/composite/store_test.go +++ b/sei-db/state_db/sc/composite/store_test.go @@ -38,17 +38,20 @@ func (f *failingEVMStore) GetBlockHeightModified(string, []byte) (int64, bool, e } func (f *failingEVMStore) Has(string, []byte) bool { return false } func (f *failingEVMStore) RawGlobalIterator() (dbm.Iterator, error) { return nil, nil } -func (f *failingEVMStore) RootHash() []byte { return nil } -func (f *failingEVMStore) Version() int64 { return 0 } -func (f *failingEVMStore) GetLatestVersion() (int64, error) { return 0, nil } -func (f *failingEVMStore) WriteSnapshot(string) error { return nil } -func (f *failingEVMStore) Rollback(int64) error { return nil } -func (f *failingEVMStore) Exporter(int64) (types.Exporter, error) { return nil, nil } -func (f *failingEVMStore) Importer(int64) (types.Importer, error) { return nil, nil } -func (f *failingEVMStore) GetPhaseTimer() *metrics.PhaseTimer { return nil } -func (f *failingEVMStore) CommittedRootHash() []byte { return nil } -func (f *failingEVMStore) CleanupOrphanedReadOnlyDirs() error { return nil } -func (f *failingEVMStore) Close() error { return nil } +func (f *failingEVMStore) Iterator(string, []byte, []byte, bool) (dbm.Iterator, error) { + return nil, nil +} +func (f *failingEVMStore) RootHash() []byte { return nil } +func (f *failingEVMStore) Version() int64 { return 0 } +func (f *failingEVMStore) GetLatestVersion() (int64, error) { return 0, nil } +func (f *failingEVMStore) WriteSnapshot(string) error { return nil } +func (f *failingEVMStore) Rollback(int64) error { return nil } +func (f *failingEVMStore) Exporter(int64) (types.Exporter, error) { return nil, nil } +func (f *failingEVMStore) Importer(int64) (types.Importer, error) { return nil, nil } +func (f *failingEVMStore) GetPhaseTimer() *metrics.PhaseTimer { return nil } +func (f *failingEVMStore) CommittedRootHash() []byte { return nil } +func (f *failingEVMStore) CleanupOrphanedReadOnlyDirs() error { return nil } +func (f *failingEVMStore) Close() error { return nil } func padLeft32(val ...byte) []byte { var b [32]byte diff --git a/sei-db/state_db/sc/flatkv/api.go b/sei-db/state_db/sc/flatkv/api.go index 276cebffd8..b007894400 100644 --- a/sei-db/state_db/sc/flatkv/api.go +++ b/sei-db/state_db/sc/flatkv/api.go @@ -61,9 +61,36 @@ type Store interface { // keys across underlying data DBs, merged in global lexicographic order. // Keys are physical format: "evm/" + type_prefix_byte + stripped_key. // Pending writes are not visible. Keys and values are read-only; copy - // before modifying. Caller must Close when done. + // before modifying. + // + // The returned iterator is a stable snapshot taken at construction: it may + // be used concurrently with, and outlive, subsequent ApplyChangeSets/Commit + // calls without observing their effects. The caller must Close it when done; + // an open iterator pins Pebble sstables/memtables and holds back compaction, + // so close promptly rather than relying on it being safe to keep open. RawGlobalIterator() (dbm.Iterator, error) + // Create an iterator over a range of keys in a given store. + // + // The returned iterator is a stable snapshot taken at construction (pending + // writes are cloned and the Pebble view is pinned): it may be used + // concurrently with, and outlive, subsequent ApplyChangeSets/Commit calls + // without observing their effects. The caller must Close it when done; an + // open iterator pins Pebble resources and holds back compaction, so close + // promptly rather than relying on it being safe to keep open. + Iterator( + // The store to iterate over. + store string, + // The start key of the range to iterate over, inclusive. + // If nil, the iterator will start at the beginning of the store. + start []byte, + // The end key of the range to iterate over, exclusive. + // If nil, the iterator will iterate until the end of the store. + end []byte, + // Whether to iterate in ascending order. + ascending bool, + ) (dbm.Iterator, error) + // RootHash returns the 32-byte checksum of the working LtHash. // Note: This is the Blake3-256 digest of the underlying 2048-byte // raw LtHash vector. diff --git a/sei-db/state_db/sc/flatkv/snapshot.go b/sei-db/state_db/sc/flatkv/snapshot.go index b7e9b9fe21..1c2149c100 100644 --- a/sei-db/state_db/sc/flatkv/snapshot.go +++ b/sei-db/state_db/sc/flatkv/snapshot.go @@ -433,6 +433,11 @@ func (s *CommitStore) migrateFlatLayout(flatkvDir string) (string, error) { // The snapshot is written into a versioned subdirectory under the flatkv root // (e.g. flatkv/snapshot-00000000000000000100) and the current symlink is updated. // The dir parameter is ignored; snapshots are always stored alongside the live data. +// +// Concurrency: this MUST NOT acquire s.mu. Commit calls it while already holding +// the write lock (s.mu is not reentrant), and as a lifecycle operation it is +// otherwise expected to be serialized by the caller. It only reads committed +// state and checkpoints the DBs; it does not touch the pending-writes maps. func (s *CommitStore) WriteSnapshot(_ string) (err error) { var pruned int obs := s.observeOp("snapshot", otelMetrics.SnapshotWriteLatency, diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index c59a690ada..f70027c551 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "runtime" + "sync" "time" "github.com/zbiljic/go-filelock" @@ -91,8 +92,26 @@ func applyPebbleMetricsConfig(c *config.Config) { } // CommitStore implements flatkv.Store for EVM state storage. -// NOT thread-safe; callers must serialize all operations. +// +// Concurrency: writes (ApplyChangeSets, Commit) and the reads that touch the +// pending-writes maps (Get, Has, GetBlockHeightModified) and iterator +// construction (Iterator, RawGlobalIterator) are guarded by mu. Iterators +// snapshot their data at construction time (pending writes are cloned and the +// Pebble view is pinned), so once built they may be used and Closed without +// holding mu and may safely outlive a subsequent ApplyChangeSets/Commit. All +// other lifecycle operations (LoadVersion, Rollback, snapshot/import/export, +// Close) must still be serialized by the caller. type CommitStore struct { + // mu guards the pending-writes maps against concurrent iterator + // construction / reads while ApplyChangeSets and Commit mutate them. + // + // TODO(concurrency): this is a coarse lock taken at the exported entry + // points. Commit in particular holds the write lock across its WAL fsync + // and periodic auto-snapshot. That is acceptable while commits are not + // pipelined with reads; revisit with a finer-grained scheme (guarding only + // the in-memory maps) if/when pipelining is introduced. + mu sync.RWMutex + ctx context.Context cancel context.CancelFunc config config.Config diff --git a/sei-db/state_db/sc/flatkv/store_apply.go b/sei-db/state_db/sc/flatkv/store_apply.go index cd8967628f..a653ff1342 100644 --- a/sei-db/state_db/sc/flatkv/store_apply.go +++ b/sei-db/state_db/sc/flatkv/store_apply.go @@ -22,6 +22,12 @@ func (s *CommitStore) ApplyChangeSets(changeSets []*proto.NamedChangeSet) (err e return errReadOnly } + // Hold the write lock for the whole body: it both reads + // (batchReadOldValues) and mutates (maps.Copy) the pending-writes maps, + // which iterator construction and Get read under a read lock. + s.mu.Lock() + defer s.mu.Unlock() + /////////// // Setup // /////////// diff --git a/sei-db/state_db/sc/flatkv/store_iteration.go b/sei-db/state_db/sc/flatkv/store_iteration.go new file mode 100644 index 0000000000..852fb528dc --- /dev/null +++ b/sei-db/state_db/sc/flatkv/store_iteration.go @@ -0,0 +1,714 @@ +package flatkv + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/sei-protocol/sei-chain/sei-db/common/iterators" + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + seidbtypes "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/ktype" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/vtype" + dbm "github.com/tendermint/tm-db" +) + +// RawGlobalIterator returns an iterator over all committed keys across the +// data DBs (account, code, storage, legacy), merged in global lexicographic +// order. Within each DB, keys are in Pebble order. Per-DB _meta/* keys are +// skipped. Pending writes are not visible. metadataDB is not included. +func (s *CommitStore) RawGlobalIterator() (dbm.Iterator, error) { + // Read lock for the construction span: the returned iterator pins a Pebble + // view and may then outlive a concurrent ApplyChangeSets/Commit. + s.mu.RLock() + defer s.mu.RUnlock() + + dbs := s.dataDBs() + children := make([]dbm.Iterator, 0, len(dbs)) + for _, db := range dbs { + pebbleIter, err := db.NewIter(nil) + if err != nil { + closeIterators(children) + return nil, fmt.Errorf("open data DB iterator: %w", err) + } + transformed, err := iterators.NewTransformingIterator(pebbleIter, skipMetaKeys) + if err != nil { + closeIterators(children) + return nil, err + } + children = append(children, transformed) + } + merged, err := iterators.NewMergingIterator(true, children...) + if err != nil { + closeIterators(children) + return nil, err + } + if err := merged.Error(); err != nil { + _ = merged.Close() + return nil, err + } + return merged, nil +} + +func (s *CommitStore) Iterator(store string, start []byte, end []byte, ascending bool) (dbm.Iterator, error) { + if store == "" { + return nil, fmt.Errorf("store name cannot be empty") + } + + // Read lock for the construction span: buildEvmIterator/buildLegacyDBLane + // snapshot the pending-writes maps and pin the Pebble view here, so the + // returned iterator may safely outlive a concurrent ApplyChangeSets/Commit. + s.mu.RLock() + defer s.mu.RUnlock() + + var iter dbm.Iterator + var err error + if store == keys.EVMStoreKey { + iter, err = s.buildEvmIterator(start, end, ascending) + } else { + lowerBound, upperBound := moduleIteratorBounds(store, start, end) + iter, err = s.buildLegacyDBLane(store, lowerBound, upperBound, ascending) + } + if err != nil { + return nil, err + } + // The underlying lane/merge/transform iterators report physical Pebble + // bounds from Domain(); present the caller's logical [start, end) instead. + return iterators.NewDomainIterator(iter, start, end) +} + +/* Data flow: buildEvmIterator + +buildCodeLane ──────────────┐ +buildStorageLane ───────────┤ +buildLegacyDBLane (evm/) ───┼──► merge iterator ──► memiavl keys + values +buildAccountNonceLane ──────┤ +buildAccountCodehashLane ───┘ + +* balance not iterated — not stored in FlatKV yet +*/ + +func (s *CommitStore) buildEvmIterator( + start []byte, + end []byte, + ascending bool, +) (dbm.Iterator, error) { + lanes := make([]dbm.Iterator, 0, 5) + + // Each optimized lane scans its own physical keyspace and re-labels rows to + // a logical key. The codehash lane is the only one whose logical type byte + // (0x08) differs from the physical byte it scans (account rows live under + // 0x0a), so its bounds must be translated against the account keyspace. + for _, laneSpec := range s.evmLaneSpecs() { + lower, upper, empty, err := laneSpec.bounds(start, end) + if err != nil { + closeIterators(lanes) + return nil, err + } + if empty { + continue + } + lane, err := laneSpec.build(lower, upper, ascending) + if err != nil { + closeIterators(lanes) + return nil, err + } + lanes = append(lanes, lane) + } + + // Legacy is the identity-mapped catch-all (no single type prefix), so it uses + // the whole-range translation and is always built. + legacyLower, legacyUpper := moduleIteratorBounds(keys.EVMStoreKey, start, end) + legacyLane, err := s.buildLegacyDBLane(keys.EVMStoreKey, legacyLower, legacyUpper, ascending) + if err != nil { + closeIterators(lanes) + return nil, err + } + lanes = append(lanes, legacyLane) + + // TODO: once we move account balances to FlatKV, we need to add a lane for them here. + + merged, err := iterators.NewMergingIterator(ascending, lanes...) + if err != nil { + closeIterators(lanes) + return nil, fmt.Errorf("failed to create EVM merge iterator: %w", err) + } + return merged, nil +} + +// evmLaneSpec describes one EVM iterator lane. +type evmLaneSpec struct { + // logical is the type byte callers query with. + logical keys.EVMKeyKind + // physical is the type byte the lane's rows are stored under; equal to + // logical for every lane except codehash, whose rows live in the account DB + // under 0x0a. + physical keys.EVMKeyKind + // build constructs the iterator that scans the lane's physical keyspace. + build func(lower []byte, upper []byte, ascending bool) (dbm.Iterator, error) +} + +// bounds resolves the lane's logical and physical type bytes and translates the +// caller's logical [start,end) into this lane's physical [lower,upper) via +// evmLaneBounds. empty is true when the lane's span is disjoint from [start,end) +// and the lane should be skipped. +func (sp evmLaneSpec) bounds(start []byte, end []byte) (lower []byte, upper []byte, empty bool, err error) { + // the logical prefix, i.e. the prefix from the perspective of the external caller + logicalByte, ok := keys.EVMKeyPrefixByte(sp.logical) + if !ok { + return nil, nil, false, fmt.Errorf("no prefix byte for EVM key kind %v", sp.logical) + } + // the physical type byte the rows are stored under in the low level DB; + // the full physical prefix is the module name "evm/" followed by this byte + physByte, ok := keys.EVMKeyPrefixByte(sp.physical) + if !ok { + return nil, nil, false, fmt.Errorf("no prefix byte for EVM key kind %v", sp.physical) + } + lower, upper, empty = evmLaneBounds(start, end, logicalByte, physByte) + return lower, upper, empty, nil +} + +// evmLaneSpecs returns the optimized lanes, in no particular order (the merging +// iterator orders the combined output). The legacy catch-all lane is handled +// separately because it has no single type prefix. +func (s *CommitStore) evmLaneSpecs() []evmLaneSpec { + return []evmLaneSpec{ + {keys.EVMKeyStorage, keys.EVMKeyStorage, s.buildStorageLane}, + {keys.EVMKeyCode, keys.EVMKeyCode, s.buildCodeLane}, + {keys.EVMKeyCodeHash, ktype.EVMKeyAccount, s.buildAccountCodehashLane}, + {keys.EVMKeyNonce, ktype.EVMKeyAccount, s.buildAccountNonceLane}, + } +} + +// evmLaneBounds maps the caller's logical [start,end) range to the physical +// [lower,upper) range for a single EVM lane. Physical keys are +// "evm/" + physByte + suffix while logical keys are logicalPrefix + suffix, so +// the suffix and intra-lane ordering are preserved: translating the clamped +// logical bounds yields physical bounds that select exactly the in-range rows. +func evmLaneBounds( + // start is the inclusive lower bound of the caller's logical range; nil means unbounded. + start []byte, + // end is the exclusive upper bound of the caller's logical range; nil means unbounded. + end []byte, + // logicalPrefix is the lane's logical type byte (the prefix callers use, e.g. 0x08 for codehash). + logicalPrefix byte, + // physByte is the physical type byte the rows are stored under. It equals logicalPrefix for every + // lane except codehash, whose rows live in the account DB under 0x0a. + physByte byte, +) ( + // lower is the physical inclusive lower bound for the lane. + lower []byte, + // upper is the physical exclusive upper bound for the lane. + upper []byte, + // empty is true when [start,end) is disjoint from the lane's span, so the lane should be skipped. + empty bool, +) { + lp := []byte{logicalPrefix} + lpEnd := ktype.PrefixEnd(lp) + + lo := lp + if start != nil && bytes.Compare(start, lp) > 0 { + lo = start + } + hi := lpEnd + if end != nil && bytes.Compare(end, lpEnd) < 0 { + hi = end + } + if bytes.Compare(lo, hi) >= 0 { + return nil, nil, true + } + + physPrefix := ktype.ModulePhysicalKey(keys.EVMStoreKey, []byte{physByte}) + lower = ktype.ModulePhysicalKey(keys.EVMStoreKey, append([]byte{physByte}, lo[1:]...)) + if bytes.Equal(hi, lpEnd) { + upper = ktype.PrefixEnd(physPrefix) + } else { + upper = ktype.ModulePhysicalKey(keys.EVMStoreKey, append([]byte{physByte}, hi[1:]...)) + } + return lower, upper, false +} + +// moduleIteratorBounds translates caller logical [start, end) keys into physical +// bounds for iterating a module-prefixed keyspace in the data DBs. +func moduleIteratorBounds(store string, start, end []byte) (lowerBound, upperBound []byte) { + modulePrefix := ktype.ModulePhysicalKey(store, nil) + lowerBound = modulePrefix + if start != nil { + lowerBound = ktype.ModulePhysicalKey(store, start) + } + if end != nil { + upperBound = ktype.ModulePhysicalKey(store, end) + } else { + upperBound = ktype.PrefixEnd(modulePrefix) + } + return lowerBound, upperBound +} + +/* Data flow: buildLegacyDBLane + + ┌────────────────────────┐ ┌───────────────────┐ + │ legacyWrites (pending) │ │ legacyDB (pebble) │ + └────────────────────────┘ └───────────────────┘ + │ │ + ▼ ▼ + ┌──────────────┐ ┌─────────────────┐ + │ map iterator │ │ pebble iterator │ + └──────────────┘ └─────────────────┘ + │ │ + └──────┐ ┌────────────────┘ + │ │ + ▼ ▼ + ┌────────────────┐ + │ merge iterator │ pending writes "win" + └────────────────┘ + │ + physical key + serialized LegacyData + includes deleted values + │ + ▼ + ┌────────────────────┐ + │ transform iterator │ + └────────────────────┘ + │ + logical module key + raw value bytes + excludes deleted values + │ + ▼ +*/ + +func (s *CommitStore) buildLegacyDBLane( + store string, + lowerBound, upperBound []byte, + ascending bool, +) (dbm.Iterator, error) { + legacySerializer := func(v *vtype.LegacyData) ([]byte, error) { + if v == nil || v.IsDelete() { + return nil, nil + } + return v.Serialize(), nil + } + pendingDataIterator, err := iterators.NewMapIterator( + lowerBound, upperBound, ascending, legacySerializer, s.legacyWrites) + if err != nil { + return nil, fmt.Errorf("failed to create pending legacy iterator: %w", err) + } + + pebbleIterator, err := s.legacyDB.NewIter(&seidbtypes.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + Reverse: !ascending, + }) + if err != nil { + _ = pendingDataIterator.Close() + return nil, fmt.Errorf("failed to create legacy pebble iterator: %w", err) + } + + mergingIterator, err := iterators.NewMergingIterator(ascending, pebbleIterator, pendingDataIterator) + if err != nil { + _ = pendingDataIterator.Close() + _ = pebbleIterator.Close() + return nil, fmt.Errorf("failed to create legacy merge iterator: %w", err) + } + + transform := func(key []byte, value []byte) ([]byte, []byte, bool, error) { + if len(value) == 0 { + return nil, nil, true, nil + } + moduleName, logicalKey, err := ktype.StripModulePrefix(key) + if err != nil { + return nil, nil, false, err + } + if moduleName != store { + return nil, nil, false, fmt.Errorf( + "legacy iterator key %q has module %q, expected %q", + key, moduleName, store, + ) + } + ld, err := vtype.DeserializeLegacyData(value) + if err != nil { + return nil, nil, false, err + } + if ld.IsDelete() { + return nil, nil, true, nil + } + return logicalKey, ld.GetValue(), false, nil + } + transformedIterator, err := iterators.NewTransformingIterator(mergingIterator, transform) + if err != nil { + _ = mergingIterator.Close() + return nil, fmt.Errorf("failed to create legacy transform iterator: %w", err) + } + return transformedIterator, nil +} + +/* Data flow: buildCodeLane + + ┌─────────────────────┐ ┌────────────────┐ + │ codeWrites (pending)│ │ codeDB (pebble)│ + └─────────────────────┘ └────────────────┘ + │ │ + ▼ ▼ + ┌──────────────┐ ┌─────────────────┐ + │ map iterator │ │ pebble iterator │ + └──────────────┘ └─────────────────┘ + │ │ + └──────┐ ┌────────────┘ + │ │ + ▼ ▼ + ┌────────────────┐ + │ merge iterator │ pending writes "win" + └────────────────┘ + │ + physical key + serialized CodeData + includes deleted values + │ + ▼ + ┌────────────────────┐ + │ transform iterator │ + └────────────────────┘ + │ + 0x07‖addr + bytecode + excludes deleted values + │ + ▼ +*/ + +func (s *CommitStore) buildCodeLane( + lowerBound, upperBound []byte, + ascending bool, +) (dbm.Iterator, error) { + codeSerializer := func(v *vtype.CodeData) ([]byte, error) { + if v == nil { + return nil, nil + } + return v.Serialize(), nil + } + pendingDataIterator, err := iterators.NewMapIterator( + lowerBound, upperBound, ascending, codeSerializer, s.codeWrites) + if err != nil { + return nil, fmt.Errorf("failed to create pending code iterator: %w", err) + } + + pebbleIterator, err := s.codeDB.NewIter(&seidbtypes.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + Reverse: !ascending, + }) + if err != nil { + _ = pendingDataIterator.Close() + return nil, fmt.Errorf("failed to create code pebble iterator: %w", err) + } + + mergingIterator, err := iterators.NewMergingIterator(ascending, pebbleIterator, pendingDataIterator) + if err != nil { + _ = pendingDataIterator.Close() + _ = pebbleIterator.Close() + return nil, fmt.Errorf("failed to create code merge iterator: %w", err) + } + + transform := func(key []byte, value []byte) ([]byte, []byte, bool, error) { + _, strippedKey, err := ktype.StripEVMPhysicalKey(key) + if err != nil { + return nil, nil, false, err + } + cd, err := vtype.DeserializeCodeData(value) + if err != nil { + return nil, nil, false, err + } + if cd.IsDelete() { + return nil, nil, true, nil + } + return keys.BuildEVMKey(keys.EVMKeyCode, strippedKey), cd.GetBytecode(), false, nil + } + transformedIterator, err := iterators.NewTransformingIterator(mergingIterator, transform) + if err != nil { + _ = mergingIterator.Close() + return nil, fmt.Errorf("failed to create code transform iterator: %w", err) + } + return transformedIterator, nil +} + +/* Data flow: buildStorageLane + + ┌─────────────────────────┐ ┌────────────────────┐ + │ storageWrites (pending) │ │ storageDB (pebble) │ + └─────────────────────────┘ └────────────────────┘ + │ │ + ▼ ▼ + ┌──────────────┐ ┌─────────────────┐ + │ map iterator │ │ pebble iterator │ + └──────────────┘ └─────────────────┘ + │ │ + └──────┐ ┌────────────────┘ + │ │ + ▼ ▼ + ┌────────────────┐ + │ merge iterator │ pending writes "win" + └────────────────┘ + │ + physical key + serialized StorageData + includes deleted values + │ + ▼ + ┌────────────────────┐ + │ transform iterator │ + └────────────────────┘ + │ + 0x03‖addr‖slot + 32-byte value + excludes deleted values + │ + ▼ +*/ + +func (s *CommitStore) buildStorageLane( + lowerBound, upperBound []byte, + ascending bool, +) (dbm.Iterator, error) { + storageSerializer := func(v *vtype.StorageData) ([]byte, error) { + if v == nil { + return nil, nil + } + return v.Serialize(), nil + } + pendingDataIterator, err := iterators.NewMapIterator( + lowerBound, upperBound, ascending, storageSerializer, s.storageWrites) + if err != nil { + return nil, fmt.Errorf("failed to create pending storage iterator: %w", err) + } + + pebbleIterator, err := s.storageDB.NewIter(&seidbtypes.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + Reverse: !ascending, + }) + if err != nil { + _ = pendingDataIterator.Close() + return nil, fmt.Errorf("failed to create storage pebble iterator: %w", err) + } + + mergingIterator, err := iterators.NewMergingIterator(ascending, pebbleIterator, pendingDataIterator) + if err != nil { + _ = pendingDataIterator.Close() + _ = pebbleIterator.Close() + return nil, fmt.Errorf("failed to create storage merge iterator: %w", err) + } + + transform := func(key []byte, value []byte) ([]byte, []byte, bool, error) { + _, strippedKey, err := ktype.StripEVMPhysicalKey(key) + if err != nil { + return nil, nil, false, err + } + sd, err := vtype.DeserializeStorageData(value) + if err != nil { + return nil, nil, false, err + } + if sd.IsDelete() { + return nil, nil, true, nil + } + return keys.BuildEVMKey(keys.EVMKeyStorage, strippedKey), sd.GetValue()[:], false, nil + } + transformedIterator, err := iterators.NewTransformingIterator(mergingIterator, transform) + if err != nil { + _ = mergingIterator.Close() + return nil, fmt.Errorf("failed to create storage transform iterator: %w", err) + } + return transformedIterator, nil +} + +/* Data flow: buildAccountNonceLane + + Same accountWrites + accountDB as buildAccountCodehashLane (one pending map, one DB). + + ┌─────────────────────────┐ ┌────────────────────┐ + │ accountWrites (pending) │ │ accountDB (pebble) │ + └─────────────────────────┘ └────────────────────┘ + │ │ + ▼ ▼ + ┌──────────────┐ ┌─────────────────┐ + │ map iterator │ │ pebble iterator │ + └──────────────┘ └─────────────────┘ + │ │ + └──────┐ ┌────────────────┘ + │ │ + ▼ ▼ + ┌────────────────┐ + │ merge iterator │ pending writes "win" + └────────────────┘ + │ + physical key + serialized AccountData + includes deleted values + │ + ▼ + ┌────────────────────┐ + │ transform iterator │ + └────────────────────┘ + │ + 0x0a‖addr + 8-byte nonce + excludes deleted values + │ + ▼ +*/ + +func (s *CommitStore) buildAccountNonceLane( + lowerBound, upperBound []byte, + ascending bool, +) (dbm.Iterator, error) { + accountSerializer := func(v *vtype.AccountData) ([]byte, error) { + if v == nil { + return nil, nil + } + return v.Serialize(), nil + } + pendingDataIterator, err := iterators.NewMapIterator( + lowerBound, upperBound, ascending, accountSerializer, s.accountWrites) + if err != nil { + return nil, fmt.Errorf("failed to create pending account iterator: %w", err) + } + + pebbleIterator, err := s.accountDB.NewIter(&seidbtypes.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + Reverse: !ascending, + }) + if err != nil { + _ = pendingDataIterator.Close() + return nil, fmt.Errorf("failed to create account pebble iterator: %w", err) + } + + mergingIterator, err := iterators.NewMergingIterator(ascending, pebbleIterator, pendingDataIterator) + if err != nil { + _ = pendingDataIterator.Close() + _ = pebbleIterator.Close() + return nil, fmt.Errorf("failed to create account merge iterator: %w", err) + } + + transform := func(key []byte, value []byte) ([]byte, []byte, bool, error) { + _, addrBytes, err := ktype.StripEVMPhysicalKey(key) + if err != nil { + return nil, nil, false, err + } + ad, err := vtype.DeserializeAccountData(value) + if err != nil { + return nil, nil, false, err + } + if ad.IsDelete() { + return nil, nil, true, nil + } + nonceBytes := make([]byte, vtype.NonceLen) + binary.BigEndian.PutUint64(nonceBytes, ad.GetNonce()) + return keys.BuildEVMKey(keys.EVMKeyNonce, addrBytes), nonceBytes, false, nil + } + transformedIterator, err := iterators.NewTransformingIterator(mergingIterator, transform) + if err != nil { + _ = mergingIterator.Close() + return nil, fmt.Errorf("failed to create account nonce transform iterator: %w", err) + } + return transformedIterator, nil +} + +/* Data flow: buildAccountCodehashLane + + Same accountWrites + accountDB as buildAccountNonceLane (one pending map, one DB). + + ┌─────────────────────────┐ ┌────────────────────┐ + │ accountWrites (pending) │ │ accountDB (pebble) │ + └─────────────────────────┘ └────────────────────┘ + │ │ + ▼ ▼ + ┌──────────────┐ ┌─────────────────┐ + │ map iterator │ │ pebble iterator │ + └──────────────┘ └─────────────────┘ + │ │ + └──────┐ ┌────────────────┘ + │ │ + ▼ ▼ + ┌────────────────┐ + │ merge iterator │ pending writes "win" + └────────────────┘ + │ + physical key + serialized AccountData + includes deleted values + │ + ▼ + ┌────────────────────┐ + │ transform iterator │ + └────────────────────┘ + │ + 0x08‖addr + code hash bytes + excludes deleted values and zero hash + │ + ▼ +*/ + +func (s *CommitStore) buildAccountCodehashLane( + lowerBound, upperBound []byte, + ascending bool, +) (dbm.Iterator, error) { + accountSerializer := func(v *vtype.AccountData) ([]byte, error) { + if v == nil { + return nil, nil + } + return v.Serialize(), nil + } + pendingDataIterator, err := iterators.NewMapIterator( + lowerBound, upperBound, ascending, accountSerializer, s.accountWrites) + if err != nil { + return nil, fmt.Errorf("failed to create pending account iterator: %w", err) + } + + pebbleIterator, err := s.accountDB.NewIter(&seidbtypes.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + Reverse: !ascending, + }) + if err != nil { + _ = pendingDataIterator.Close() + return nil, fmt.Errorf("failed to create account pebble iterator: %w", err) + } + + mergingIterator, err := iterators.NewMergingIterator(ascending, pebbleIterator, pendingDataIterator) + if err != nil { + _ = pendingDataIterator.Close() + _ = pebbleIterator.Close() + return nil, fmt.Errorf("failed to create account merge iterator: %w", err) + } + + transform := func(key []byte, value []byte) ([]byte, []byte, bool, error) { + _, addrBytes, err := ktype.StripEVMPhysicalKey(key) + if err != nil { + return nil, nil, false, err + } + ad, err := vtype.DeserializeAccountData(value) + if err != nil { + return nil, nil, false, err + } + if ad.IsDelete() { + return nil, nil, true, nil + } + codeHash := ad.GetCodeHash() + var zeroCodeHash vtype.CodeHash + if *codeHash == zeroCodeHash { + return nil, nil, true, nil + } + return keys.BuildEVMKey(keys.EVMKeyCodeHash, addrBytes), codeHash[:], false, nil + } + transformedIterator, err := iterators.NewTransformingIterator(mergingIterator, transform) + if err != nil { + _ = mergingIterator.Close() + return nil, fmt.Errorf("failed to create account codehash transform iterator: %w", err) + } + return transformedIterator, nil +} + +// Used to cause the raw global iterator to skip _meta/* keys. +func skipMetaKeys(key, value []byte) ([]byte, []byte, bool, error) { + return key, value, ktype.IsMetaKey(key), nil +} + +func closeIterators(iters []dbm.Iterator) { + for _, it := range iters { + if it != nil { + _ = it.Close() + } + } +} diff --git a/sei-db/state_db/sc/flatkv/store_iteration_test.go b/sei-db/state_db/sc/flatkv/store_iteration_test.go new file mode 100644 index 0000000000..286cda6ac9 --- /dev/null +++ b/sei-db/state_db/sc/flatkv/store_iteration_test.go @@ -0,0 +1,1017 @@ +package flatkv + +import ( + "bytes" + "flag" + "fmt" + "hash/fnv" + "math/rand" + "os" + "sort" + "sync" + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/pebbledb" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/ktype" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/vtype" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" +) + +// evmIterSeed overrides the deterministic seed for EVM iterator tests when non-zero. +var evmIterSeed = flag.Int64("evm-iter-seed", 0, "seed for EVM iterator fixture generation (0 = derive from test name)") + +func TestMain(m *testing.M) { + flag.Parse() + os.Exit(m.Run()) +} + +type evmIteratorEntry struct { + Key []byte + Value []byte +} + +type evmIteratorDisposition int + +const ( + dispositionPebbleOnly evmIteratorDisposition = iota + dispositionPendingOnly + dispositionOverlap + dispositionTombstone +) + +type evmIteratorFixture struct { + Seed int64 + Store *CommitStore + Sorted []evmIteratorEntry + OverlapSamples []evmIteratorEntry + TombstonedKeys [][]byte +} + +func TestEvmIterator(t *testing.T) { + seed := iteratorTestSeed(t, "TestEvmIterator") + fixture := buildEvmIteratorFixture(t, seed) + + baseline, err := sumFlatKVTableIters(fixture.Store) + require.NoError(t, err) + t.Cleanup(func() { + current, sumErr := sumFlatKVTableIters(fixture.Store) + require.NoError(t, sumErr) + require.Equal(t, baseline, current, "leaked pebble table iterators") + require.NoError(t, fixture.Store.Close()) + }) + + storageStart := keys.StateKeyPrefix() + storageEnd := ktype.PrefixEnd(storageStart) + codeStart := []byte{0x07} + codeEnd := ktype.PrefixEnd(codeStart) + codeHashStart := []byte{0x08} + codeHashEnd := ktype.PrefixEnd(codeHashStart) + legacyStart := []byte{0x09} + legacyEnd := ktype.PrefixEnd(legacyStart) + nonceStart := []byte{0x0a} + nonceEnd := ktype.PrefixEnd(nonceStart) + midAddr := addrN(0x80) + crossSpanStart := keys.BuildEVMKey(keys.EVMKeyCodeHash, midAddr[:]) // 0x08 || addr + crossSpanEnd := keys.BuildEVMKey(keys.EVMKeyNonce, midAddr[:]) // 0x0a || addr + storageResumeStart := evmStorageKey(addrN(0x40), slotN(0x10)) // 0x03 || addr || slot + + cases := []struct { + name string + start []byte + end []byte + ascending bool + }{ + {name: "full module ascending", ascending: true}, + {name: "full module descending", ascending: false}, + {name: "storage prefix range", start: storageStart, end: storageEnd, ascending: true}, + {name: "legacy sub-range", start: legacyStart, end: legacyEnd, ascending: true}, + {name: "code prefix range", start: codeStart, end: codeEnd, ascending: true}, + {name: "codehash prefix range ascending", start: codeHashStart, end: codeHashEnd, ascending: true}, + {name: "codehash prefix range descending", start: codeHashStart, end: codeHashEnd, ascending: false}, + {name: "nonce prefix range ascending", start: nonceStart, end: nonceEnd, ascending: true}, + {name: "nonce prefix range descending", start: nonceStart, end: nonceEnd, ascending: false}, + {name: "cross span codehash to nonce ascending", start: crossSpanStart, end: crossSpanEnd, ascending: true}, + {name: "cross span codehash to nonce descending", start: crossSpanStart, end: crossSpanEnd, ascending: false}, + {name: "storage resume ascending", start: storageResumeStart, end: nil, ascending: true}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + want := subrange(fixture.Sorted, tc.start, tc.end, tc.ascending) + iter, err := fixture.Store.Iterator(keys.EVMStoreKey, tc.start, tc.end, tc.ascending) + require.NoError(t, err) + got := collectIterEntries(t, iter) + require.NoError(t, iter.Close()) + require.Equal(t, want, got) + }) + } + + t.Run("overlap keys use pending values", func(t *testing.T) { + require.NotEmpty(t, fixture.OverlapSamples) + + iter, err := fixture.Store.Iterator(keys.EVMStoreKey, nil, nil, true) + require.NoError(t, err) + defer func() { require.NoError(t, iter.Close()) }() + + got := collectIterEntries(t, iter) + for _, sample := range fixture.OverlapSamples { + entry, ok := findEntry(got, sample.Key) + require.True(t, ok, "overlap key %x missing from iterator (seed=%d)", sample.Key, fixture.Seed) + require.Equal(t, sample.Value, entry.Value, "overlap key %x (seed=%d)", sample.Key, fixture.Seed) + } + }) + + t.Run("tombstones absent", func(t *testing.T) { + require.NotEmpty(t, fixture.TombstonedKeys) + + iter, err := fixture.Store.Iterator(keys.EVMStoreKey, nil, nil, true) + require.NoError(t, err) + got := collectIterEntries(t, iter) + require.NoError(t, iter.Close()) + + for _, key := range fixture.TombstonedKeys { + for _, entry := range got { + require.False(t, bytes.Equal(entry.Key, key), + "tombstoned key %x should not appear (seed=%d)", key, fixture.Seed) + } + for _, entry := range fixture.Sorted { + require.False(t, bytes.Equal(entry.Key, key), + "tombstoned key %x should not be in expected sorted output (seed=%d)", key, fixture.Seed) + } + } + }) +} + +func TestLegacyIteratorNonEVM(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + bankNamedCS(&proto.KVPair{Key: []byte("alpha"), Value: []byte("A")}), + })) + commitAndCheck(t, s) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + bankNamedCS( + &proto.KVPair{Key: []byte("beta"), Value: []byte("B")}, + &proto.KVPair{Key: []byte("alpha"), Value: []byte("A2")}, + ), + })) + + iter, err := s.Iterator("bank", nil, nil, true) + require.NoError(t, err) + got := collectIterEntries(t, iter) + require.NoError(t, iter.Close()) + + require.Equal(t, []evmIteratorEntry{ + {Key: []byte("alpha"), Value: []byte("A2")}, + {Key: []byte("beta"), Value: []byte("B")}, + }, got) +} + +// TestEvmIteratorDomain verifies that Iterator reports the caller's logical +// [start, end) from Domain() (M3), not the underlying physical Pebble bounds. +func TestEvmIteratorDomain(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + + t.Run("evm bounded", func(t *testing.T) { + start := []byte{0x07} + end := []byte{0x09} + iter, err := s.Iterator(keys.EVMStoreKey, start, end, true) + require.NoError(t, err) + defer func() { require.NoError(t, iter.Close()) }() + + gotStart, gotEnd := iter.Domain() + require.Equal(t, start, gotStart) + require.Equal(t, end, gotEnd) + }) + + t.Run("evm unbounded", func(t *testing.T) { + iter, err := s.Iterator(keys.EVMStoreKey, nil, nil, true) + require.NoError(t, err) + defer func() { require.NoError(t, iter.Close()) }() + + gotStart, gotEnd := iter.Domain() + require.Nil(t, gotStart) + require.Nil(t, gotEnd) + }) + + t.Run("non-evm bounded", func(t *testing.T) { + start := []byte("a") + end := []byte("z") + iter, err := s.Iterator("bank", start, end, true) + require.NoError(t, err) + defer func() { require.NoError(t, iter.Close()) }() + + gotStart, gotEnd := iter.Domain() + require.Equal(t, start, gotStart) + require.Equal(t, end, gotEnd) + }) +} + +// TestEvmIteratorSnapshotConcurrentWithWrites exercises the RWMutex (M2): +// iterators are stable snapshots that can be built and drained concurrently +// with ApplyChangeSets/Commit, and a snapshot opened before writes is unaffected +// by later commits. Run with -race to detect unsynchronized access to the +// pending-writes maps. +func TestEvmIteratorSnapshotConcurrentWithWrites(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + + base := addrN(0x01) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{namedCS( + noncePair(base, 7), + codePair(base, []byte{0xaa}), + storagePair(base, slotN(0x01), []byte{0xbb}), + )})) + commitAndCheck(t, s) + + // Expected committed-only state, captured before any concurrent writes. + wantIter, err := s.Iterator(keys.EVMStoreKey, nil, nil, true) + require.NoError(t, err) + want := collectIterEntries(t, wantIter) + require.NoError(t, wantIter.Close()) + + // A snapshot opened before the writer starts must keep returning `want` + // regardless of the commits that follow. + snap, err := s.Iterator(keys.EVMStoreKey, nil, nil, true) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + a := addrN(byte(0x20 + i)) + if applyErr := s.ApplyChangeSets([]*proto.NamedChangeSet{namedCS( + noncePair(a, uint64(i+1)), + )}); applyErr != nil { + t.Errorf("ApplyChangeSets: %v", applyErr) + return + } + if _, commitErr := s.Commit(); commitErr != nil { + t.Errorf("Commit: %v", commitErr) + return + } + } + }() + + // Concurrently build and drain fresh iterators (RLock) while the writer + // holds the write lock, to stress the lock under -race. + for i := 0; i < 50; i++ { + it, iterErr := s.Iterator(keys.EVMStoreKey, nil, nil, true) + require.NoError(t, iterErr) + _ = collectIterEntries(t, it) + require.NoError(t, it.Close()) + } + + wg.Wait() + + got := collectIterEntries(t, snap) + require.NoError(t, snap.Close()) + require.Equal(t, want, got, "pre-write snapshot must be unaffected by concurrent commits") +} + +// TestEvmLaneBounds exercises every branch of evmLaneBounds in +// isolation, for an aligned lane (logical == physical byte) and the misaligned +// codehash lane (logical 0x08 -> physical account byte 0x0a). +func TestEvmLaneBounds(t *testing.T) { + phys := func(b byte, suffix ...byte) []byte { + return append([]byte{'e', 'v', 'm', '/', b}, suffix...) + } + + cases := []struct { + name string + start []byte + end []byte + logical byte + physical byte + wantEmpty bool + wantLower []byte + wantUpper []byte + }{ + // Aligned lane (nonce: logical 0x0a, physical 0x0a). + { + name: "aligned within span", + start: []byte{0x0a, 0x01}, + end: []byte{0x0a, 0x02}, + logical: 0x0a, + physical: 0x0a, + wantLower: phys(0x0a, 0x01), + wantUpper: phys(0x0a, 0x02), + }, + { + name: "aligned low clamp nil start", + start: nil, + end: []byte{0x0a, 0x02}, + logical: 0x0a, + physical: 0x0a, + wantLower: phys(0x0a), + wantUpper: phys(0x0a, 0x02), + }, + { + name: "aligned high clamp nil end", + start: []byte{0x0a, 0x01}, + end: nil, + logical: 0x0a, + physical: 0x0a, + wantLower: phys(0x0a, 0x01), + wantUpper: phys(0x0b), + }, + { + name: "aligned full range", + start: nil, + end: nil, + logical: 0x0a, + physical: 0x0a, + wantLower: phys(0x0a), + wantUpper: phys(0x0b), + }, + { + name: "aligned start below span", + start: []byte{0x05}, + end: nil, + logical: 0x0a, + physical: 0x0a, + wantLower: phys(0x0a), + wantUpper: phys(0x0b), + }, + { + name: "aligned end above span", + start: nil, + end: []byte{0x20}, + logical: 0x0a, + physical: 0x0a, + wantLower: phys(0x0a), + wantUpper: phys(0x0b), + }, + { + name: "aligned exact bare endpoints", + start: []byte{0x0a}, + end: []byte{0x0b}, + logical: 0x0a, + physical: 0x0a, + wantLower: phys(0x0a), + wantUpper: phys(0x0b), + }, + { + name: "aligned disjoint below", + start: nil, + end: []byte{0x09}, + logical: 0x0a, + physical: 0x0a, + wantEmpty: true, + }, + { + name: "aligned disjoint above", + start: []byte{0x0b}, + end: nil, + logical: 0x0a, + physical: 0x0a, + wantEmpty: true, + }, + { + name: "aligned single key empty", + start: []byte{0x0a, 0x01}, + end: []byte{0x0a, 0x01}, + logical: 0x0a, + physical: 0x0a, + wantEmpty: true, + }, + + // Misaligned codehash lane (logical 0x08, physical 0x0a). + { + name: "codehash within span swaps prefix", + start: []byte{0x08, 0x01}, + end: []byte{0x08, 0x02}, + logical: 0x08, + physical: 0x0a, + wantLower: phys(0x0a, 0x01), + wantUpper: phys(0x0a, 0x02), + }, + { + name: "codehash full prefix maps to account region", + start: []byte{0x08}, + end: []byte{0x09}, + logical: 0x08, + physical: 0x0a, + wantLower: phys(0x0a), + wantUpper: phys(0x0b), + }, + { + name: "codehash nil maps to account region", + start: nil, + end: nil, + logical: 0x08, + physical: 0x0a, + wantLower: phys(0x0a), + wantUpper: phys(0x0b), + }, + { + name: "codehash disjoint from nonce query", + start: []byte{0x0a}, + end: []byte{0x0b}, + logical: 0x08, + physical: 0x0a, + wantEmpty: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + lower, upper, empty := evmLaneBounds(tc.start, tc.end, tc.logical, tc.physical) + require.Equal(t, tc.wantEmpty, empty) + if tc.wantEmpty { + return + } + require.Equal(t, tc.wantLower, lower, "lower") + require.Equal(t, tc.wantUpper, upper, "upper") + }) + } +} + +// TestEvmIteratorDifferential compares the EVM iterator against the +// independent subrange oracle across many randomized boundary-relevant ranges +// and both directions. Because the oracle is implementation-independent and the +// comparison is over full ordered slices, this catches out-of-range emission, +// missing keys, mis-ordering, inclusive/exclusive boundary errors, and +// direction bugs. +func TestEvmIteratorDifferential(t *testing.T) { + seed := iteratorTestSeed(t, "TestEvmIteratorDifferential") + fixture := buildEvmIteratorFixture(t, seed) + defer func() { require.NoError(t, fixture.Store.Close()) }() + + rng := rand.New(rand.NewSource(seed ^ 0x5deece66d)) //nolint:gosec // deterministic test data only + + // Pool of boundary-relevant keys: every committed/pending key, plus each + // type prefix and its prefix-end. + var pool [][]byte + for _, e := range fixture.Sorted { + pool = append(pool, bytes.Clone(e.Key)) + } + for _, p := range [][]byte{{0x03}, {0x07}, {0x08}, {0x09}, {0x0a}} { + pool = append(pool, bytes.Clone(p), ktype.PrefixEnd(p)) + } + + pick := func() []byte { + switch rng.Intn(6) { + case 0: + return nil + case 1: + return bytes.Clone(pool[rng.Intn(len(pool))]) + case 2: + return decrementKey(pool[rng.Intn(len(pool))]) + case 3: + return incrementKey(pool[rng.Intn(len(pool))]) + default: + n := 1 + rng.Intn(21) + b := make([]byte, n) + rng.Read(b) + return b + } + } + + const iterations = 400 + for i := 0; i < iterations; i++ { + start := pick() + end := pick() + ascending := rng.Intn(2) == 0 + + want := subrange(fixture.Sorted, start, end, ascending) + iter, err := fixture.Store.Iterator(keys.EVMStoreKey, start, end, ascending) + require.NoError(t, err) + got := collectIterEntries(t, iter) + require.NoError(t, iter.Close()) + msg := fmt.Sprintf("iter %d start=%x end=%x ascending=%v seed=%d", i, start, end, ascending, fixture.Seed) + if len(want) == 0 { + require.Empty(t, got, msg) + } else { + require.Equal(t, want, got, msg) + } + } +} + +// TestEvmIteratorEmptyAndDegenerate covers an empty store and +// degenerate ranges (equal bounds, inverted bounds) on a populated store. +func TestEvmIteratorEmptyAndDegenerate(t *testing.T) { + t.Run("empty store", func(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + iter, err := s.Iterator(keys.EVMStoreKey, nil, nil, true) + require.NoError(t, err) + require.Empty(t, collectIterEntries(t, iter)) + require.NoError(t, iter.Close()) + }) + + seed := iteratorTestSeed(t, "TestEvmIteratorEmptyAndDegenerate") + fixture := buildEvmIteratorFixture(t, seed) + defer func() { require.NoError(t, fixture.Store.Close()) }() + require.NotEmpty(t, fixture.Sorted) + + lo := fixture.Sorted[0].Key + hi := fixture.Sorted[len(fixture.Sorted)-1].Key + + t.Run("equal bounds", func(t *testing.T) { + iter, err := fixture.Store.Iterator(keys.EVMStoreKey, lo, lo, true) + require.NoError(t, err) + require.Empty(t, collectIterEntries(t, iter)) + require.NoError(t, iter.Close()) + }) + + t.Run("inverted bounds", func(t *testing.T) { + iter, err := fixture.Store.Iterator(keys.EVMStoreKey, hi, lo, true) + require.NoError(t, err) + require.Empty(t, collectIterEntries(t, iter)) + require.NoError(t, iter.Close()) + }) +} + +// decrementKey returns the largest key strictly less than k (for boundary +// fuzzing). incrementKey returns the smallest key strictly greater than k. +func decrementKey(k []byte) []byte { + if len(k) == 0 { + return nil + } + out := bytes.Clone(k) + if out[len(out)-1] > 0 { + out[len(out)-1]-- + return out + } + return out[:len(out)-1] +} + +func incrementKey(k []byte) []byte { + return append(bytes.Clone(k), 0x00) +} + +func iteratorTestSeed(t *testing.T, label string) int64 { + t.Helper() + if *evmIterSeed != 0 { + t.Logf("evm iterator seed=%d (from -evm-iter-seed)", *evmIterSeed) + return *evmIterSeed + } + h := fnv.New64a() + _, _ = h.Write([]byte(t.Name())) + if label != "" { + _, _ = h.Write([]byte{0}) + _, _ = h.Write([]byte(label)) + } + seed := int64(h.Sum64()) //nolint:gosec // deterministic test data only + t.Logf("evm iterator seed=%d (reproduce with -evm-iter-seed=%d)", seed, seed) + return seed +} + +func buildEvmIteratorFixture(t *testing.T, seed int64) *evmIteratorFixture { + t.Helper() + t.Logf("building EVM iterator fixture with seed=%d", seed) + + s := setupTestStore(t) + rng := rand.New(rand.NewSource(seed)) //nolint:gosec // deterministic test data only + + latest := make(map[string]evmIteratorEntry) + var batch1, batch2 []*proto.KVPair + var overlapSamples []evmIteratorEntry + var tombstonedKeys [][]byte + usedAddrs := make(map[string]struct{}, 32) + + gen := &evmIteratorGenerator{ + rng: rng, + latest: latest, + batch1: &batch1, + batch2: &batch2, + overlaps: &overlapSamples, + tombstones: &tombstonedKeys, + usedAddrs: usedAddrs, + } + + for _, disp := range []evmIteratorDisposition{ + dispositionPebbleOnly, + dispositionPendingOnly, + dispositionOverlap, + dispositionTombstone, + } { + gen.addStorage(disp) + gen.addCode(disp) + gen.addLegacy(disp) + gen.addAccount(disp) + } + + // Nonce-only account (no codehash key in iterator output). + gen.addNonceOnlyAccount() + + // Malformed account-prefixed legacy key: lands in the account physical + // region (evm/0x0a...) but is routed to legacyDB, exercising the overlap + // between the legacy lane and the account-derived lanes. + gen.addMalformedAccountLegacyKey() + + // Malformed storage/code-prefixed legacy keys: correct type byte but wrong + // length, so they route to legacyDB and physically live in the storage + // (evm/0x03...) and code (evm/0x07...) keyspaces. Confirms the legacy lane + // interleaves with the storage and code lanes and that the merge does not + // falsely dedup a legacy key against an optimized-lane key of a different + // length. + gen.addMalformedStoragePrefixLegacyKey() + gen.addMalformedCodePrefixLegacyKey() + + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{namedCS(batch1...)})) + commitAndCheck(t, s) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{namedCS(batch2...)})) + + return &evmIteratorFixture{ + Seed: seed, + Store: s, + Sorted: sortedEvmEntries(latest), + OverlapSamples: overlapSamples, + TombstonedKeys: tombstonedKeys, + } +} + +type evmIteratorGenerator struct { + rng *rand.Rand + latest map[string]evmIteratorEntry + batch1 *[]*proto.KVPair + batch2 *[]*proto.KVPair + overlaps *[]evmIteratorEntry + tombstones *[][]byte + usedAddrs map[string]struct{} +} + +func (g *evmIteratorGenerator) uniqueAddr() ktype.Address { + for attempts := 0; attempts < 512; attempts++ { + var a ktype.Address + g.rng.Read(a[:]) + if _, used := g.usedAddrs[string(a[:])]; used { + continue + } + g.usedAddrs[string(a[:])] = struct{}{} + return a + } + panic("failed to allocate unique test address") +} + +// addMalformedAccountLegacyKey writes a 0x0a-prefixed key whose length does not +// match a well-formed nonce key, so it routes to legacyDB while physically +// living in the account keyspace (evm/0x0a...). +func (g *evmIteratorGenerator) addMalformedAccountLegacyKey() { + key := append([]byte{0x0a}, bytes.Repeat([]byte{0x7f}, 19)...) + val := []byte{0xab, 0xcd} + *g.batch1 = append(*g.batch1, &proto.KVPair{Key: bytes.Clone(key), Value: bytes.Clone(val)}) + setEvmLatest(g.latest, key, val) +} + +// addMalformedStoragePrefixLegacyKey writes a 0x03-prefixed key whose length +// does not match a well-formed storage key (1 + 20 + 32), so it routes to +// legacyDB while physically living in the storage keyspace (evm/0x03...). It is +// committed (batch1) so the legacy and storage lanes interleave over pebble. +func (g *evmIteratorGenerator) addMalformedStoragePrefixLegacyKey() { + key := append([]byte{0x03}, bytes.Repeat([]byte{0x7f}, ktype.AddressLen)...) + val := []byte{0x12, 0x34} + *g.batch1 = append(*g.batch1, &proto.KVPair{Key: bytes.Clone(key), Value: bytes.Clone(val)}) + setEvmLatest(g.latest, key, val) +} + +// addMalformedCodePrefixLegacyKey writes a 0x07-prefixed key whose length does +// not match a well-formed code key (1 + 20), so it routes to legacyDB while +// physically living in the code keyspace (evm/0x07...). It is pending-only +// (batch2) so the legacy and code lanes interleave over pending writes too. +func (g *evmIteratorGenerator) addMalformedCodePrefixLegacyKey() { + key := append([]byte{0x07}, bytes.Repeat([]byte{0x5a}, ktype.AddressLen-1)...) + val := []byte{0x56, 0x78} + *g.batch2 = append(*g.batch2, &proto.KVPair{Key: bytes.Clone(key), Value: bytes.Clone(val)}) + setEvmLatest(g.latest, key, val) +} + +func (g *evmIteratorGenerator) uniqueSlot() ktype.Slot { + var s ktype.Slot + g.rng.Read(s[:]) + if s == (ktype.Slot{}) { + s[31] = 1 + } + return s +} + +func (g *evmIteratorGenerator) storageVal() []byte { + return padLeft32(g.rngByte()) +} + +func (g *evmIteratorGenerator) codeVal() []byte { + n := 1 + g.rng.Intn(16) + out := make([]byte, n) + g.rng.Read(out) + return out +} + +func (g *evmIteratorGenerator) legacyVal() []byte { + n := 1 + g.rng.Intn(32) + out := make([]byte, n) + g.rng.Read(out) + return out +} + +func (g *evmIteratorGenerator) rngByte() byte { + return byte(g.rng.Intn(256)) //nolint:gosec +} + +func (g *evmIteratorGenerator) rngNonce() uint64 { + return g.rng.Uint64() +} + +func (g *evmIteratorGenerator) rngCodeHash() vtype.CodeHash { + var h vtype.CodeHash + g.rng.Read(h[:]) + if h == (vtype.CodeHash{}) { + h[0] = 1 + } + return h +} + +func (g *evmIteratorGenerator) recordOverlap(key, value []byte) { + *g.overlaps = append(*g.overlaps, evmIteratorEntry{ + Key: bytes.Clone(key), + Value: bytes.Clone(value), + }) +} + +func (g *evmIteratorGenerator) recordTombstone(key []byte) { + *g.tombstones = append(*g.tombstones, bytes.Clone(key)) +} + +func (g *evmIteratorGenerator) addStorage(disp evmIteratorDisposition) { + addr := g.uniqueAddr() + slot := g.uniqueSlot() + v1 := g.storageVal() + v2 := g.storageVal() + for bytes.Equal(v1, v2) { + v2 = g.storageVal() + } + + key := keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot)) + switch disp { + case dispositionPebbleOnly: + *g.batch1 = append(*g.batch1, storagePair(addr, slot, v1)) + recordStorageLatest(g.latest, addr, slot, v1) + case dispositionPendingOnly: + *g.batch2 = append(*g.batch2, storagePair(addr, slot, v2)) + recordStorageLatest(g.latest, addr, slot, v2) + case dispositionOverlap: + *g.batch1 = append(*g.batch1, storagePair(addr, slot, v1)) + *g.batch2 = append(*g.batch2, storagePair(addr, slot, v2)) + recordStorageLatest(g.latest, addr, slot, v2) + g.recordOverlap(key, padLeft32(v2...)) + case dispositionTombstone: + *g.batch1 = append(*g.batch1, storagePair(addr, slot, v1)) + *g.batch2 = append(*g.batch2, storageDeletePair(addr, slot)) + removeStorageLatest(g.latest, addr, slot) + g.recordTombstone(key) + } +} + +func (g *evmIteratorGenerator) addCode(disp evmIteratorDisposition) { + addr := g.uniqueAddr() + v1 := g.codeVal() + v2 := g.codeVal() + for bytes.Equal(v1, v2) { + v2 = g.codeVal() + } + + key := keys.BuildEVMKey(keys.EVMKeyCode, addr[:]) + switch disp { + case dispositionPebbleOnly: + *g.batch1 = append(*g.batch1, codePair(addr, v1)) + recordCodeLatest(g.latest, addr, v1) + case dispositionPendingOnly: + *g.batch2 = append(*g.batch2, codePair(addr, v2)) + recordCodeLatest(g.latest, addr, v2) + case dispositionOverlap: + *g.batch1 = append(*g.batch1, codePair(addr, v1)) + *g.batch2 = append(*g.batch2, codePair(addr, v2)) + recordCodeLatest(g.latest, addr, v2) + g.recordOverlap(key, bytes.Clone(v2)) + case dispositionTombstone: + *g.batch1 = append(*g.batch1, codePair(addr, v1)) + *g.batch2 = append(*g.batch2, codeDeletePair(addr)) + removeCodeLatest(g.latest, addr) + g.recordTombstone(key) + } +} + +func (g *evmIteratorGenerator) addLegacy(disp evmIteratorDisposition) { + addr := g.uniqueAddr() + v1 := g.legacyVal() + v2 := g.legacyVal() + for bytes.Equal(v1, v2) { + v2 = g.legacyVal() + } + + key := append([]byte{0x09}, addr[:]...) + switch disp { + case dispositionPebbleOnly: + *g.batch1 = append(*g.batch1, legacyPair(addr, v1)) + recordLegacyLatest(g.latest, addr, v1) + case dispositionPendingOnly: + *g.batch2 = append(*g.batch2, legacyPair(addr, v2)) + recordLegacyLatest(g.latest, addr, v2) + case dispositionOverlap: + *g.batch1 = append(*g.batch1, legacyPair(addr, v1)) + *g.batch2 = append(*g.batch2, legacyPair(addr, v2)) + recordLegacyLatest(g.latest, addr, v2) + g.recordOverlap(key, bytes.Clone(v2)) + case dispositionTombstone: + *g.batch1 = append(*g.batch1, legacyPair(addr, v1)) + *g.batch2 = append(*g.batch2, legacyDeletePair(addr)) + removeLegacyLatest(g.latest, addr) + g.recordTombstone(key) + } +} + +func (g *evmIteratorGenerator) addAccount(disp evmIteratorDisposition) { + addr := g.uniqueAddr() + n1 := g.rngNonce() + n2 := g.rngNonce() + for n1 == n2 { + n2 = g.rngNonce() + } + ch1 := g.rngCodeHash() + ch2 := g.rngCodeHash() + for ch1 == ch2 { + ch2 = g.rngCodeHash() + } + + nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) + codeHashKey := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) + + switch disp { + case dispositionPebbleOnly: + *g.batch1 = append(*g.batch1, noncePair(addr, n1), codeHashPair(addr, ch1)) + recordNonceLatest(g.latest, addr, n1) + recordCodeHashLatest(g.latest, addr, ch1) + case dispositionPendingOnly: + *g.batch2 = append(*g.batch2, noncePair(addr, n2), codeHashPair(addr, ch2)) + recordNonceLatest(g.latest, addr, n2) + recordCodeHashLatest(g.latest, addr, ch2) + case dispositionOverlap: + *g.batch1 = append(*g.batch1, noncePair(addr, n1), codeHashPair(addr, ch1)) + *g.batch2 = append(*g.batch2, noncePair(addr, n2), codeHashPair(addr, ch2)) + recordNonceLatest(g.latest, addr, n2) + recordCodeHashLatest(g.latest, addr, ch2) + g.recordOverlap(nonceKey, nonceBytes(n2)) + g.recordOverlap(codeHashKey, ch2[:]) + case dispositionTombstone: + *g.batch1 = append(*g.batch1, noncePair(addr, n1), codeHashPair(addr, ch1)) + *g.batch2 = append(*g.batch2, nonceDeletePair(addr), codeHashDeletePair(addr)) + removeAccountLatest(g.latest, addr) + g.recordTombstone(nonceKey) + g.recordTombstone(codeHashKey) + } +} + +func (g *evmIteratorGenerator) addNonceOnlyAccount() { + addr := g.uniqueAddr() + n := g.rngNonce() + *g.batch1 = append(*g.batch1, noncePair(addr, n)) + recordNonceLatest(g.latest, addr, n) +} + +func bankNamedCS(pairs ...*proto.KVPair) *proto.NamedChangeSet { + return &proto.NamedChangeSet{ + Name: "bank", + Changeset: proto.ChangeSet{Pairs: pairs}, + } +} + +func legacyPair(addr ktype.Address, val []byte) *proto.KVPair { + return &proto.KVPair{ + Key: append([]byte{0x09}, addr[:]...), + Value: val, + } +} + +func legacyDeletePair(addr ktype.Address) *proto.KVPair { + return &proto.KVPair{ + Key: append([]byte{0x09}, addr[:]...), + Delete: true, + } +} + +func setEvmLatest(latest map[string]evmIteratorEntry, key, value []byte) { + latest[string(key)] = evmIteratorEntry{ + Key: bytes.Clone(key), + Value: bytes.Clone(value), + } +} + +func removeEvmLatest(latest map[string]evmIteratorEntry, key []byte) { + delete(latest, string(key)) +} + +func recordStorageLatest(latest map[string]evmIteratorEntry, addr ktype.Address, slot ktype.Slot, val []byte) { + key := keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot)) + setEvmLatest(latest, key, padLeft32(val...)) +} + +func removeStorageLatest(latest map[string]evmIteratorEntry, addr ktype.Address, slot ktype.Slot) { + removeEvmLatest(latest, keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot))) +} + +func recordCodeLatest(latest map[string]evmIteratorEntry, addr ktype.Address, bytecode []byte) { + setEvmLatest(latest, keys.BuildEVMKey(keys.EVMKeyCode, addr[:]), bytes.Clone(bytecode)) +} + +func removeCodeLatest(latest map[string]evmIteratorEntry, addr ktype.Address) { + removeEvmLatest(latest, keys.BuildEVMKey(keys.EVMKeyCode, addr[:])) +} + +func recordLegacyLatest(latest map[string]evmIteratorEntry, addr ktype.Address, val []byte) { + key := append([]byte{0x09}, addr[:]...) + setEvmLatest(latest, key, bytes.Clone(val)) +} + +func removeLegacyLatest(latest map[string]evmIteratorEntry, addr ktype.Address) { + removeEvmLatest(latest, append([]byte{0x09}, addr[:]...)) +} + +func recordNonceLatest(latest map[string]evmIteratorEntry, addr ktype.Address, nonce uint64) { + setEvmLatest(latest, keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]), nonceBytes(nonce)) +} + +func recordCodeHashLatest(latest map[string]evmIteratorEntry, addr ktype.Address, ch vtype.CodeHash) { + key := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) + var zero vtype.CodeHash + if ch == zero { + removeEvmLatest(latest, key) + return + } + setEvmLatest(latest, key, ch[:]) +} + +func removeAccountLatest(latest map[string]evmIteratorEntry, addr ktype.Address) { + removeEvmLatest(latest, keys.BuildEVMKey(keys.EVMKeyNonce, addr[:])) + removeEvmLatest(latest, keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:])) +} + +func sortedEvmEntries(latest map[string]evmIteratorEntry) []evmIteratorEntry { + out := make([]evmIteratorEntry, 0, len(latest)) + for _, e := range latest { + out = append(out, e) + } + sort.Slice(out, func(i, j int) bool { + return bytes.Compare(out[i].Key, out[j].Key) < 0 + }) + return out +} + +func findEntry(entries []evmIteratorEntry, key []byte) (evmIteratorEntry, bool) { + for _, e := range entries { + if bytes.Equal(e.Key, key) { + return e, true + } + } + return evmIteratorEntry{}, false +} + +func subrange(sorted []evmIteratorEntry, start, end []byte, ascending bool) []evmIteratorEntry { + out := make([]evmIteratorEntry, 0, len(sorted)) + for _, e := range sorted { + if start != nil && bytes.Compare(e.Key, start) < 0 { + continue + } + if end != nil && bytes.Compare(e.Key, end) >= 0 { + continue + } + out = append(out, e) + } + if !ascending { + for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 { + out[i], out[j] = out[j], out[i] + } + } + return out +} + +func collectIterEntries(t *testing.T, iter dbm.Iterator) []evmIteratorEntry { + t.Helper() + var out []evmIteratorEntry + for ; iter.Valid(); iter.Next() { + out = append(out, evmIteratorEntry{ + Key: bytes.Clone(iter.Key()), + Value: bytes.Clone(iter.Value()), + }) + } + require.NoError(t, iter.Error()) + return out +} + +func sumFlatKVTableIters(s *CommitStore) (int64, error) { + var sum int64 + for _, db := range s.dataDBs() { + n, err := pebbledb.TableIters(db) + if err != nil { + return 0, err + } + sum += n + } + return sum, nil +} diff --git a/sei-db/state_db/sc/flatkv/store_read.go b/sei-db/state_db/sc/flatkv/store_read.go index cea795be21..42e980bba1 100644 --- a/sei-db/state_db/sc/flatkv/store_read.go +++ b/sei-db/state_db/sc/flatkv/store_read.go @@ -4,10 +4,7 @@ import ( "encoding/binary" "fmt" - dbm "github.com/tendermint/tm-db" - errorutils "github.com/sei-protocol/sei-chain/sei-db/common/errors" - "github.com/sei-protocol/sei-chain/sei-db/common/iterators" "github.com/sei-protocol/sei-chain/sei-db/common/keys" seidbtypes "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/ktype" @@ -21,6 +18,13 @@ import ( // Returns (value, true) if found, (nil, false) if not found. // Panics on I/O errors or unsupported key types. func (s *CommitStore) Get(moduleName string, key []byte) ([]byte, bool) { + // Read lock: the internal getters (getAccountData, getStorageData, + // getCodeData, getLegacyData) read the pending-writes maps, which + // ApplyChangeSets/Commit mutate under the write lock. Has delegates to Get + // and must not take its own lock (RWMutex read locks are not reentrant). + s.mu.RLock() + defer s.mu.RUnlock() + if moduleName != keys.EVMStoreKey { value, err := s.getLegacyValue(moduleName, key) if err != nil { @@ -86,6 +90,11 @@ func (s *CommitStore) Get(moduleName string, key []byte) ([]byte, bool) { // Only supported for EVM keys; non-EVM legacy data does not track block height. // If not found, returns (-1, false, nil). func (s *CommitStore) GetBlockHeightModified(moduleName string, key []byte) (int64, bool, error) { + // Read lock: the internal getters (getStorageData, getAccountData, + // getCodeData) read the pending-writes maps mutated under the write lock. + s.mu.RLock() + defer s.mu.RUnlock() + if moduleName != keys.EVMStoreKey { return -1, false, fmt.Errorf("block height modified not tracked for module %q", moduleName) } @@ -218,48 +227,3 @@ func (s *CommitStore) getLegacyValue(moduleName string, key []byte) ([]byte, err } return ld.GetValue(), nil } - -// RawGlobalIterator returns an iterator over all committed keys across the -// data DBs (account, code, storage, legacy), merged in global lexicographic -// order. Within each DB, keys are in Pebble order. Per-DB _meta/* keys are -// skipped. Pending writes are not visible. metadataDB is not included. -func (s *CommitStore) RawGlobalIterator() (dbm.Iterator, error) { - dbs := s.dataDBs() - children := make([]dbm.Iterator, 0, len(dbs)) - for _, db := range dbs { - pebbleIter, err := db.NewIter(nil) - if err != nil { - closeIterators(children) - return nil, fmt.Errorf("open data DB iterator: %w", err) - } - mapped, err := iterators.NewMappingIterator(pebbleIter, skipMetaKeys) - if err != nil { - closeIterators(children) - return nil, err - } - children = append(children, mapped) - } - merged, err := iterators.NewMergingIterator(children...) - if err != nil { - closeIterators(children) - return nil, err - } - if err := merged.Error(); err != nil { - _ = merged.Close() - return nil, err - } - return merged, nil -} - -// Used to cause the raw global iterator to skip _meta/* keys. -func skipMetaKeys(key, value []byte) ([]byte, []byte, bool, error) { - return key, value, ktype.IsMetaKey(key), nil -} - -func closeIterators(iters []dbm.Iterator) { - for _, it := range iters { - if it != nil { - _ = it.Close() - } - } -} diff --git a/sei-db/state_db/sc/flatkv/store_write.go b/sei-db/state_db/sc/flatkv/store_write.go index 8d495b6bb2..5bd6c1bcb2 100644 --- a/sei-db/state_db/sc/flatkv/store_write.go +++ b/sei-db/state_db/sc/flatkv/store_write.go @@ -20,6 +20,16 @@ import ( // On crash, catchup replays WAL to recover incomplete commits. func (s *CommitStore) Commit() (version int64, err error) { start := time.Now() + + // TODO(concurrency): This takes a single coarse write lock for the whole + // commit, so it also blocks readers/iterator construction during the WAL + // fsync and the periodic auto-snapshot. That is fine today because commits + // are not pipelined with reads (there is currently no pipelining at all). + // When commit pipelining is introduced, replace this with a finer-grained + // scheme. + s.mu.Lock() + defer s.mu.Unlock() + pendingAccount := len(s.accountWrites) pendingCode := len(s.codeWrites) pendingStorage := len(s.storageWrites)