55 changes: 55 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions sei-cosmos/storev2/rootmulti/store.go
@@ -854,6 +854,7 @@ func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error {
keySizePerStore[currentStoreName] += int64(len(item.Key))
valueSizePerStore[currentStoreName] += int64(len(item.Value))
numKeysPerStore[currentStoreName] += 1
telemetry.IncrCounter(1, "state_sync", "num_keys_exported")
case string:
if err := protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Store{
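For context on the store.go hunk above: the added line uses the Cosmos SDK telemetry helper, which bumps a counter identified by the joined metric key segments. A minimal sketch of that call pattern, assuming the standard cosmos-sdk telemetry package (the surrounding function and loop are illustrative, not taken from the store):

```go
package main

import "github.com/cosmos/cosmos-sdk/telemetry"

// Illustrative only: emit one count per key/value pair exported during a
// state-sync snapshot, mirroring the counter added in the hunk above.
func countExportedKeys(numItems int) {
	for i := 0; i < numItems; i++ {
		// IncrCounter takes a float32 value followed by metric key segments.
		telemetry.IncrCounter(1, "state_sync", "num_keys_exported")
	}
}
```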
32 changes: 24 additions & 8 deletions sei-db/state_db/sc/memiavl/db.go
@@ -468,7 +468,10 @@

if result.mtree == nil {
// background snapshot rewrite failed
otelMetrics.NumSnapshotRewriteAttempts.Add(context.Background(), 1, metric.WithAttributes(attribute.String("success", "false")))
return fmt.Errorf("background snapshot rewriting failed: %w", result.err)
} else {
otelMetrics.NumSnapshotRewriteAttempts.Add(context.Background(), 1, metric.WithAttributes(attribute.String("success", "true")))
}

// wait for potential pending writes to finish, to make sure we catch up to latest state.
@@ -485,11 +488,15 @@
}

// catchup the remaining entries in rlog
startTime := time.Now()

Check warning (Code scanning / CodeQL): Calling the system time may be a possible source of non-determinism
if wal := db.GetWAL(); wal != nil {
if err := result.mtree.Catchup(context.Background(), wal, db.walIndexDelta, 0); err != nil {
return fmt.Errorf("catchup failed: %w", err)
}
}
replayElapsedTime := time.Since(startTime).Seconds()
otelMetrics.CatchupBeforeReloadLatency.Record(context.Background(), replayElapsedTime)
db.logger.Info("successfully replayed and caught up before switching to new memiavl snapshot", "version", db.MultiTree.Version(), "latency_sec", replayElapsedTime)

// do the switch
if err := db.reloadMultiTree(result.mtree); err != nil {
@@ -519,7 +526,9 @@
db.logger.Info("pruneSnapshots started")
startTime := time.Now()
defer func() {
db.logger.Info("pruneSnapshots completed", "duration_sec", fmt.Sprintf("%.2fs", time.Since(startTime).Seconds()))
pruneLatency := time.Since(startTime).Seconds()
otelMetrics.SnapshotPruneLatency.Record(context.Background(), pruneLatency)
db.logger.Info("pruneSnapshots completed", "duration_sec", fmt.Sprintf("%.2fs", pruneLatency))
}()

currentVersion, err := currentVersion(db.dir)
@@ -541,15 +550,19 @@
}

name := snapshotName(version)
db.logger.Info("prune snapshot", "name", name)

if err := atomicRemoveDir(filepath.Join(db.dir, name)); err != nil {
db.logger.Error("failed to prune snapshot", "err", err)
otelMetrics.NumSnapshotPruneAttempts.Add(context.Background(), 1, metric.WithAttributes(attribute.String("success", "false")))
} else {
db.logger.Info("successfully pruned snapshot", "name", name)
otelMetrics.NumSnapshotPruneAttempts.Add(context.Background(), 1, metric.WithAttributes(attribute.String("success", "true")))
}

return false, nil
}); err != nil {
db.logger.Error("fail to prune snapshots", "err", err)
otelMetrics.NumSnapshotPruneAttempts.Add(context.Background(), 1, metric.WithAttributes(attribute.String("success", "false")))
return
}
}
@@ -651,6 +664,7 @@
// Rewrite tree snapshot if applicable
db.rewriteIfApplicable(v)
db.tryTruncateWAL()
otelMetrics.CurrentSnapshotHeight.Record(context.Background(), db.SnapshotVersion())

return v, nil
}
@@ -840,6 +854,7 @@

if err := db.rewriteSnapshotBackground(); err != nil {
db.logger.Error("failed to rewrite snapshot in background", "err", err)
otelMetrics.NumSnapshotRewriteAttempts.Add(context.Background(), 1, metric.WithAttributes(attribute.String("success", "false")))
}
}
}
@@ -881,7 +896,6 @@
defer close(ch)
startTime := time.Now()
cloned.logger.Info("start rewriting snapshot", "version", cloned.Version())

rewriteStart := time.Now()
if err := cloned.RewriteSnapshot(ctx); err != nil {
cloned.logger.Error("failed to rewrite snapshot", "error", err, "elapsed", time.Since(rewriteStart).Seconds())
@@ -921,15 +935,17 @@
ch <- snapshotResult{err: err}
return
}
cloned.logger.Info("finished best-effort catchup", "version", cloned.Version(), "latest", mtree.Version(), "elapsed", time.Since(catchupStart).Seconds())
catchupElapsed := time.Since(catchupStart).Seconds()
otelMetrics.CatchupAfterRewriteLatency.Record(context.Background(), catchupElapsed)
cloned.logger.Info("finished best-effort catchup after snapshot rewrite", "version", cloned.Version(), "latest", mtree.Version(), "elapsed", catchupElapsed)
}

ch <- snapshotResult{mtree: mtree}
totalElapsed := time.Since(startTime).Seconds()
cloned.logger.Info("snapshot rewrite process completed", "duration_sec", totalElapsed, "duration_min", totalElapsed/60)
otelMetrics.SnapshotCreationLatency.Record(
totalRewriteElapsed := time.Since(startTime).Seconds()
cloned.logger.Info("snapshot rewrite process completed", "duration_sec", totalRewriteElapsed, "duration_min", totalRewriteElapsed/60)

Check notice (Code scanning / CodeQL): Floating point arithmetic operations are not associative and a possible source of non-determinism
otelMetrics.SnapshotRewriteLatency.Record(
context.Background(),
totalElapsed,
totalRewriteElapsed,
)
}()

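All of the db.go hunks above follow the same measure-and-record shape: capture a start time, compute elapsed seconds with time.Since, record the value on a Float64Histogram, and tag attempt counters with a success attribute. A condensed, self-contained sketch of that pattern, assuming standard OpenTelemetry APIs (the instrument names and doWork are placeholders, not the actual memiavl code):

```go
package main

import (
	"context"
	"strconv"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("example_memiavl")

func doWork() error { return nil } // stand-in for rewrite/prune/catchup

func main() {
	// Placeholder instruments; the real ones are defined in metrics.go below.
	latency, _ := meter.Float64Histogram("example_rewrite_latency", metric.WithUnit("s"))
	attempts, _ := meter.Int64Counter("example_rewrite_attempts", metric.WithUnit("{count}"))

	start := time.Now()
	err := doWork()

	// Record elapsed seconds and count the attempt with a success label,
	// mirroring the pattern used across the hunks above.
	latency.Record(context.Background(), time.Since(start).Seconds())
	attempts.Add(context.Background(), 1,
		metric.WithAttributes(attribute.String("success", strconv.FormatBool(err == nil))))
}
```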
4 changes: 2 additions & 2 deletions sei-db/state_db/sc/memiavl/db_test.go
@@ -203,7 +203,7 @@ func TestSnapshotTriggerOnIntervalDiff(t *testing.T) {
return db.snapshotRewriteChan != nil
}, 100*time.Millisecond, 10*time.Millisecond, "rewrite should not start at height %d", i)
// snapshot version should remain 0 until rewrite
require.EqualValues(t, 0, db.MultiTree.SnapshotVersion())
require.EqualValues(t, 0, db.SnapshotVersion())
}

// Wait for minimum time interval to elapse (1 second + buffer)
@@ -223,7 +223,7 @@ func TestSnapshotTriggerOnIntervalDiff(t *testing.T) {
}, 5*time.Second, 100*time.Millisecond)

// After completion, snapshot version should be 5
require.EqualValues(t, 5, db.MultiTree.SnapshotVersion())
require.EqualValues(t, 5, db.SnapshotVersion())

require.NoError(t, db.Close())
}
60 changes: 50 additions & 10 deletions sei-db/state_db/sc/memiavl/metrics.go
@@ -9,24 +9,64 @@ var (
meter = otel.Meter("seidb_memiavl")

otelMetrics = struct {
RestartLatency metric.Float64Histogram
SnapshotCreationLatency metric.Float64Histogram
CommitLatency metric.Float64Histogram
ApplyChangesetLatency metric.Float64Histogram
NumOfKVPairs metric.Int64Counter
MemNodeTotalSize metric.Int64Gauge
NumOfMemNode metric.Int64Gauge
RestartLatency metric.Float64Histogram
SnapshotRewriteLatency metric.Float64Histogram
NumSnapshotRewriteAttempts metric.Int64Counter
SnapshotPruneLatency metric.Float64Histogram
NumSnapshotPruneAttempts metric.Int64Counter
CatchupAfterRewriteLatency metric.Float64Histogram
CatchupBeforeReloadLatency metric.Float64Histogram
CatchupReplayNumBlocks metric.Int64Counter
CurrentSnapshotHeight metric.Int64Gauge
CommitLatency metric.Float64Histogram
ApplyChangesetLatency metric.Float64Histogram
NumOfKVPairs metric.Int64Counter
MemNodeTotalSize metric.Int64Gauge
NumOfMemNode metric.Int64Gauge
}{
RestartLatency: must(meter.Float64Histogram(
"memiavl_restart_latency",
metric.WithDescription("Time taken to restart the memiavl database"),
metric.WithUnit("s"),
)),
SnapshotCreationLatency: must(meter.Float64Histogram(
"memiavl_snapshot_creation_latency",
metric.WithDescription("Time taken to create memiavl snapshot"),
SnapshotRewriteLatency: must(meter.Float64Histogram(
"memiavl_snapshot_rewrite_latency",
metric.WithDescription("Time taken to write to the new memiavl snapshot"),
metric.WithUnit("s"),
)),
NumSnapshotRewriteAttempts: must(meter.Int64Counter(
"memiavl_num_snapshot_rewrite_attempts",
metric.WithDescription("Total num of memiavl snapshot rewrite attempts"),
metric.WithUnit("{count}"))),
SnapshotPruneLatency: must(meter.Float64Histogram(
"memiavl_snapshot_prune_latency",
metric.WithDescription("Time taken to prune memiavl snapshot"),
metric.WithUnit("s"),
)),
NumSnapshotPruneAttempts: must(meter.Int64Counter(
"memiavl_num_snapshot_prune_attempts",
metric.WithDescription("Total number of snapshot prune attempts"),
metric.WithUnit("{count}"),
)),
CatchupAfterRewriteLatency: must(meter.Float64Histogram(
"memiavl_snapshot_catchup_after_rewrite_latency",
metric.WithDescription("Time taken to catchup and replay after snapshot rewrite"),
metric.WithUnit("s"),
)),
CatchupBeforeReloadLatency: must(meter.Float64Histogram(
"memiavl_snapshot_catchup_before_reload_latency",
metric.WithDescription("Time taken to catchup and replay before switch to new snapshot"),
metric.WithUnit("s"),
)),
CatchupReplayNumBlocks: must(meter.Int64Counter(
"memiavl_snapshot_catchup_replay_num_blocks",
metric.WithDescription("Num of blocks memIAVL has replayed after snapshot creation"),
metric.WithUnit("{count}"),
)),
CurrentSnapshotHeight: must(meter.Int64Gauge(
"memiavl_current_snapshot_height",
metric.WithDescription("Current snapshot height"),
)),
CommitLatency: must(meter.Float64Histogram(
"memiavl_commit_latency",
metric.WithDescription("Time taken to commit"),
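The instrument definitions above wrap every constructor in must(...), which is not shown in this diff. It is presumably a small generic helper that unwraps the (instrument, error) pair at package-init time and panics on failure; a sketch of what it likely looks like (an assumption about the codebase, not code from this PR):

```go
// Assumed helper: unwrap a (value, error) pair, panicking if an OTel
// instrument could not be created during package initialization.
func must[T any](v T, err error) T {
	if err != nil {
		panic(err)
	}
	return v
}
```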
1 change: 1 addition & 0 deletions sei-db/state_db/sc/memiavl/multitree.go
@@ -438,6 +438,7 @@ func (t *MultiTree) Catchup(ctx context.Context, stream wal.ChangelogWAL, delta
t.lastCommitInfo.Version = entry.Version
t.lastCommitInfo.StoreInfos = []proto.StoreInfo{}
replayCount++
otelMetrics.CatchupReplayNumBlocks.Add(context.Background(), 1)
if replayCount%1000 == 0 {
t.logger.Info(fmt.Sprintf("Replayed %d changelog entries", replayCount))
}
33 changes: 1 addition & 32 deletions sei-db/state_db/sc/memiavl/snapshot.go
@@ -682,15 +682,7 @@ type snapshotWriter struct {
wg sync.WaitGroup // Wait for all writer goroutines

// Pipeline metrics for each channel
maxKvFill int
maxLeafFill int
maxBranchFill int
kvFillSum int64
leafFillSum int64
branchFillSum int64
kvFillCount int64
leafFillCount int64
branchFillCount int64

lastMetricsReport time.Time
}

@@ -864,22 +856,6 @@ func (w *snapshotWriter) writeKeyValueDirect(key, value []byte) error {

// writeLeaf sends leaf and KV write operations to the pipeline
func (w *snapshotWriter) writeLeaf(version uint32, key, value, hash []byte) error {
// Track channel fill metrics for all channels
Contributor Author (review comment): Removing these since it seems they are not being used

kvFill := len(w.kvChan)
leafFill := len(w.leafChan)

if kvFill > w.maxKvFill {
w.maxKvFill = kvFill
}
if leafFill > w.maxLeafFill {
w.maxLeafFill = leafFill
}

atomic.AddInt64(&w.kvFillSum, int64(kvFill))
atomic.AddInt64(&w.kvFillCount, 1)
atomic.AddInt64(&w.leafFillSum, int64(leafFill))
atomic.AddInt64(&w.leafFillCount, 1)

// Calculate key offset BEFORE sending to KV channel
keyOffset := w.kvsOffset
keyLen := uint32(len(key)) //nolint:gosec
@@ -942,13 +918,6 @@ func (w *snapshotWriter) writeLeafDirect(version uint32, keyLen uint32, keyOffse

// writeBranch sends a branch write operation to the pipeline
func (w *snapshotWriter) writeBranch(version, size uint32, height, preTrees uint8, keyLeaf uint32, hash []byte) error {
// Track channel fill metrics
branchFill := len(w.branchChan)
if branchFill > w.maxBranchFill {
w.maxBranchFill = branchFill
}
atomic.AddInt64(&w.branchFillSum, int64(branchFill))
atomic.AddInt64(&w.branchFillCount, 1)

// Make copy of hash since we're sending to another goroutine
hashCopy := make([]byte, len(hash))