From f0c3418b580b6823329a363ac79c4e86378c9651 Mon Sep 17 00:00:00 2001 From: Wen Date: Mon, 16 Feb 2026 10:47:27 -0800 Subject: [PATCH 1/9] Port sei-v3 PR #512: persist AppQC and blocks to disk Extract generic A/B file persistence into a reusable consensus/persist/ sub-package and add block-file persistence for crash-safe availability state recovery. Changes: - Move persist.go and persist_test.go into consensus/persist/ (git mv to preserve history), exporting Persister, NewPersister, WriteAndSync, SuffixA, SuffixB. - Add persist/blocks.go: per-block file persistence using <lane_hex>_<blocknum>.pb files in a blocks/ subdirectory, with load, delete-before, and header-mismatch validation. - Wire avail.NewState to accept stateDir, create A/B persister for AppQC and BlockPersister for signed lane proposals, and restore both on restart (contiguous block runs, queue alignment). - Update avail/state.go to persist AppQC on prune and delete obsolete block files after each AppQC advance. - Thread PersistentStateDir from consensus.Config through to avail.NewState. - Expand consensus/inner.go doc comment with full persistence design (what, why, recovery, write behavior, rebroadcasting). - Move TestRunOutputsPersistErrorPropagates to consensus/inner_test.go for proper package alignment. - Add comprehensive tests for blocks persistence (empty dir, multi-lane, corrupt/mismatched skip, DeleteBefore, filename roundtrip). 
Ref: sei-protocol/sei-v3#512 Co-authored-by: Cursor --- .../internal/autobahn/avail/inner.go | 67 +++- .../internal/autobahn/avail/inner_test.go | 287 +++++++++++++++++- .../internal/autobahn/avail/state.go | 113 ++++++- .../internal/autobahn/avail/state_test.go | 140 ++++++++- .../internal/autobahn/consensus/inner.go | 80 ++++- .../internal/autobahn/consensus/inner_test.go | 42 ++- .../autobahn/consensus/persist/blocks.go | 163 ++++++++++ .../autobahn/consensus/persist/blocks_test.go | 235 ++++++++++++++ .../consensus/{ => persist}/persist.go | 41 +-- .../consensus/{ => persist}/persist_test.go | 90 ++---- .../internal/autobahn/consensus/state.go | 18 +- 11 files changed, 1166 insertions(+), 110 deletions(-) create mode 100644 sei-tendermint/internal/autobahn/consensus/persist/blocks.go create mode 100644 sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go rename sei-tendermint/internal/autobahn/consensus/{ => persist}/persist.go (87%) rename sei-tendermint/internal/autobahn/consensus/{ => persist}/persist_test.go (75%) diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index ec17fc983e..47f3a4e027 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -16,14 +16,22 @@ type inner struct { votes map[types.LaneID]*queue[types.BlockNumber, blockVotes] } -func newInner(c *types.Committee) *inner { +// loadedAvailState holds data loaded from disk on restart. +// nil means fresh start (no persisted data). 
+type loadedAvailState struct { + appQC utils.Option[*types.AppQC] + blocks map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal] +} + +func newInner(c *types.Committee, loaded *loadedAvailState) *inner { votes := map[types.LaneID]*queue[types.BlockNumber, blockVotes]{} blocks := map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]]{} for _, lane := range c.Lanes().All() { votes[lane] = newQueue[types.BlockNumber, blockVotes]() blocks[lane] = newQueue[types.BlockNumber, *types.Signed[*types.LaneProposal]]() } - return &inner{ + + i := &inner{ latestAppQC: utils.None[*types.AppQC](), latestCommitQC: utils.NewAtomicSend(utils.None[*types.CommitQC]()), appVotes: newQueue[types.GlobalBlockNumber, appVotes](), @@ -31,6 +39,61 @@ func newInner(c *types.Committee) *inner { blocks: blocks, votes: votes, } + + if loaded == nil { + return i + } + + // Restore AppQC and advance queues past already-processed indices. + i.latestAppQC = loaded.appQC + if aq, ok := loaded.appQC.Get(); ok { + // CommitQCs through this index have been processed; skip them. + i.commitQCs.first = aq.Proposal().RoadIndex() + 1 + i.commitQCs.next = i.commitQCs.first + // AppVotes through this global block number have been processed. + i.appVotes.first = aq.Proposal().GlobalNumber() + 1 + i.appVotes.next = i.appVotes.first + } + + // Restore persisted blocks into their lane queues. + for lane, bs := range loaded.blocks { + q, ok := i.blocks[lane] + if !ok { + continue // skip blocks for unknown lanes + } + if len(bs) == 0 { + continue + } + // Find the minimum block number. + first := true + var minN types.BlockNumber + for n := range bs { + if first || n < minN { + minN = n + } + first = false + } + // Load contiguous blocks starting from minN. Stop at the first gap + // (e.g. a corrupt file that was skipped during load). Blocks after + // the gap will be re-fetched from peers. 
+ q.first = minN + q.next = minN + for { + b, ok := bs[q.next] + if !ok { + break + } + q.q[q.next] = b + q.next++ + } + // Advance the votes queue to match so headers() returns ErrPruned + // for already-committed blocks instead of blocking forever. + vq := i.votes[lane] + vq.first = minN + vq.next = minN + } + + return i } func (i *inner) laneQC(c *types.Committee, lane types.LaneID, n types.BlockNumber) (*types.LaneQC, bool) { diff --git a/sei-tendermint/internal/autobahn/avail/inner_test.go b/sei-tendermint/internal/autobahn/avail/inner_test.go index d5b619815c..5272d233f7 100644 --- a/sei-tendermint/internal/autobahn/avail/inner_test.go +++ b/sei-tendermint/internal/autobahn/avail/inner_test.go @@ -16,7 +16,8 @@ func TestPruneMismatchedIndices(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state, err := NewState(keys[0], ds, utils.None[string]()) + require.NoError(t, err) // Helper to create a CommitQC for a specific index makeQC := func(index types.RoadIndex, prev utils.Option[*types.CommitQC]) *types.CommitQC { @@ -45,7 +46,7 @@ func TestPruneMismatchedIndices(t *testing.T) { appQC1 := types.NewAppQC(makeAppVotes(keys, appProposal1)) // Now call PushAppQC with appQC1 (index 1) and qc0 (index 0) - err := state.PushAppQC(appQC1, qc0) + err = state.PushAppQC(appQC1, qc0) require.Error(t, err) require.Contains(t, err.Error(), "mismatched QCs") @@ -60,3 +61,285 @@ func TestPruneMismatchedIndices(t *testing.T) { require.False(t, inner.latestAppQC.IsPresent(), "latestAppQC should not have been updated") } } + +// testSignedBlock creates a signed lane proposal for a given lane, block number, and parent hash. 
+func testSignedBlock(key types.SecretKey, lane types.LaneID, n types.BlockNumber, parent types.BlockHeaderHash, rng utils.Rng) *types.Signed[*types.LaneProposal] { + block := types.NewBlock(lane, n, parent, types.GenPayload(rng)) + return types.Sign(key, types.NewLaneProposal(block)) +} + +func TestNewInnerFreshStart(t *testing.T) { + rng := utils.TestRng() + committee, _ := types.GenCommittee(rng, 4) + + i := newInner(committee, nil) + + require.False(t, i.latestAppQC.IsPresent()) + require.Equal(t, types.RoadIndex(0), i.commitQCs.first) + require.Equal(t, types.RoadIndex(0), i.commitQCs.next) + require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.first) + require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.next) + for _, lane := range committee.Lanes().All() { + require.Equal(t, types.BlockNumber(0), i.blocks[lane].first) + require.Equal(t, types.BlockNumber(0), i.blocks[lane].next) + require.Equal(t, types.BlockNumber(0), i.votes[lane].first) + require.Equal(t, types.BlockNumber(0), i.votes[lane].next) + } +} + +func TestNewInnerLoadedAppQCAdvancesQueues(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + + roadIdx := types.RoadIndex(5) + globalNum := types.GlobalBlockNumber(42) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + loaded := &loadedAvailState{ + appQC: utils.Some[*types.AppQC](appQC), + blocks: nil, + } + + i := newInner(committee, loaded) + + // latestAppQC should be restored. + aq, ok := i.latestAppQC.Get() + require.True(t, ok) + require.Equal(t, roadIdx, aq.Proposal().RoadIndex()) + require.Equal(t, globalNum, aq.Proposal().GlobalNumber()) + + // commitQCs queue should skip past the loaded AppQC's road index. + require.Equal(t, roadIdx+1, i.commitQCs.first) + require.Equal(t, roadIdx+1, i.commitQCs.next) + + // appVotes queue should skip past the loaded AppQC's global block number. 
+ require.Equal(t, globalNum+1, i.appVotes.first) + require.Equal(t, globalNum+1, i.appVotes.next) +} + +func TestNewInnerLoadedAppQCNone(t *testing.T) { + rng := utils.TestRng() + committee, _ := types.GenCommittee(rng, 4) + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: nil, + } + + i := newInner(committee, loaded) + + // No AppQC loaded, queues should start at 0. + require.False(t, i.latestAppQC.IsPresent()) + require.Equal(t, types.RoadIndex(0), i.commitQCs.first) + require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.first) +} + +func TestNewInnerLoadedBlocksContiguous(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + + // Build 3 contiguous blocks: 5, 6, 7. + var parent types.BlockHeaderHash + bs := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + for n := types.BlockNumber(5); n < 8; n++ { + b := testSignedBlock(keys[0], lane, n, parent, rng) + parent = b.Msg().Block().Header().Hash() + bs[n] = b + } + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{lane: bs}, + } + + i := newInner(committee, loaded) + + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(5), q.first) + require.Equal(t, types.BlockNumber(8), q.next) + for n := types.BlockNumber(5); n < 8; n++ { + require.Equal(t, bs[n], q.q[n]) + } + + // Votes queue should be aligned. + vq := i.votes[lane] + require.Equal(t, types.BlockNumber(5), vq.first) + require.Equal(t, types.BlockNumber(5), vq.next) +} + +func TestNewInnerLoadedBlocksWithGap(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + + // Blocks 3, 4, 6 (gap at 5). 
+ var parent types.BlockHeaderHash + bs := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + for _, n := range []types.BlockNumber{3, 4, 6} { + b := testSignedBlock(keys[0], lane, n, parent, rng) + parent = b.Msg().Block().Header().Hash() + bs[n] = b + } + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{lane: bs}, + } + + i := newInner(committee, loaded) + + // Should load only 3, 4 (stop at gap before 5). + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(3), q.first) + require.Equal(t, types.BlockNumber(5), q.next) + require.Equal(t, bs[3], q.q[types.BlockNumber(3)]) + require.Equal(t, bs[4], q.q[types.BlockNumber(4)]) + _, has6 := q.q[types.BlockNumber(6)] + require.False(t, has6, "block 6 should not be loaded (after gap)") +} + +func TestNewInnerLoadedBlocksEmptyMap(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ + lane: {}, + }, + } + + i := newInner(committee, loaded) + + // Empty block map should leave queue at 0. + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(0), q.first) + require.Equal(t, types.BlockNumber(0), q.next) +} + +func TestNewInnerLoadedBlocksUnknownLane(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + + // Create a lane that doesn't belong to the committee. 
+ unknownKey := types.GenSecretKey(rng) + unknownLane := unknownKey.Public() + + b := testSignedBlock(unknownKey, unknownLane, 0, types.BlockHeaderHash{}, rng) + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ + unknownLane: {0: b}, + }, + } + + i := newInner(committee, loaded) + + // Unknown lane should be silently skipped; known lanes unaffected. + for _, lane := range committee.Lanes().All() { + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(0), q.first) + require.Equal(t, types.BlockNumber(0), q.next) + } + _ = keys // suppress unused +} + +func TestNewInnerLoadedAppQCAndBlocks(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + + roadIdx := types.RoadIndex(3) + globalNum := types.GlobalBlockNumber(10) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + // Build 2 contiguous blocks: 7, 8. + var parent types.BlockHeaderHash + bs := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + for n := types.BlockNumber(7); n < 9; n++ { + b := testSignedBlock(keys[0], lane, n, parent, rng) + parent = b.Msg().Block().Header().Hash() + bs[n] = b + } + + loaded := &loadedAvailState{ + appQC: utils.Some[*types.AppQC](appQC), + blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ + lane: bs, + }, + } + + i := newInner(committee, loaded) + + // AppQC should be restored. + aq, ok := i.latestAppQC.Get() + require.True(t, ok) + require.Equal(t, roadIdx, aq.Proposal().RoadIndex()) + + // Queues advanced by AppQC. + require.Equal(t, roadIdx+1, i.commitQCs.first) + require.Equal(t, globalNum+1, i.appVotes.first) + + // Blocks restored. + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(7), q.first) + require.Equal(t, types.BlockNumber(9), q.next) + + // Votes aligned. 
+ vq := i.votes[lane] + require.Equal(t, types.BlockNumber(7), vq.first) + require.Equal(t, types.BlockNumber(7), vq.next) +} + +func TestNewInnerLoadedBlocksMultipleLanes(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane0 := keys[0].Public() + lane1 := keys[1].Public() + + // Lane 0: blocks 2, 3. + bs0 := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + var parent0 types.BlockHeaderHash + for n := types.BlockNumber(2); n < 4; n++ { + b := testSignedBlock(keys[0], lane0, n, parent0, rng) + parent0 = b.Msg().Block().Header().Hash() + bs0[n] = b + } + + // Lane 1: blocks 0, 1, 2. + bs1 := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + var parent1 types.BlockHeaderHash + for n := types.BlockNumber(0); n < 3; n++ { + b := testSignedBlock(keys[1], lane1, n, parent1, rng) + parent1 = b.Msg().Block().Header().Hash() + bs1[n] = b + } + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ + lane0: bs0, + lane1: bs1, + }, + } + + i := newInner(committee, loaded) + + // Lane 0. + q0 := i.blocks[lane0] + require.Equal(t, types.BlockNumber(2), q0.first) + require.Equal(t, types.BlockNumber(4), q0.next) + + // Lane 1. + q1 := i.blocks[lane1] + require.Equal(t, types.BlockNumber(0), q1.first) + require.Equal(t, types.BlockNumber(3), q1.next) + + // Votes aligned per lane. 
+ require.Equal(t, types.BlockNumber(2), i.votes[lane0].first) + require.Equal(t, types.BlockNumber(0), i.votes[lane1].first) +} diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index 9c98327db8..52986566d0 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -6,11 +6,14 @@ import ( "fmt" "github.com/rs/zerolog/log" + "google.golang.org/protobuf/proto" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" + + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" ) // ErrBadLane . @@ -38,15 +41,65 @@ type State struct { key types.SecretKey data *data.State inner utils.Watch[*inner] + + // persister writes avail inner state to disk using A/B files; None when persistence is disabled. + persister utils.Option[persist.Persister] + // blockPersist writes/deletes individual block files; nil when persistence is disabled. + blockPersist *persist.BlockPersister } +// innerFile is the A/B file prefix for avail inner state persistence. +const innerFile = "avail_inner" + // NewState constructs a new availability state. -func NewState(key types.SecretKey, data *data.State) *State { - return &State{ - key: key, - data: data, - inner: utils.NewWatch(newInner(data.Committee())), +// stateDir is None when persistence is disabled (testing only). +func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[string]) (*State, error) { + var loaded *loadedAvailState + var p utils.Option[persist.Persister] + var bp *persist.BlockPersister + + if dir, ok := stateDir.Get(); ok { + // Create A/B persister for inner state. 
+ persister, persistedData, err := persist.NewPersister(dir, innerFile) + if err != nil { + return nil, fmt.Errorf("NewPersister %s: %w", innerFile, err) + } + p = utils.Some[persist.Persister](persister) + + // Decode persisted AppQC. + var appQC utils.Option[*types.AppQC] + if bz, ok := persistedData.Get(); ok { + qc, err := types.AppQCConv.Unmarshal(bz) + if err != nil { + return nil, fmt.Errorf("unmarshal persisted AppQC: %w", err) + } + log.Info(). + Uint64("roadIndex", uint64(qc.Proposal().RoadIndex())). + Uint64("globalNumber", uint64(qc.Proposal().GlobalNumber())). + Msg("loaded persisted AppQC") + appQC = utils.Some(qc) + } + + // Load persisted blocks from the blocks/ subdirectory. + var blocks map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal] + bp, blocks, err = persist.NewBlockPersister(dir) + if err != nil { + return nil, fmt.Errorf("NewBlockPersister: %w", err) + } + + loaded = &loadedAvailState{ + appQC: appQC, + blocks: blocks, + } } + + return &State{ + key: key, + data: data, + inner: utils.NewWatch(newInner(data.Committee(), loaded)), + persister: p, + blockPersist: bp, + }, nil } func (s *State) FirstCommitQC() types.RoadIndex { @@ -151,6 +204,7 @@ func (s *State) PushAppVote(ctx context.Context, v *types.Signed[*types.AppVote] if err := s.waitForCommitQC(ctx, idx); err != nil { return err } + var laneFirsts map[types.LaneID]types.BlockNumber for inner, ctrl := range s.inner.Lock() { // Early exit if not useful (we collect <=1 AppQC per road index). 
if idx < types.NextOpt(inner.latestAppQC) { @@ -176,9 +230,16 @@ func (s *State) PushAppVote(ctx context.Context, v *types.Signed[*types.AppVote] return err } if updated { + if err := s.persistAppQC(appQC); err != nil { + return err + } + laneFirsts = snapshotLaneFirsts(inner) ctrl.Updated() } } + if s.blockPersist != nil { + s.blockPersist.DeleteBefore(laneFirsts) + } return nil } @@ -200,18 +261,48 @@ func (s *State) PushAppQC(appQC *types.AppQC, commitQC *types.CommitQC) error { if appQC.Proposal().RoadIndex() != commitQC.Proposal().Index() { return fmt.Errorf("mismatched QCs: appQC index %v, commitQC index %v", appQC.Proposal().RoadIndex(), commitQC.Proposal().Index()) } + var laneFirsts map[types.LaneID]types.BlockNumber for inner, ctrl := range s.inner.Lock() { updated, err := inner.prune(appQC, commitQC) if err != nil { return err } if updated { + if err := s.persistAppQC(appQC); err != nil { + return err + } + laneFirsts = snapshotLaneFirsts(inner) ctrl.Updated() } } + if s.blockPersist != nil { + s.blockPersist.DeleteBefore(laneFirsts) + } return nil } +func snapshotLaneFirsts(inner *inner) map[types.LaneID]types.BlockNumber { + m := make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) + for lane, q := range inner.blocks { + m[lane] = q.first + } + return m +} + +// persistAppQC writes the AppQC to the A/B file. Called under the inner lock +// so the checkpoint is durable before ctrl.Updated() notifies watchers. +func (s *State) persistAppQC(appQC *types.AppQC) error { + p, ok := s.persister.Get() + if !ok { + return nil + } + bz, err := proto.Marshal(types.AppQCConv.Encode(appQC)) + if err != nil { + return fmt.Errorf("marshal AppQC: %w", err) + } + return p.Persist(bz) +} + // NextBlock returns the index of the next missing block in local storage for the given lane. 
func (s *State) NextBlock(lane types.LaneID) types.BlockNumber { for inner := range s.inner.Lock() { @@ -290,6 +381,12 @@ func (s *State) PushBlock(ctx context.Context, p *types.Signed[*types.LanePropos return nil } } + // Persist block to disk before adding to in-memory state. + if bp := s.blockPersist; bp != nil { + if err := bp.PersistBlock(h.Lane(), h.BlockNumber(), p); err != nil { + return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err) + } + } q.pushBack(p) ctrl.Updated() } @@ -444,6 +541,12 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload * parent = q.q[q.next-1].Msg().Block().Header().Hash() } p := types.Sign(key, types.NewLaneProposal(types.NewBlock(lane, q.next, parent, payload))) + // Persist block to disk before adding to in-memory state. + if bp := s.blockPersist; bp != nil { + if err := bp.PersistBlock(lane, q.next, p); err != nil { + return nil, fmt.Errorf("persist block %s/%d: %w", lane, q.next, err) + } + } q.q[q.next] = p q.next += 1 ctrl.Updated() diff --git a/sei-tendermint/internal/autobahn/avail/state_test.go b/sei-tendermint/internal/autobahn/avail/state_test.go index c00e090030..a7820c3de0 100644 --- a/sei-tendermint/internal/autobahn/avail/state_test.go +++ b/sei-tendermint/internal/autobahn/avail/state_test.go @@ -7,6 +7,9 @@ import ( "testing" "time" + "google.golang.org/protobuf/proto" + + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -88,7 +91,8 @@ func TestState(t *testing.T) { s.SpawnBgNamed("data.State.Run()", func() error { return utils.IgnoreCancel(ds.Run(ctx)) }) - state := NewState(keys[0], ds) + state, err := NewState(keys[0], ds, utils.None[string]()) + require.NoError(t, err) s.SpawnBgNamed("da.State.Run()", func() error 
{ return utils.IgnoreCancel(state.Run(ctx)) }) @@ -200,7 +204,8 @@ func TestStateMismatchedQCs(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state, err := NewState(keys[0], ds, utils.None[string]()) + require.NoError(t, err) ctx := context.Background() // Helper to create a CommitQC for a specific index @@ -292,3 +297,134 @@ func TestPushBlockRejectsWrongSigner(t *testing.T) { err := state.PushBlock(ctx, prop) require.Error(t, err) } + +func TestNewStateWithPersistence(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + + t.Run("empty dir loads fresh state", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + // No persisted AppQC → None. + require.False(t, state.LastAppQC().IsPresent()) + // Queues start at 0. + require.Equal(t, types.RoadIndex(0), state.FirstCommitQC()) + }) + + t.Run("loads persisted AppQC", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + // Persist an AppQC to the A/B file. + roadIdx := types.RoadIndex(7) + globalNum := types.GlobalBlockNumber(50) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + persister, _, err := persist.NewPersister(dir, innerFile) + require.NoError(t, err) + bz, err := proto.Marshal(types.AppQCConv.Encode(appQC)) + require.NoError(t, err) + require.NoError(t, persister.Persist(bz)) + + // Now construct state — it should load the AppQC. 
+ state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + aq := state.LastAppQC() + got, ok := aq.Get() + require.True(t, ok) + require.Equal(t, roadIdx, got.Proposal().RoadIndex()) + require.Equal(t, globalNum, got.Proposal().GlobalNumber()) + + // commitQCs queue should be advanced past the AppQC's road index. + require.Equal(t, roadIdx+1, state.FirstCommitQC()) + }) + + t.Run("loads persisted blocks", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + lane := keys[0].Public() + + // Persist blocks using BlockPersister. + bp, _, err := persist.NewBlockPersister(dir) + require.NoError(t, err) + + var parent types.BlockHeaderHash + for n := types.BlockNumber(0); n < 3; n++ { + block := types.NewBlock(lane, n, parent, types.GenPayload(rng)) + signed := types.Sign(keys[0], types.NewLaneProposal(block)) + parent = block.Header().Hash() + require.NoError(t, bp.PersistBlock(lane, n, signed)) + } + + // Now construct state — it should load the blocks. + state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + require.Equal(t, types.BlockNumber(3), state.NextBlock(lane)) + }) + + t.Run("loads persisted AppQC and blocks together", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + lane := keys[0].Public() + + // Persist AppQC. + roadIdx := types.RoadIndex(2) + globalNum := types.GlobalBlockNumber(5) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + persister, _, err := persist.NewPersister(dir, innerFile) + require.NoError(t, err) + bz, err := proto.Marshal(types.AppQCConv.Encode(appQC)) + require.NoError(t, err) + require.NoError(t, persister.Persist(bz)) + + // Persist blocks. 
+ bp, _, err := persist.NewBlockPersister(dir) + require.NoError(t, err) + + var parent types.BlockHeaderHash + for n := types.BlockNumber(10); n < 13; n++ { + block := types.NewBlock(lane, n, parent, types.GenPayload(rng)) + signed := types.Sign(keys[0], types.NewLaneProposal(block)) + parent = block.Header().Hash() + require.NoError(t, bp.PersistBlock(lane, n, signed)) + } + + // Construct state. + state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + // AppQC loaded. + got, ok := state.LastAppQC().Get() + require.True(t, ok) + require.Equal(t, roadIdx, got.Proposal().RoadIndex()) + + // Blocks loaded. + require.Equal(t, types.BlockNumber(13), state.NextBlock(lane)) + + // commitQCs advanced. + require.Equal(t, roadIdx+1, state.FirstCommitQC()) + }) + + t.Run("corrupt AppQC data returns error", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + // Write garbage data as the "AppQC". + persister, _, err := persist.NewPersister(dir, innerFile) + require.NoError(t, err) + require.NoError(t, persister.Persist([]byte("not a valid protobuf"))) + + _, err = NewState(keys[0], ds, utils.Some(dir)) + require.Error(t, err) + require.Contains(t, err.Error(), "unmarshal persisted AppQC") + }) +} diff --git a/sei-tendermint/internal/autobahn/consensus/inner.go b/sei-tendermint/internal/autobahn/consensus/inner.go index ab1268920c..143d801da7 100644 --- a/sei-tendermint/internal/autobahn/consensus/inner.go +++ b/sei-tendermint/internal/autobahn/consensus/inner.go @@ -1,3 +1,79 @@ +// Persistence for consensus state. 
+// +// # What We Persist +// +// All consensus state is persisted atomically in a single A/B file pair (inner_a.pb/inner_b.pb): +// - CommitQC: justified entering the current index +// - TimeoutQC: justified entering the current view number +// - PrepareQC: needed for timeoutVote on restart +// - PrepareVote, CommitVote, TimeoutVote: this node's votes for the current view +// +// # Why We Persist +// +// Safety: Votes prevent double-voting on restart - a critical safety property. +// +// Liveness: View justification (QCs) enables fast view synchronization after +// cluster-wide outages. Without persisted QCs, lagging validators would be stuck. +// +// Example failure case without QC persistence: +// - All validators have CommitQC for index 4 +// - Within index 5, validators timeout multiple times (view numbers 0→1→2→3) +// - A, B, C reach view (5, 3) via TimeoutQC for view (5, 2) +// - D, E are slower, only at view (5, 2) via TimeoutQC for view (5, 1) +// - Cluster crashes +// - Without persisted QCs, nodes only have their timeout VOTES +// - A, B, C have timeout votes for (5, 2), D, E have timeout votes for (5, 1) +// - On restart, D, E need TimeoutQC(5, 1) to justify being at (5, 2) +// - But TimeoutQC(5, 1) requires 2/3 votes for view (5, 1) +// - Only D, E have those votes - not enough for quorum +// - D, E are stuck at view (5, 1), cannot advance +// +// With QC persistence: +// - A, B, C have TimeoutQC(5, 2) persisted - justifies view (5, 3) +// - D, E have TimeoutQC(5, 1) persisted - justifies view (5, 2) +// - On restart, everyone rebroadcasts their persisted QCs +// - D, E are already at (5, 2), they broadcast TimeoutQC(5, 1) (helps no one) +// - A, B, C broadcast TimeoutQC(5, 2) +// - D, E receive TimeoutQC(5, 2), can now advance to (5, 3) +// - Everyone converges to the highest view +// +// # stateDir Configuration +// +// At config level, stateDir is an Option[string]. NewState creates a persister when +// stateDir is Some(path). 
When None, no persister is created (persistence +// disabled - DANGEROUS, may lead to SLASHING if the node restarts and double-votes; +// only use for testing). When Some(path), the path must already exist and be +// writable (verified by writing a temp file at startup); returns error otherwise. +// TODO: surface the None warning in CLI --help (e.g. stream command or config docs). +// +// # Recovery Behavior +// +// - Fresh start (files don't exist): Logged at INFO level, node starts clean +// - Successful restore: Logged at INFO level with state details +// - One file corrupt (e.g. crash during write): Uses the other file; logged at WARN +// - Both files corrupt or unreadable: Returns error to caller +// - Inconsistent state: Returns error to caller with message indicating which field is corrupt +// Examples of inconsistent state: +// - Vote from a future view (how could we vote for a view we haven't reached?) +// - TimeoutQC at index > 0 without CommitQC (how did we advance past index 0?) +// - TimeoutQC at index > CommitQC.Index + 1 (how did we skip intermediate commits?) +// +// # Write Behavior +// +// - State directory must already exist (we do not create it). +// - Writes are synchronous. +// - Writes are idempotent, so retries on next state change are safe +// +// # Rebroadcasting +// +// On restart, the consensus layer propagates loaded state to output watches, +// which triggers rebroadcasting to peers: +// - Votes (prepareVote, commitVote, timeoutVote): YES - rebroadcast via sendUpdates +// - TimeoutQC: YES - rebroadcast via myTimeoutQC watch +// - CommitQC: NO - used locally for view justification but not rebroadcast; +// CommitQCs are served via StreamCommitQCs from the data layer, not from +// the persisted viewSpec. TODO: consider rebroadcasting CommitQC on restart +// to help peers sync faster after cluster-wide outages. 
package consensus import ( @@ -10,14 +86,14 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) -// Persisted state file prefix. +// Persisted state file prefix for consensus inner state. const innerFile = "inner" type inner struct { persistedInner } -// newInner creates the inner state from persisted data loaded by newPersister. +// newInner creates the inner state from persisted data loaded by NewPersister. // data is None on fresh start (persistence disabled or no prior state). // Returns error if persisted state is corrupt (see persistedInner.validate). func newInner(data utils.Option[[]byte], committee *types.Committee) (inner, error) { diff --git a/sei-tendermint/internal/autobahn/consensus/inner_test.go b/sei-tendermint/internal/autobahn/consensus/inner_test.go index 8b8a604e5e..430df96451 100644 --- a/sei-tendermint/internal/autobahn/consensus/inner_test.go +++ b/sei-tendermint/internal/autobahn/consensus/inner_test.go @@ -1,8 +1,13 @@ package consensus import ( + "context" + "errors" "testing" + "time" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" @@ -23,7 +28,7 @@ func testCommittee(keys ...types.SecretKey) *types.Committee { // seedPersistedInner is a test helper that persists a persistedInner using the public API. func seedPersistedInner(dir string, state *persistedInner) { - p, _, err := newPersister(dir, innerFile) + p, _, err := persist.NewPersister(dir, innerFile) if err != nil { panic(err) } @@ -33,9 +38,9 @@ func seedPersistedInner(dir string, state *persistedInner) { } // loadInner is a test helper that loads persisted data and creates inner. -// Mirrors what NewState does: newPersister → newInner. 
+// Mirrors what NewState does: NewPersister → newInner. func loadInner(dir string, committee *types.Committee) (inner, error) { - _, data, err := newPersister(dir, innerFile) + _, data, err := persist.NewPersister(dir, innerFile) if err != nil { return inner{}, err } @@ -920,3 +925,34 @@ func TestPushTimeoutQCClearsStaleState(t *testing.T) { require.False(t, newInner.CommitVote.IsPresent(), "commitVote should be cleared") require.False(t, newInner.TimeoutVote.IsPresent(), "timeoutVote should be cleared") } + +// failPersister is a Persister that always returns an error. +type failPersister struct{ err error } + +func (f failPersister) Persist([]byte) error { return f.err } + +func TestRunOutputsPersistErrorPropagates(t *testing.T) { + // Verify that a persist error in runOutputs propagates + // and terminates the consensus component (instead of panicking). + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + cs, err := NewState(&Config{ + Key: keys[0], + ViewTimeout: func(types.View) time.Duration { return time.Hour }, + }, ds) + require.NoError(t, err) + + // Inject a persister that always fails. + wantErr := errors.New("disk on fire") + cs.persister = utils.Some[persist.Persister](failPersister{err: wantErr}) + + // runOutputs should fail on the first Iter callback when it tries to persist. + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + err = cs.runOutputs(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "persist inner") + require.ErrorIs(t, err, wantErr) +} diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go new file mode 100644 index 0000000000..94d4317639 --- /dev/null +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go @@ -0,0 +1,163 @@ +// TODO: Block file persistence is a temporary solution. 
It does not handle many +// corner cases (e.g. disk full, partial directory listings, orphaned files after +// lane changes, no garbage collection on unclean shutdown). This will be replaced +// by a proper storage solution before launch. + +package persist + +import ( + "encoding/hex" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/rs/zerolog/log" + "google.golang.org/protobuf/proto" + + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" +) + +// BlockPersister manages individual block files in a blocks/ subdirectory. +// Each block is stored as <lane_hex>_<block_number>.pb. +type BlockPersister struct { + dir string // full path to the blocks/ subdirectory +} + +// NewBlockPersister creates the blocks/ subdirectory if it doesn't exist and +// returns a block persister. Loads all persisted blocks from disk. +func NewBlockPersister(stateDir string) (*BlockPersister, map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal], error) { + dir := filepath.Join(stateDir, "blocks") + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, nil, fmt.Errorf("create blocks dir %s: %w", dir, err) + } + + bp := &BlockPersister{dir: dir} + blocks, err := bp.loadAll() + if err != nil { + return nil, nil, err + } + return bp, blocks, nil +} + +func blockFilename(lane types.LaneID, n types.BlockNumber) string { + return hex.EncodeToString(lane.Bytes()) + "_" + strconv.FormatUint(uint64(n), 10) + ".pb" +} + +func parseBlockFilename(name string) (types.LaneID, types.BlockNumber, error) { + name = strings.TrimSuffix(name, ".pb") + parts := strings.SplitN(name, "_", 2) + if len(parts) != 2 { + return types.PublicKey{}, 0, fmt.Errorf("bad block filename %q", name) + } + keyBytes, err := hex.DecodeString(parts[0]) + if err != nil { + return types.PublicKey{}, 0, fmt.Errorf("bad lane hex in %q: %w", name, err) + } + lane, err := types.PublicKeyFromBytes(keyBytes) + if err != nil { + return types.PublicKey{}, 0, fmt.Errorf("bad lane key in %q: 
%w", name, err) + } + n, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return types.PublicKey{}, 0, fmt.Errorf("bad block number in %q: %w", name, err) + } + return lane, types.BlockNumber(n), nil +} + +// PersistBlock writes a signed lane proposal to its own file. +func (bp *BlockPersister) PersistBlock(lane types.LaneID, n types.BlockNumber, proposal *types.Signed[*types.LaneProposal]) error { + pb := types.SignedMsgConv[*types.LaneProposal]().Encode(proposal) + data, err := proto.Marshal(pb) + if err != nil { + return fmt.Errorf("marshal block %s/%d: %w", lane, n, err) + } + path := filepath.Join(bp.dir, blockFilename(lane, n)) + return WriteAndSync(path, data) +} + +// DeleteBefore removes all persisted block files for lanes in the given map +// where the block number is below the map value. Scans the directory once. +// Best-effort: logs warnings on individual failures. +func (bp *BlockPersister) DeleteBefore(laneFirsts map[types.LaneID]types.BlockNumber) { + if len(laneFirsts) == 0 { + return + } + entries, err := os.ReadDir(bp.dir) + if err != nil { + log.Warn().Err(err).Str("dir", bp.dir).Msg("failed to list blocks dir for cleanup") + return + } + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { + continue + } + lane, fileN, err := parseBlockFilename(entry.Name()) + if err != nil { + continue + } + first, ok := laneFirsts[lane] + if !ok || fileN >= first { + continue + } + path := filepath.Join(bp.dir, entry.Name()) + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + log.Warn().Err(err).Str("path", path).Msg("failed to delete block file") + } + } +} + +// loadAll loads all persisted blocks from the blocks/ directory. 
+func (bp *BlockPersister) loadAll() (map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal], error) { + entries, err := os.ReadDir(bp.dir) + if err != nil { + return nil, fmt.Errorf("read blocks dir %s: %w", bp.dir, err) + } + + result := map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { + continue + } + lane, n, err := parseBlockFilename(entry.Name()) + if err != nil { + log.Warn().Err(err).Str("file", entry.Name()).Msg("skipping unrecognized block file") + continue + } + proposal, err := loadBlockFile(filepath.Join(bp.dir, entry.Name())) + if err != nil { + // Corrupt or partially-written file (e.g. crash mid-write). + // Skip it — the block will be re-received from peers or re-produced. + log.Warn().Err(err).Str("file", entry.Name()).Msg("skipping corrupt block file") + continue + } + // Verify the block's header matches the filename. + h := proposal.Msg().Block().Header() + if h.Lane() != lane || h.BlockNumber() != n { + log.Warn(). + Str("file", entry.Name()). + Stringer("headerLane", h.Lane()). + Uint64("headerNum", uint64(h.BlockNumber())). + Stringer("filenameLane", lane). + Uint64("filenameNum", uint64(n)). 
+ Msg("skipping block file with mismatched header") + continue + } + if result[lane] == nil { + result[lane] = map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + } + result[lane][n] = proposal + log.Info().Str("lane", lane.String()).Uint64("block", uint64(n)).Msg("loaded persisted block") + } + return result, nil +} + +func loadBlockFile(path string) (*types.Signed[*types.LaneProposal], error) { + data, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename; not user-controlled + if err != nil { + return nil, err + } + conv := types.SignedMsgConv[*types.LaneProposal]() + return conv.Unmarshal(data) +} diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go new file mode 100644 index 0000000000..5d80015a72 --- /dev/null +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go @@ -0,0 +1,235 @@ +package persist + +import ( + "encoding/hex" + "os" + "path/filepath" + "testing" + + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" +) + +func testSignedProposal(rng utils.Rng, key types.SecretKey, n types.BlockNumber) *types.Signed[*types.LaneProposal] { + lane := key.Public() + block := types.NewBlock(lane, n, types.GenBlockHeaderHash(rng), types.GenPayload(rng)) + return types.Sign(key, types.NewLaneProposal(block)) +} + +func TestNewBlockPersisterEmptyDir(t *testing.T) { + dir := t.TempDir() + bp, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.NotNil(t, bp) + require.Equal(t, 0, len(blocks)) + // blocks/ subdirectory should exist + fi, err := os.Stat(filepath.Join(dir, "blocks")) + require.NoError(t, err) + require.True(t, fi.IsDir()) +} + +func TestPersistBlockAndLoad(t *testing.T) { + rng := 
utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + b0 := testSignedProposal(rng, key, 0) + b1 := testSignedProposal(rng, key, 1) + require.NoError(t, bp.PersistBlock(lane, 0, b0)) + require.NoError(t, bp.PersistBlock(lane, 1, b1)) + + // Reload from disk + bp2, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.NotNil(t, bp2) + require.Equal(t, 1, len(blocks), "should have 1 lane") + require.Equal(t, 2, len(blocks[lane]), "should have 2 blocks") + require.NoError(t, utils.TestDiff(b0, blocks[lane][0])) + require.NoError(t, utils.TestDiff(b1, blocks[lane][1])) +} + +func TestPersistBlockMultipleLanes(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + b1 := testSignedProposal(rng, key1, 0) + b2 := testSignedProposal(rng, key2, 0) + require.NoError(t, bp.PersistBlock(lane1, 0, b1)) + require.NoError(t, bp.PersistBlock(lane2, 0, b2)) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 2, len(blocks), "should have 2 lanes") + require.NoError(t, utils.TestDiff(b1, blocks[lane1][0])) + require.NoError(t, utils.TestDiff(b2, blocks[lane2][0])) +} + +func TestLoadSkipsCorruptBlockFile(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Write a good block + b0 := testSignedProposal(rng, key, 0) + require.NoError(t, bp.PersistBlock(lane, 0, b0)) + + // Write a corrupt file with a valid filename + corruptName := blockFilename(lane, 1) + require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + + // Reload — should load b0 and 
skip the corrupt one + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane]), "should only load the valid block") + require.NoError(t, utils.TestDiff(b0, blocks[lane][0])) +} + +func TestLoadSkipsMismatchedHeader(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Write block for lane1 but save it under lane2's filename + b := testSignedProposal(rng, key1, 5) + require.NoError(t, bp.PersistBlock(lane1, 5, b)) + + // Rename the file to use lane2 in the filename + oldPath := filepath.Join(dir, "blocks", blockFilename(lane1, 5)) + newPath := filepath.Join(dir, "blocks", blockFilename(lane2, 5)) + require.NoError(t, os.Rename(oldPath, newPath)) + + // Reload — should skip the mismatched file + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 0, len(blocks), "mismatched header should be skipped") +} + +func TestLoadSkipsUnrecognizedFilename(t *testing.T) { + dir := t.TempDir() + + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + _ = bp + + // Write files with bad names + blocksDir := filepath.Join(dir, "blocks") + require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "notablock.pb"), []byte("data"), 0600)) + require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "readme.txt"), []byte("hi"), 0600)) + + // Reload — should skip both + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 0, len(blocks)) +} + +func TestDeleteBeforeRemovesOldKeepsNew(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Persist blocks 0..4 + for i := types.BlockNumber(0); i < 5; i++ { + require.NoError(t, 
bp.PersistBlock(lane, i, testSignedProposal(rng, key, i))) + } + + // Delete blocks before 3 + bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 3}) + + // Reload and verify only 3, 4 remain + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 2, len(blocks[lane]), "should have blocks 3 and 4") + _, has0 := blocks[lane][0] + _, has1 := blocks[lane][1] + _, has2 := blocks[lane][2] + _, has3 := blocks[lane][3] + _, has4 := blocks[lane][4] + require.False(t, has0) + require.False(t, has1) + require.False(t, has2) + require.True(t, has3) + require.True(t, has4) +} + +func TestDeleteBeforeMultipleLanes(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Lane1: blocks 0,1,2; Lane2: blocks 0,1,2 + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(lane1, i, testSignedProposal(rng, key1, i))) + require.NoError(t, bp.PersistBlock(lane2, i, testSignedProposal(rng, key2, i))) + } + + // Delete lane1 < 2, lane2 < 1 + bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 2, lane2: 1}) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane1]), "lane1 should have block 2") + require.Equal(t, 2, len(blocks[lane2]), "lane2 should have blocks 1,2") +} + +func TestDeleteBeforeEmptyMap(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + require.NoError(t, bp.PersistBlock(lane, 0, testSignedProposal(rng, key, 0))) + + // Empty map — should not delete anything + bp.DeleteBefore(map[types.LaneID]types.BlockNumber{}) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane])) 
+} + +func TestBlockFilenameRoundTrip(t *testing.T) { + rng := utils.TestRng() + lane := types.GenSecretKey(rng).Public() + n := types.BlockNumber(42) + + name := blockFilename(lane, n) + parsedLane, parsedN, err := parseBlockFilename(name) + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(lane.Bytes()), hex.EncodeToString(parsedLane.Bytes())) + require.Equal(t, n, parsedN) +} diff --git a/sei-tendermint/internal/autobahn/consensus/persist.go b/sei-tendermint/internal/autobahn/consensus/persist/persist.go similarity index 87% rename from sei-tendermint/internal/autobahn/consensus/persist.go rename to sei-tendermint/internal/autobahn/consensus/persist/persist.go index 16e72c693c..9a178607b8 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/persist.go @@ -1,14 +1,5 @@ // Crash-safe A/B file persistence. // -// # stateDir Configuration -// -// At config level, stateDir is an Option[string]. NewState creates a persister when -// stateDir is Some(path). When None, no persister is created (persistence -// disabled — DANGEROUS, may lead to SLASHING if the node restarts and double-votes; -// only use for testing). When Some(path), the path must already exist and be -// writable (verified by writing a temp file at startup); returns error otherwise. -// TODO: surface the None warning in CLI --help (e.g. stream command or config docs). -// // # A/B File Strategy // // We use an A/B file pair (<prefix>_a.pb/<prefix>_b.pb) instead of the traditional @@ -40,7 +31,7 @@ // - Writes are synchronous (fsync after each write). // - Writes are idempotent, so retries on next state change are safe. // - Seq is only advanced after a successful write (rollback on failure). -package consensus +package persist import ( "errors" @@ -55,9 +46,10 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) +// A/B file suffixes. 
const ( - suffixA = "_a.pb" - suffixB = "_b.pb" + SuffixA = "_a.pb" + SuffixB = "_b.pb" ) // ErrNoData is returned by loadPersisted when no persisted files exist for the prefix. @@ -84,11 +76,11 @@ type persister struct { seq uint64 } -// newPersister creates a crash-safe persister for the given directory and prefix. +// NewPersister creates a crash-safe persister for the given directory and prefix. // dir must already exist and be a directory (we do not create it); returns error otherwise. -// Also returns the loaded data (None on fresh start) for the caller to pass to newInner. +// Also returns the loaded data (None on fresh start) for the caller to decode. // This encapsulates all on-disk format details (A/B files, seq wrapper) in one place. -func newPersister(dir string, prefix string) (*persister, utils.Option[[]byte], error) { +func NewPersister(dir string, prefix string) (Persister, utils.Option[[]byte], error) { none := utils.None[[]byte]() fi, err := os.Stat(dir) @@ -116,7 +108,7 @@ func newPersister(dir string, prefix string) (*persister, utils.Option[[]byte], // Ensure both A/B files exist and are writable so Persist never creates new // directory entries. Empty files are treated as non-existent by loadWrapped, // so they won't interfere with loading on restart. - for _, suffix := range []string{suffixA, suffixB} { + for _, suffix := range []string{SuffixA, SuffixB} { path := filepath.Join(dir, prefix+suffix) f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled if err != nil { @@ -153,9 +145,9 @@ func (w *persister) Persist(data []byte) error { seq := w.seq + 1 // Odd seq → A, even seq → B. 
- suffix := suffixB + suffix := SuffixB if seq%2 == 1 { - suffix = suffixA + suffix = SuffixA } filename := w.prefix + suffix @@ -168,7 +160,7 @@ func (w *persister) Persist(data []byte) error { return fmt.Errorf("marshal wrapper: %w", err) } - if err := writeAndSync(w.dir, filename, bz); err != nil { + if err := WriteAndSync(filepath.Join(w.dir, filename), bz); err != nil { return fmt.Errorf("persist to %s: %w", filename, err) } w.seq = seq @@ -192,7 +184,7 @@ func loadWrapped(stateDir, filename string) (*pb.PersistedWrapper, error) { return nil, fmt.Errorf("read %s: %w", filename, err) } // Treat empty files as non-existent. A valid wrapper must contain at least - // a seq number. Empty files are created by newPersister to pre-populate + // a seq number. Empty files are created by NewPersister to pre-populate // directory entries so that Persist never needs to dir-sync. if len(bz) == 0 { return nil, os.ErrNotExist @@ -209,7 +201,7 @@ func loadWrapped(stateDir, filename string) (*pb.PersistedWrapper, error) { // so the validator can restart. Returns ErrNoData when no persisted files exist (use errors.Is). // Returns other error only when both files fail to load or state is inconsistent (same seq). func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) { - fileA, fileB := prefix+suffixA, prefix+suffixB + fileA, fileB := prefix+SuffixA, prefix+SuffixB wrapperA, errA := loadWrapped(dir, fileA) wrapperB, errB := loadWrapped(dir, fileB) @@ -254,10 +246,9 @@ func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) { } } -// writeAndSync writes data to file and fsyncs. No dir sync needed because -// newPersister pre-creates both A/B files at startup. -func writeAndSync(stateDir string, filename string, data []byte) error { - path := filepath.Join(stateDir, filename) +// WriteAndSync writes data to a file path and fsyncs. No dir sync is needed for A/B files because +// NewPersister pre-creates both at startup; newly created files (e.g. via PersistBlock) are not covered. 
+func WriteAndSync(path string, data []byte) error { f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled if err != nil { return err diff --git a/sei-tendermint/internal/autobahn/consensus/persist_test.go b/sei-tendermint/internal/autobahn/consensus/persist/persist_test.go similarity index 75% rename from sei-tendermint/internal/autobahn/consensus/persist_test.go rename to sei-tendermint/internal/autobahn/consensus/persist/persist_test.go index 1e34bc0acb..25b558188c 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist_test.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/persist_test.go @@ -1,29 +1,24 @@ -package consensus +package persist import ( - "context" "errors" "os" "path/filepath" "runtime" "testing" - "time" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" ) func TestPersisterAlternates(t *testing.T) { dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) - // Both files should be pre-created (empty) by newPersister for dir-sync optimization. - _, errA := os.Stat(filepath.Join(dir, "test"+suffixA)) - _, errB := os.Stat(filepath.Join(dir, "test"+suffixB)) + // Both files should be pre-created (empty) by NewPersister for dir-sync optimization. 
+ _, errA := os.Stat(filepath.Join(dir, "test"+SuffixA)) + _, errB := os.Stat(filepath.Join(dir, "test"+SuffixB)) require.NoError(t, errA, "A should be pre-created") require.NoError(t, errB, "B should be pre-created") @@ -45,7 +40,7 @@ func TestPersisterAlternates(t *testing.T) { func TestPersisterPicksHigherSeq(t *testing.T) { dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) // Write three times: A(seq=1), B(seq=2), A(seq=3) @@ -63,13 +58,13 @@ func TestLoadPersistedOneCorruptFileSucceeds(t *testing.T) { dir := t.TempDir() // Write to both files: A(seq=1), B(seq=2) - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("first"))) // seq=1, A require.NoError(t, w.Persist([]byte("second"))) // seq=2, B // Corrupt B (the winner) — should fall back to A - err = os.WriteFile(filepath.Join(dir, "test"+suffixB), []byte("corrupt"), 0600) + err = os.WriteFile(filepath.Join(dir, "test"+SuffixB), []byte("corrupt"), 0600) require.NoError(t, err) wrapper, err := loadPersisted(dir, "test") @@ -81,12 +76,12 @@ func TestLoadPersistedBothCorruptError(t *testing.T) { dir := t.TempDir() // Write valid wrapped data to A only - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("valid"))) // seq=1, A // Corrupt A (B is empty, treated as non-existent) — both files fail - err = os.WriteFile(filepath.Join(dir, "test"+suffixA), []byte("corrupt"), 0600) + err = os.WriteFile(filepath.Join(dir, "test"+SuffixA), []byte("corrupt"), 0600) require.NoError(t, err) _, err = loadPersisted(dir, "test") @@ -98,16 +93,16 @@ func TestNewPersisterOneCorruptFileSucceeds(t *testing.T) { dir := t.TempDir() // Write to both files: A(seq=1), B(seq=2) - w1, _, err := newPersister(dir, "test") + w1, _, err := NewPersister(dir, "test") require.NoError(t, err) 
require.NoError(t, w1.Persist([]byte("first"))) // seq=1, A require.NoError(t, w1.Persist([]byte("second"))) // seq=2, B - // Corrupt B (the winner) — newPersister should still succeed using A - err = os.WriteFile(filepath.Join(dir, "test"+suffixB), []byte("corrupt"), 0600) + // Corrupt B (the winner) — NewPersister should still succeed using A + err = os.WriteFile(filepath.Join(dir, "test"+SuffixB), []byte("corrupt"), 0600) require.NoError(t, err) - w2, _, err := newPersister(dir, "test") + w2, _, err := NewPersister(dir, "test") require.NoError(t, err) // A won (seq=1), so next write goes to B (the corrupt/loser slot) @@ -128,7 +123,7 @@ func TestLoadPersistedEmptyDir(t *testing.T) { func TestNewPersisterInvalidDirError(t *testing.T) { // State dir must already exist; invalid (nonexistent or not a directory) returns error - _, _, err := newPersister("/nonexistent/path/that/does/not/exist", "test") + _, _, err := NewPersister("/nonexistent/path/that/does/not/exist", "test") require.Error(t, err) require.Contains(t, err.Error(), "invalid state dir") } @@ -140,12 +135,12 @@ func TestPersistWriteErrorReturnsError(t *testing.T) { t.Skip("chmod 000 on directory not reliable on Windows") } dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("data1"))) // Remove all permissions from dir so OpenFile fails with EACCES require.NoError(t, os.Chmod(dir, 0000)) - defer os.Chmod(dir, 0700) + defer os.Chmod(dir, 0700) //nolint:errcheck err = w.Persist([]byte("data2")) require.Error(t, err) } @@ -158,7 +153,7 @@ func TestPersistWriteErrorDoesNotAdvanceSeq(t *testing.T) { t.Skip("chmod 000 on directory not reliable on Windows") } dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) // Successful write: seq=1 → A @@ -191,15 +186,15 @@ func TestLoadPersistedOSErrorPropagates(t *testing.T) { dir := 
t.TempDir() // Write to both files: A(seq=1), B(seq=2) - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("first"))) // seq=1, A require.NoError(t, w.Persist([]byte("second"))) // seq=2, B // Make B unreadable (OS error, not corrupt data) - pathB := filepath.Join(dir, "test"+suffixB) + pathB := filepath.Join(dir, "test"+SuffixB) require.NoError(t, os.Chmod(pathB, 0000)) - defer os.Chmod(pathB, 0600) + defer os.Chmod(pathB, 0600) //nolint:errcheck // loadPersisted should fail — not silently fall back to A _, err = loadPersisted(dir, "test") @@ -213,13 +208,13 @@ func TestLoadPersistedCorruptFallsBack(t *testing.T) { // fall back to the other file — this is the crash recovery path. dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("first"))) // seq=1, A require.NoError(t, w.Persist([]byte("second"))) // seq=2, B // Corrupt B (simulates crash mid-write) - err = os.WriteFile(filepath.Join(dir, "test"+suffixB), []byte("garbage"), 0600) + err = os.WriteFile(filepath.Join(dir, "test"+SuffixB), []byte("garbage"), 0600) require.NoError(t, err) // Should succeed using A, since B's error is ErrCorrupt (tolerable) @@ -232,7 +227,7 @@ func TestNewPersisterResumeSeq(t *testing.T) { dir := t.TempDir() // Create persister and write some data - w1, _, err := newPersister(dir, "test") + w1, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w1.Persist([]byte("data1"))) // seq=1, A require.NoError(t, w1.Persist([]byte("data2"))) // seq=2, B @@ -240,7 +235,7 @@ func TestNewPersisterResumeSeq(t *testing.T) { // Create new persister (simulates restart) // A has seq=3 (winner), so new persister should write to B first to preserve A - w2, _, err := newPersister(dir, "test") + w2, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, 
w2.Persist([]byte("data4"))) // seq=4, B (preserves A) @@ -254,13 +249,13 @@ func TestNewPersisterPreservesWinner(t *testing.T) { dir := t.TempDir() // Write to both files: A=seq1, B=seq2 (B wins) - w1, _, err := newPersister(dir, "test") + w1, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w1.Persist([]byte("old"))) // seq=1, A require.NoError(t, w1.Persist([]byte("winner"))) // seq=2, B (winner) // New persister should write to A first (preserve B) - w2, _, err := newPersister(dir, "test") + w2, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w2.Persist([]byte("new"))) // seq=3, A (preserves B) @@ -269,34 +264,3 @@ func TestNewPersisterPreservesWinner(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte("new"), wrapper.GetData()) } - -// failPersister is a Persister that always returns an error. -type failPersister struct{ err error } - -func (f failPersister) Persist([]byte) error { return f.err } - -func TestRunOutputsPersistErrorPropagates(t *testing.T) { - // Verify that a persist error in runOutputs propagates - // and terminates the consensus component (instead of panicking). - rng := utils.TestRng() - committee, keys := types.GenCommittee(rng, 4) - ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) - - cs, err := NewState(&Config{ - Key: keys[0], - ViewTimeout: func(types.View) time.Duration { return time.Hour }, - }, ds) - require.NoError(t, err) - - // Inject a persister that always fails. - wantErr := errors.New("disk on fire") - cs.persister = utils.Some[Persister](failPersister{err: wantErr}) - - // runOutputs should fail on the first Iter callback when it tries to persist. 
- ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) - defer cancel() - err = cs.runOutputs(ctx) - require.Error(t, err) - require.Contains(t, err.Error(), "persist inner") - require.ErrorIs(t, err, wantErr) -} diff --git a/sei-tendermint/internal/autobahn/consensus/state.go b/sei-tendermint/internal/autobahn/consensus/state.go index a2e0930d8f..26290f07e9 100644 --- a/sei-tendermint/internal/autobahn/consensus/state.go +++ b/sei-tendermint/internal/autobahn/consensus/state.go @@ -8,6 +8,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/avail" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -44,7 +45,7 @@ type State struct { innerRecv utils.AtomicRecv[inner] // persister writes inner's persistedInner to disk when PersistentStateDir is set; None when disabled. - persister utils.Option[Persister] + persister utils.Option[persist.Persister] timeoutVotes utils.Mutex[*timeoutVotes] prepareVotes utils.Mutex[*prepareVotes] @@ -79,14 +80,14 @@ func (s *State) SubscribeTimeoutQC() utils.AtomicRecv[utils.Option[*types.Timeou func NewState(cfg *Config, data *data.State) (*State, error) { // Create persister first so newInner can receive the loaded data // instead of reading the files directly. 
- var pers utils.Option[Persister] + var pers utils.Option[persist.Persister] var persistedData utils.Option[[]byte] if dir, ok := cfg.PersistentStateDir.Get(); ok { - p, d, err := newPersister(dir, innerFile) + p, d, err := persist.NewPersister(dir, innerFile) if err != nil { - return nil, fmt.Errorf("newPersister: %w", err) + return nil, fmt.Errorf("NewPersister: %w", err) } - pers = utils.Some[Persister](p) + pers = utils.Some[persist.Persister](p) persistedData = d } @@ -95,11 +96,16 @@ func NewState(cfg *Config, data *data.State) (*State, error) { return nil, fmt.Errorf("newInner: %w", err) } + availState, err := avail.NewState(cfg.Key, data, cfg.PersistentStateDir) + if err != nil { + return nil, fmt.Errorf("avail.NewState: %w", err) + } + innerSend := utils.Alloc(utils.NewAtomicSend(initialInner)) s := &State{ cfg: cfg, // metrics: NewMetrics(), - avail: avail.NewState(cfg.Key, data), + avail: availState, inner: utils.NewMutex(innerSend), innerRecv: innerSend.Subscribe(), persister: pers, From 70713842ab5d19227be8aee010f68c217eb204fa Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 17 Feb 2026 17:50:27 -0800 Subject: [PATCH 2/9] Refactor: extract loadPersistedState from NewState Move persisted data loading (AppQC deserialization and block loading) into a dedicated function for readability. Co-authored-by: Cursor --- .../internal/autobahn/avail/state.go | 63 ++++++++++--------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index 52986566d0..1c3566dbad 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -51,6 +51,34 @@ type State struct { // innerFile is the A/B file prefix for avail inner state persistence. const innerFile = "avail_inner" +// loadPersistedState loads persisted avail state from disk and creates persisters for ongoing writes. 
+func loadPersistedState(dir string) (*loadedAvailState, persist.Persister, *persist.BlockPersister, error) { + persister, persistedData, err := persist.NewPersister(dir, innerFile) + if err != nil { + return nil, nil, nil, fmt.Errorf("NewPersister %s: %w", innerFile, err) + } + + var appQC utils.Option[*types.AppQC] + if bz, ok := persistedData.Get(); ok { + qc, err := types.AppQCConv.Unmarshal(bz) + if err != nil { + return nil, nil, nil, fmt.Errorf("unmarshal persisted AppQC: %w", err) + } + log.Info(). + Uint64("roadIndex", uint64(qc.Proposal().RoadIndex())). + Uint64("globalNumber", uint64(qc.Proposal().GlobalNumber())). + Msg("loaded persisted AppQC") + appQC = utils.Some(qc) + } + + bp, blocks, err := persist.NewBlockPersister(dir) + if err != nil { + return nil, nil, nil, fmt.Errorf("NewBlockPersister: %w", err) + } + + return &loadedAvailState{appQC: appQC, blocks: blocks}, persister, bp, nil +} + // NewState constructs a new availability state. // stateDir is None when persistence is disabled (testing only). func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[string]) (*State, error) { @@ -59,38 +87,13 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin var bp *persist.BlockPersister if dir, ok := stateDir.Get(); ok { - // Create A/B persister for inner state. - persister, persistedData, err := persist.NewPersister(dir, innerFile) - if err != nil { - return nil, fmt.Errorf("NewPersister %s: %w", innerFile, err) - } - p = utils.Some[persist.Persister](persister) - - // Decode persisted AppQC. - var appQC utils.Option[*types.AppQC] - if bz, ok := persistedData.Get(); ok { - qc, err := types.AppQCConv.Unmarshal(bz) - if err != nil { - return nil, fmt.Errorf("unmarshal persisted AppQC: %w", err) - } - log.Info(). - Uint64("roadIndex", uint64(qc.Proposal().RoadIndex())). - Uint64("globalNumber", uint64(qc.Proposal().GlobalNumber())). 
- Msg("loaded persisted AppQC") - appQC = utils.Some(qc) - } - - // Load persisted blocks from the blocks/ subdirectory. - var blocks map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal] - bp, blocks, err = persist.NewBlockPersister(dir) + var persister persist.Persister + var err error + loaded, persister, bp, err = loadPersistedState(dir) if err != nil { - return nil, fmt.Errorf("NewBlockPersister: %w", err) - } - - loaded = &loadedAvailState{ - appQC: appQC, - blocks: blocks, + return nil, err } + p = utils.Some(persister) } return &State{ From 5444e65b9bbd29f8cca605b73a201745bd29b9fc Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 17 Feb 2026 20:18:59 -0800 Subject: [PATCH 3/9] Refactor: move block sorting and gap handling into persist layer Move block sorting, contiguous-prefix extraction, and gap truncation from avail/inner.go into persist/blocks.go so all disk-recovery logic lives in one place. This isolates storage concerns in the persistence layer, simplifying newInner and preparing for a future storage backend swap. 
Co-authored-by: Cursor --- .../internal/autobahn/avail/inner.go | 38 +++----- .../internal/autobahn/avail/inner_test.go | 90 +++++++------------ .../internal/autobahn/avail/state.go | 1 - .../autobahn/consensus/persist/blocks.go | 43 +++++++-- .../autobahn/consensus/persist/blocks_test.go | 78 ++++++++++++---- 5 files changed, 139 insertions(+), 111 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index 47f3a4e027..63a48e588f 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -3,6 +3,7 @@ package avail import ( "fmt" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) @@ -18,9 +19,10 @@ type inner struct { // loadedAvailState holds data loaded from disk on restart. // nil means fresh start (no persisted data). +// blocks are sorted by number and contiguous (gaps already resolved by loader). type loadedAvailState struct { appQC utils.Option[*types.AppQC] - blocks map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal] + blocks map[types.LaneID][]persist.LoadedBlock } func newInner(c *types.Committee, loaded *loadedAvailState) *inner { @@ -58,39 +60,21 @@ func newInner(c *types.Committee, loaded *loadedAvailState) *inner { // Restore persisted blocks into their lane queues. for lane, bs := range loaded.blocks { q, ok := i.blocks[lane] - if !ok { - continue // skip blocks for unknown lanes - } - if len(bs) == 0 { + if !ok || len(bs) == 0 { continue } - // Find the minimum block number. - first := true - var minN types.BlockNumber - for n := range bs { - if first || n < minN { - minN = n - } - first = false - } - // Load contiguous blocks starting from minN. Stop at the first gap - // (e.g. a corrupt file that was skipped during load). 
Blocks after - // the gap will be re-fetched from peers. - q.first = minN - q.next = minN - for { - b, ok := bs[q.next] - if !ok { - break - } - q.q[q.next] = b + first := bs[0].Number + q.first = first + q.next = first + for _, b := range bs { + q.q[q.next] = b.Proposal q.next++ } // Advance the votes queue to match so headers() returns ErrPruned // for already-committed blocks instead of blocking forever. vq := i.votes[lane] - vq.first = minN - vq.next = minN + vq.first = first + vq.next = first } return i diff --git a/sei-tendermint/internal/autobahn/avail/inner_test.go b/sei-tendermint/internal/autobahn/avail/inner_test.go index 5272d233f7..e6bf2455de 100644 --- a/sei-tendermint/internal/autobahn/avail/inner_test.go +++ b/sei-tendermint/internal/autobahn/avail/inner_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -97,8 +98,7 @@ func TestNewInnerLoadedAppQCAdvancesQueues(t *testing.T) { appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) loaded := &loadedAvailState{ - appQC: utils.Some[*types.AppQC](appQC), - blocks: nil, + appQC: utils.Some[*types.AppQC](appQC), } i := newInner(committee, loaded) @@ -123,8 +123,7 @@ func TestNewInnerLoadedAppQCNone(t *testing.T) { committee, _ := types.GenCommittee(rng, 4) loaded := &loadedAvailState{ - appQC: utils.None[*types.AppQC](), - blocks: nil, + appQC: utils.None[*types.AppQC](), } i := newInner(committee, loaded) @@ -142,16 +141,16 @@ func TestNewInnerLoadedBlocksContiguous(t *testing.T) { // Build 3 contiguous blocks: 5, 6, 7. 
var parent types.BlockHeaderHash - bs := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + var bs []persist.LoadedBlock for n := types.BlockNumber(5); n < 8; n++ { b := testSignedBlock(keys[0], lane, n, parent, rng) parent = b.Msg().Block().Header().Hash() - bs[n] = b + bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b}) } loaded := &loadedAvailState{ appQC: utils.None[*types.AppQC](), - blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{lane: bs}, + blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } i := newInner(committee, loaded) @@ -159,8 +158,8 @@ func TestNewInnerLoadedBlocksContiguous(t *testing.T) { q := i.blocks[lane] require.Equal(t, types.BlockNumber(5), q.first) require.Equal(t, types.BlockNumber(8), q.next) - for n := types.BlockNumber(5); n < 8; n++ { - require.Equal(t, bs[n], q.q[n]) + for j, b := range bs { + require.Equal(t, b.Proposal, q.q[types.BlockNumber(5)+types.BlockNumber(j)]) } // Votes queue should be aligned. @@ -169,52 +168,46 @@ func TestNewInnerLoadedBlocksContiguous(t *testing.T) { require.Equal(t, types.BlockNumber(5), vq.next) } -func TestNewInnerLoadedBlocksWithGap(t *testing.T) { +func TestNewInnerLoadedBlocksContiguousPrefix(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) lane := keys[0].Public() - // Blocks 3, 4, 6 (gap at 5). + // Loader already resolved the gap: only contiguous prefix [3, 4] is passed. 
var parent types.BlockHeaderHash - bs := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} - for _, n := range []types.BlockNumber{3, 4, 6} { + var bs []persist.LoadedBlock + for _, n := range []types.BlockNumber{3, 4} { b := testSignedBlock(keys[0], lane, n, parent, rng) parent = b.Msg().Block().Header().Hash() - bs[n] = b + bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b}) } loaded := &loadedAvailState{ appQC: utils.None[*types.AppQC](), - blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{lane: bs}, + blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } i := newInner(committee, loaded) - // Should load only 3, 4 (stop at gap before 5). q := i.blocks[lane] require.Equal(t, types.BlockNumber(3), q.first) require.Equal(t, types.BlockNumber(5), q.next) - require.Equal(t, bs[3], q.q[types.BlockNumber(3)]) - require.Equal(t, bs[4], q.q[types.BlockNumber(4)]) - _, has6 := q.q[types.BlockNumber(6)] - require.False(t, has6, "block 6 should not be loaded (after gap)") + require.Equal(t, bs[0].Proposal, q.q[types.BlockNumber(3)]) + require.Equal(t, bs[1].Proposal, q.q[types.BlockNumber(4)]) } -func TestNewInnerLoadedBlocksEmptyMap(t *testing.T) { +func TestNewInnerLoadedBlocksEmptySlice(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) lane := keys[0].Public() loaded := &loadedAvailState{ - appQC: utils.None[*types.AppQC](), - blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ - lane: {}, - }, + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID][]persist.LoadedBlock{lane: {}}, } i := newInner(committee, loaded) - // Empty block map should leave queue at 0. 
q := i.blocks[lane] require.Equal(t, types.BlockNumber(0), q.first) require.Equal(t, types.BlockNumber(0), q.next) @@ -224,27 +217,23 @@ func TestNewInnerLoadedBlocksUnknownLane(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) - // Create a lane that doesn't belong to the committee. unknownKey := types.GenSecretKey(rng) unknownLane := unknownKey.Public() b := testSignedBlock(unknownKey, unknownLane, 0, types.BlockHeaderHash{}, rng) loaded := &loadedAvailState{ - appQC: utils.None[*types.AppQC](), - blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ - unknownLane: {0: b}, - }, + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID][]persist.LoadedBlock{unknownLane: {{Number: 0, Proposal: b}}}, } i := newInner(committee, loaded) - // Unknown lane should be silently skipped; known lanes unaffected. for _, lane := range committee.Lanes().All() { q := i.blocks[lane] require.Equal(t, types.BlockNumber(0), q.first) require.Equal(t, types.BlockNumber(0), q.next) } - _ = keys // suppress unused + _ = keys } func TestNewInnerLoadedAppQCAndBlocks(t *testing.T) { @@ -257,39 +246,32 @@ func TestNewInnerLoadedAppQCAndBlocks(t *testing.T) { appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) - // Build 2 contiguous blocks: 7, 8. 
var parent types.BlockHeaderHash - bs := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + var bs []persist.LoadedBlock for n := types.BlockNumber(7); n < 9; n++ { b := testSignedBlock(keys[0], lane, n, parent, rng) parent = b.Msg().Block().Header().Hash() - bs[n] = b + bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b}) } loaded := &loadedAvailState{ - appQC: utils.Some[*types.AppQC](appQC), - blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ - lane: bs, - }, + appQC: utils.Some[*types.AppQC](appQC), + blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } i := newInner(committee, loaded) - // AppQC should be restored. aq, ok := i.latestAppQC.Get() require.True(t, ok) require.Equal(t, roadIdx, aq.Proposal().RoadIndex()) - // Queues advanced by AppQC. require.Equal(t, roadIdx+1, i.commitQCs.first) require.Equal(t, globalNum+1, i.appVotes.first) - // Blocks restored. q := i.blocks[lane] require.Equal(t, types.BlockNumber(7), q.first) require.Equal(t, types.BlockNumber(9), q.next) - // Votes aligned. vq := i.votes[lane] require.Equal(t, types.BlockNumber(7), vq.first) require.Equal(t, types.BlockNumber(7), vq.next) @@ -301,45 +283,37 @@ func TestNewInnerLoadedBlocksMultipleLanes(t *testing.T) { lane0 := keys[0].Public() lane1 := keys[1].Public() - // Lane 0: blocks 2, 3. - bs0 := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} var parent0 types.BlockHeaderHash + var bs0 []persist.LoadedBlock for n := types.BlockNumber(2); n < 4; n++ { b := testSignedBlock(keys[0], lane0, n, parent0, rng) parent0 = b.Msg().Block().Header().Hash() - bs0[n] = b + bs0 = append(bs0, persist.LoadedBlock{Number: n, Proposal: b}) } - // Lane 1: blocks 0, 1, 2. 
- bs1 := map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} var parent1 types.BlockHeaderHash + var bs1 []persist.LoadedBlock for n := types.BlockNumber(0); n < 3; n++ { b := testSignedBlock(keys[1], lane1, n, parent1, rng) parent1 = b.Msg().Block().Header().Hash() - bs1[n] = b + bs1 = append(bs1, persist.LoadedBlock{Number: n, Proposal: b}) } loaded := &loadedAvailState{ - appQC: utils.None[*types.AppQC](), - blocks: map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{ - lane0: bs0, - lane1: bs1, - }, + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID][]persist.LoadedBlock{lane0: bs0, lane1: bs1}, } i := newInner(committee, loaded) - // Lane 0. q0 := i.blocks[lane0] require.Equal(t, types.BlockNumber(2), q0.first) require.Equal(t, types.BlockNumber(4), q0.next) - // Lane 1. q1 := i.blocks[lane1] require.Equal(t, types.BlockNumber(0), q1.first) require.Equal(t, types.BlockNumber(3), q1.next) - // Votes aligned per lane. require.Equal(t, types.BlockNumber(2), i.votes[lane0].first) require.Equal(t, types.BlockNumber(0), i.votes[lane1].first) } diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index 1c3566dbad..5e7e51f60d 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go index 94d4317639..c90b9edbe4 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go @@ -8,8 +8,10 @@ package persist import ( "encoding/hex" "fmt" + "maps" "os" "path/filepath" + "slices" "strconv" "strings" @@ -19,6 +21,12 @@ import ( 
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" ) +// LoadedBlock is a block loaded from disk during state restoration. +type LoadedBlock struct { + Number types.BlockNumber + Proposal *types.Signed[*types.LaneProposal] +} + // BlockPersister manages individual block files in a blocks/ subdirectory. // Each block is stored as _.pb. type BlockPersister struct { @@ -26,8 +34,10 @@ type BlockPersister struct { } // NewBlockPersister creates the blocks/ subdirectory if it doesn't exist and -// returns a block persister. Loads all persisted blocks from disk. -func NewBlockPersister(stateDir string) (*BlockPersister, map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal], error) { +// returns a block persister. Loads all persisted blocks from disk as sorted, +// contiguous slices per lane. Gaps from corrupt or missing files are resolved +// by truncating at the first gap; blocks after the gap will be re-fetched. +func NewBlockPersister(stateDir string) (*BlockPersister, map[types.LaneID][]LoadedBlock, error) { dir := filepath.Join(stateDir, "blocks") if err := os.MkdirAll(dir, 0700); err != nil { return nil, nil, fmt.Errorf("create blocks dir %s: %w", dir, err) @@ -109,13 +119,14 @@ func (bp *BlockPersister) DeleteBefore(laneFirsts map[types.LaneID]types.BlockNu } // loadAll loads all persisted blocks from the blocks/ directory. -func (bp *BlockPersister) loadAll() (map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal], error) { +// Returns sorted, contiguous slices per lane (truncated at the first gap). 
+func (bp *BlockPersister) loadAll() (map[types.LaneID][]LoadedBlock, error) { entries, err := os.ReadDir(bp.dir) if err != nil { return nil, fmt.Errorf("read blocks dir %s: %w", bp.dir, err) } - result := map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + raw := map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} for _, entry := range entries { if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { continue @@ -144,12 +155,30 @@ func (bp *BlockPersister) loadAll() (map[types.LaneID]map[types.BlockNumber]*typ Msg("skipping block file with mismatched header") continue } - if result[lane] == nil { - result[lane] = map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + if raw[lane] == nil { + raw[lane] = map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} } - result[lane][n] = proposal + raw[lane][n] = proposal log.Info().Str("lane", lane.String()).Uint64("block", uint64(n)).Msg("loaded persisted block") } + + result := map[types.LaneID][]LoadedBlock{} + for lane, bs := range raw { + sorted := slices.Sorted(maps.Keys(bs)) + var contiguous []LoadedBlock + for i, n := range sorted { + if i > 0 && n != sorted[i-1]+1 { + log.Warn(). + Str("lane", lane.String()). + Uint64("gapAt", uint64(sorted[i-1]+1)). + Int("skipped", len(sorted)-i). 
+ Msg("truncating loaded blocks at gap; remaining will be re-fetched") + break + } + contiguous = append(contiguous, LoadedBlock{Number: n, Proposal: bs[n]}) + } + result[lane] = contiguous + } return result, nil } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go index 5d80015a72..1cb58415de 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go @@ -43,14 +43,15 @@ func TestPersistBlockAndLoad(t *testing.T) { require.NoError(t, bp.PersistBlock(lane, 0, b0)) require.NoError(t, bp.PersistBlock(lane, 1, b1)) - // Reload from disk bp2, blocks, err := NewBlockPersister(dir) require.NoError(t, err) require.NotNil(t, bp2) require.Equal(t, 1, len(blocks), "should have 1 lane") require.Equal(t, 2, len(blocks[lane]), "should have 2 blocks") - require.NoError(t, utils.TestDiff(b0, blocks[lane][0])) - require.NoError(t, utils.TestDiff(b1, blocks[lane][1])) + require.Equal(t, types.BlockNumber(0), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(1), blocks[lane][1].Number) + require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) + require.NoError(t, utils.TestDiff(b1, blocks[lane][1].Proposal)) } func TestPersistBlockMultipleLanes(t *testing.T) { @@ -72,8 +73,10 @@ func TestPersistBlockMultipleLanes(t *testing.T) { _, blocks, err := NewBlockPersister(dir) require.NoError(t, err) require.Equal(t, 2, len(blocks), "should have 2 lanes") - require.NoError(t, utils.TestDiff(b1, blocks[lane1][0])) - require.NoError(t, utils.TestDiff(b2, blocks[lane2][0])) + require.Equal(t, 1, len(blocks[lane1])) + require.Equal(t, 1, len(blocks[lane2])) + require.NoError(t, utils.TestDiff(b1, blocks[lane1][0].Proposal)) + require.NoError(t, utils.TestDiff(b2, blocks[lane2][0].Proposal)) } func TestLoadSkipsCorruptBlockFile(t *testing.T) { @@ -93,11 +96,56 @@ func 
TestLoadSkipsCorruptBlockFile(t *testing.T) { corruptName := blockFilename(lane, 1) require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) - // Reload — should load b0 and skip the corrupt one _, blocks, err := NewBlockPersister(dir) require.NoError(t, err) require.Equal(t, 1, len(blocks[lane]), "should only load the valid block") - require.NoError(t, utils.TestDiff(b0, blocks[lane][0])) + require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) +} + +func TestLoadCorruptMidSequenceTruncatesAtGap(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Persist blocks 0, 2 (valid) and corrupt block 1. + // After skipping corrupt-1, raw has {0, 2} → gap at 1 → contiguous prefix [0]. + b0 := testSignedProposal(rng, key, 0) + b2 := testSignedProposal(rng, key, 2) + require.NoError(t, bp.PersistBlock(lane, 0, b0)) + require.NoError(t, bp.PersistBlock(lane, 2, b2)) + corruptName := blockFilename(lane, 1) + require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane]), "corrupt mid-sequence creates gap; only block 0 survives") + require.Equal(t, types.BlockNumber(0), blocks[lane][0].Number) + require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) +} + +func TestLoadTruncatesAtGap(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Persist blocks 3, 4, 6, 7 (gap at 5). 
+ for _, n := range []types.BlockNumber{3, 4, 6, 7} { + require.NoError(t, bp.PersistBlock(lane, n, testSignedProposal(rng, key, n))) + } + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 2, len(blocks[lane]), "should have contiguous prefix [3, 4]") + require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) } func TestLoadSkipsMismatchedHeader(t *testing.T) { @@ -161,20 +209,11 @@ func TestDeleteBeforeRemovesOldKeepsNew(t *testing.T) { // Delete blocks before 3 bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 3}) - // Reload and verify only 3, 4 remain _, blocks, err := NewBlockPersister(dir) require.NoError(t, err) require.Equal(t, 2, len(blocks[lane]), "should have blocks 3 and 4") - _, has0 := blocks[lane][0] - _, has1 := blocks[lane][1] - _, has2 := blocks[lane][2] - _, has3 := blocks[lane][3] - _, has4 := blocks[lane][4] - require.False(t, has0) - require.False(t, has1) - require.False(t, has2) - require.True(t, has3) - require.True(t, has4) + require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) } func TestDeleteBeforeMultipleLanes(t *testing.T) { @@ -200,7 +239,10 @@ func TestDeleteBeforeMultipleLanes(t *testing.T) { _, blocks, err := NewBlockPersister(dir) require.NoError(t, err) require.Equal(t, 1, len(blocks[lane1]), "lane1 should have block 2") + require.Equal(t, types.BlockNumber(2), blocks[lane1][0].Number) require.Equal(t, 2, len(blocks[lane2]), "lane2 should have blocks 1,2") + require.Equal(t, types.BlockNumber(1), blocks[lane2][0].Number) + require.Equal(t, types.BlockNumber(2), blocks[lane2][1].Number) } func TestDeleteBeforeEmptyMap(t *testing.T) { From 91078d8aee6cad4a75ac550193f565bd815960be Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 17 Feb 2026 21:15:14 -0800 Subject: [PATCH 4/9] Fix goimports formatting in avail/state.go Co-authored-by: Cursor --- 
sei-tendermint/internal/autobahn/avail/state.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index 5e7e51f60d..5598164615 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -4,15 +4,15 @@ import ( "context" "errors" "fmt" + "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" - - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" ) // ErrBadLane . From 284d29ba1bedac6b5f9129afaaee1b0ad035245e Mon Sep 17 00:00:00 2001 From: Wen Date: Wed, 18 Feb 2026 13:05:33 -0800 Subject: [PATCH 5/9] Async block persistence: move fsync off the critical path PushBlock and ProduceBlock now add blocks to the in-memory queue immediately and send a persist job to a background goroutine via a buffered channel. The background writer fsyncs each block to disk and advances a per-lane blockPersisted cursor under the inner lock. RecvBatch gates on this cursor so votes are only signed for blocks that have been durably written to disk. When persistence is disabled (testing), the cursor is nil and RecvBatch falls back to bq.next. 
Co-authored-by: Cursor --- .../internal/autobahn/avail/inner.go | 18 ++++- .../internal/autobahn/avail/inner_test.go | 18 ++--- .../internal/autobahn/avail/state.go | 79 +++++++++++++++---- .../internal/autobahn/avail/subscriptions.go | 8 +- 4 files changed, 94 insertions(+), 29 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index 63a48e588f..96c807ec3c 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -15,6 +15,10 @@ type inner struct { commitQCs *queue[types.RoadIndex, *types.CommitQC] blocks map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]] votes map[types.LaneID]*queue[types.BlockNumber, blockVotes] + // blockPersisted tracks per-lane how far block persistence has progressed. + // RecvBatch only yields blocks below this cursor for voting. + // nil when persistence is disabled (testing); RecvBatch then uses q.next. + blockPersisted map[types.LaneID]types.BlockNumber } // loadedAvailState holds data loaded from disk on restart. 
@@ -25,7 +29,7 @@ type loadedAvailState struct { blocks map[types.LaneID][]persist.LoadedBlock } -func newInner(c *types.Committee, loaded *loadedAvailState) *inner { +func newInner(c *types.Committee, loaded *loadedAvailState, persistEnabled bool) *inner { votes := map[types.LaneID]*queue[types.BlockNumber, blockVotes]{} blocks := map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]]{} for _, lane := range c.Lanes().All() { @@ -33,6 +37,14 @@ func newInner(c *types.Committee, loaded *loadedAvailState) *inner { blocks[lane] = newQueue[types.BlockNumber, *types.Signed[*types.LaneProposal]]() } + var blockPersisted map[types.LaneID]types.BlockNumber + if persistEnabled { + blockPersisted = make(map[types.LaneID]types.BlockNumber, c.Lanes().Len()) + for _, lane := range c.Lanes().All() { + blockPersisted[lane] = 0 + } + } + i := &inner{ latestAppQC: utils.None[*types.AppQC](), latestCommitQC: utils.NewAtomicSend(utils.None[*types.CommitQC]()), @@ -40,6 +52,7 @@ func newInner(c *types.Committee, loaded *loadedAvailState) *inner { commitQCs: newQueue[types.RoadIndex, *types.CommitQC](), blocks: blocks, votes: votes, + blockPersisted: blockPersisted, } if loaded == nil { @@ -70,6 +83,9 @@ func newInner(c *types.Committee, loaded *loadedAvailState) *inner { q.q[q.next] = b.Proposal q.next++ } + if i.blockPersisted != nil { + i.blockPersisted[lane] = q.next + } // Advance the votes queue to match so headers() returns ErrPruned // for already-committed blocks instead of blocking forever. 
vq := i.votes[lane] diff --git a/sei-tendermint/internal/autobahn/avail/inner_test.go b/sei-tendermint/internal/autobahn/avail/inner_test.go index e6bf2455de..4fb957dc95 100644 --- a/sei-tendermint/internal/autobahn/avail/inner_test.go +++ b/sei-tendermint/internal/autobahn/avail/inner_test.go @@ -73,7 +73,7 @@ func TestNewInnerFreshStart(t *testing.T) { rng := utils.TestRng() committee, _ := types.GenCommittee(rng, 4) - i := newInner(committee, nil) + i := newInner(committee, nil, false) require.False(t, i.latestAppQC.IsPresent()) require.Equal(t, types.RoadIndex(0), i.commitQCs.first) @@ -101,7 +101,7 @@ func TestNewInnerLoadedAppQCAdvancesQueues(t *testing.T) { appQC: utils.Some[*types.AppQC](appQC), } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) // latestAppQC should be restored. aq, ok := i.latestAppQC.Get() @@ -126,7 +126,7 @@ func TestNewInnerLoadedAppQCNone(t *testing.T) { appQC: utils.None[*types.AppQC](), } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) // No AppQC loaded, queues should start at 0. 
require.False(t, i.latestAppQC.IsPresent()) @@ -153,7 +153,7 @@ func TestNewInnerLoadedBlocksContiguous(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) q := i.blocks[lane] require.Equal(t, types.BlockNumber(5), q.first) @@ -187,7 +187,7 @@ func TestNewInnerLoadedBlocksContiguousPrefix(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) q := i.blocks[lane] require.Equal(t, types.BlockNumber(3), q.first) @@ -206,7 +206,7 @@ func TestNewInnerLoadedBlocksEmptySlice(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: {}}, } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) q := i.blocks[lane] require.Equal(t, types.BlockNumber(0), q.first) @@ -226,7 +226,7 @@ func TestNewInnerLoadedBlocksUnknownLane(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{unknownLane: {{Number: 0, Proposal: b}}}, } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) for _, lane := range committee.Lanes().All() { q := i.blocks[lane] @@ -259,7 +259,7 @@ func TestNewInnerLoadedAppQCAndBlocks(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) aq, ok := i.latestAppQC.Get() require.True(t, ok) @@ -304,7 +304,7 @@ func TestNewInnerLoadedBlocksMultipleLanes(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane0: bs0, lane1: bs1}, } - i := newInner(committee, loaded) + i := newInner(committee, loaded, false) q0 := i.blocks[lane0] require.Equal(t, types.BlockNumber(2), q0.first) diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index 5598164615..fa8e066f45 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ 
b/sei-tendermint/internal/autobahn/avail/state.go @@ -45,6 +45,15 @@ type State struct { persister utils.Option[persist.Persister] // blockPersist writes/deletes individual block files; nil when persistence is disabled. blockPersist *persist.BlockPersister + // persistCh carries blocks to the background writer for async fsync. + // nil when persistence is disabled (testing). + persistCh chan persistJob +} + +type persistJob struct { + lane types.LaneID + number types.BlockNumber + proposal *types.Signed[*types.LaneProposal] } // innerFile is the A/B file prefix for avail inner state persistence. @@ -95,12 +104,18 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin p = utils.Some(persister) } + var persistCh chan persistJob + if bp != nil { + persistCh = make(chan persistJob, BlocksPerLane) + } + return &State{ key: key, data: data, - inner: utils.NewWatch(newInner(data.Committee(), loaded)), + inner: utils.NewWatch(newInner(data.Committee(), loaded, bp != nil)), persister: p, blockPersist: bp, + persistCh: persistCh, }, nil } @@ -383,15 +398,16 @@ func (s *State) PushBlock(ctx context.Context, p *types.Signed[*types.LanePropos return nil } } - // Persist block to disk before adding to in-memory state. - if bp := s.blockPersist; bp != nil { - if err := bp.PersistBlock(h.Lane(), h.BlockNumber(), p); err != nil { - return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err) - } - } q.pushBack(p) ctrl.Updated() } + if s.persistCh != nil { + select { + case s.persistCh <- persistJob{lane: h.Lane(), number: h.BlockNumber(), proposal: p}: + case <-ctx.Done(): + return ctx.Err() + } + } return nil } @@ -530,6 +546,8 @@ func (s *State) ProduceBlock(ctx context.Context, payload *types.Payload) (*type // TODO: produceBlock is a separate function for testing - consider improving the tests to use ProduceBlock only. 
func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload *types.Payload) (*types.Signed[*types.LaneProposal], error) { lane := key.Public() + var result *types.Signed[*types.LaneProposal] + var blockNum types.BlockNumber for inner, ctrl := range s.inner.Lock() { q, ok := inner.blocks[lane] if !ok { @@ -542,24 +560,30 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload * if q.first < q.next { parent = q.q[q.next-1].Msg().Block().Header().Hash() } - p := types.Sign(key, types.NewLaneProposal(types.NewBlock(lane, q.next, parent, payload))) - // Persist block to disk before adding to in-memory state. - if bp := s.blockPersist; bp != nil { - if err := bp.PersistBlock(lane, q.next, p); err != nil { - return nil, fmt.Errorf("persist block %s/%d: %w", lane, q.next, err) - } - } - q.q[q.next] = p + blockNum = q.next + result = types.Sign(key, types.NewLaneProposal(types.NewBlock(lane, blockNum, parent, payload))) + q.q[q.next] = result q.next += 1 ctrl.Updated() - return p, nil } - panic("unreachable") + if s.persistCh != nil { + select { + case s.persistCh <- persistJob{lane: lane, number: blockNum, proposal: result}: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return result, nil } // Run runs the background tasks of the state. func (s *State) Run(ctx context.Context) error { return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error { + if s.persistCh != nil { + scope.SpawnNamed("blockPersistWriter", func() error { + return s.runBlockPersistWriter(ctx) + }) + } // Task inserting FullCommitQCs and local blocks to data state. scope.SpawnNamed("s.data.PushQC", func() error { c := s.data.Committee() @@ -595,3 +619,24 @@ func (s *State) Run(ctx context.Context) error { return nil }) } + +// runBlockPersistWriter drains persistCh and fsyncs each block to disk, +// then advances the per-lane blockPersisted cursor under the inner lock. 
+func (s *State) runBlockPersistWriter(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case job := <-s.persistCh: + if err := s.blockPersist.PersistBlock(job.lane, job.number, job.proposal); err != nil { + return fmt.Errorf("persist block %s/%d: %w", job.lane, job.number, err) + } + for inner, ctrl := range s.inner.Lock() { + if inner.blockPersisted[job.lane] == job.number { + inner.blockPersisted[job.lane] = job.number + 1 + } + ctrl.Updated() + } + } + } +} diff --git a/sei-tendermint/internal/autobahn/avail/subscriptions.go b/sei-tendermint/internal/autobahn/avail/subscriptions.go index c59a69e39d..93312061dd 100644 --- a/sei-tendermint/internal/autobahn/avail/subscriptions.go +++ b/sei-tendermint/internal/autobahn/avail/subscriptions.go @@ -48,10 +48,14 @@ func (r *LaneVotesRecv) RecvBatch(ctx context.Context) ([]*types.Signed[*types.L for inner, ctrl := range r.state.inner.Lock() { for { for lane, bq := range inner.blocks { - for i := max(bq.first, r.next[lane]); i < bq.next; i++ { + upperBound := bq.next + if inner.blockPersisted != nil { + upperBound = min(upperBound, inner.blockPersisted[lane]) + } + for i := max(bq.first, r.next[lane]); i < upperBound; i++ { batch = append(batch, bq.q[i].Msg().Block().Header()) } - r.next[lane] = bq.next + r.next[lane] = upperBound } if len(batch) > 0 { break From 7edd471c3428634f069919dca77799f2ee34f3a2 Mon Sep 17 00:00:00 2001 From: Wen Date: Wed, 18 Feb 2026 13:52:54 -0800 Subject: [PATCH 6/9] Derive persistEnabled from loaded instead of extra arg newInner no longer takes a separate persistEnabled bool; loaded != nil already implies persistence is enabled. Tests with loaded data now correctly reflect this. 
Co-authored-by: Cursor --- .../internal/autobahn/avail/inner.go | 8 +++----- .../internal/autobahn/avail/inner_test.go | 18 +++++++++--------- .../internal/autobahn/avail/state.go | 2 +- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index 96c807ec3c..79eb0ac682 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -29,7 +29,7 @@ type loadedAvailState struct { blocks map[types.LaneID][]persist.LoadedBlock } -func newInner(c *types.Committee, loaded *loadedAvailState, persistEnabled bool) *inner { +func newInner(c *types.Committee, loaded *loadedAvailState) *inner { votes := map[types.LaneID]*queue[types.BlockNumber, blockVotes]{} blocks := map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]]{} for _, lane := range c.Lanes().All() { @@ -38,7 +38,7 @@ func newInner(c *types.Committee, loaded *loadedAvailState, persistEnabled bool) } var blockPersisted map[types.LaneID]types.BlockNumber - if persistEnabled { + if loaded != nil { blockPersisted = make(map[types.LaneID]types.BlockNumber, c.Lanes().Len()) for _, lane := range c.Lanes().All() { blockPersisted[lane] = 0 @@ -83,9 +83,7 @@ func newInner(c *types.Committee, loaded *loadedAvailState, persistEnabled bool) q.q[q.next] = b.Proposal q.next++ } - if i.blockPersisted != nil { - i.blockPersisted[lane] = q.next - } + i.blockPersisted[lane] = q.next // Advance the votes queue to match so headers() returns ErrPruned // for already-committed blocks instead of blocking forever. 
vq := i.votes[lane] diff --git a/sei-tendermint/internal/autobahn/avail/inner_test.go b/sei-tendermint/internal/autobahn/avail/inner_test.go index 4fb957dc95..e6bf2455de 100644 --- a/sei-tendermint/internal/autobahn/avail/inner_test.go +++ b/sei-tendermint/internal/autobahn/avail/inner_test.go @@ -73,7 +73,7 @@ func TestNewInnerFreshStart(t *testing.T) { rng := utils.TestRng() committee, _ := types.GenCommittee(rng, 4) - i := newInner(committee, nil, false) + i := newInner(committee, nil) require.False(t, i.latestAppQC.IsPresent()) require.Equal(t, types.RoadIndex(0), i.commitQCs.first) @@ -101,7 +101,7 @@ func TestNewInnerLoadedAppQCAdvancesQueues(t *testing.T) { appQC: utils.Some[*types.AppQC](appQC), } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) // latestAppQC should be restored. aq, ok := i.latestAppQC.Get() @@ -126,7 +126,7 @@ func TestNewInnerLoadedAppQCNone(t *testing.T) { appQC: utils.None[*types.AppQC](), } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) // No AppQC loaded, queues should start at 0. 
require.False(t, i.latestAppQC.IsPresent()) @@ -153,7 +153,7 @@ func TestNewInnerLoadedBlocksContiguous(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) q := i.blocks[lane] require.Equal(t, types.BlockNumber(5), q.first) @@ -187,7 +187,7 @@ func TestNewInnerLoadedBlocksContiguousPrefix(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) q := i.blocks[lane] require.Equal(t, types.BlockNumber(3), q.first) @@ -206,7 +206,7 @@ func TestNewInnerLoadedBlocksEmptySlice(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: {}}, } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) q := i.blocks[lane] require.Equal(t, types.BlockNumber(0), q.first) @@ -226,7 +226,7 @@ func TestNewInnerLoadedBlocksUnknownLane(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{unknownLane: {{Number: 0, Proposal: b}}}, } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) for _, lane := range committee.Lanes().All() { q := i.blocks[lane] @@ -259,7 +259,7 @@ func TestNewInnerLoadedAppQCAndBlocks(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) aq, ok := i.latestAppQC.Get() require.True(t, ok) @@ -304,7 +304,7 @@ func TestNewInnerLoadedBlocksMultipleLanes(t *testing.T) { blocks: map[types.LaneID][]persist.LoadedBlock{lane0: bs0, lane1: bs1}, } - i := newInner(committee, loaded, false) + i := newInner(committee, loaded) q0 := i.blocks[lane0] require.Equal(t, types.BlockNumber(2), q0.first) diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index fa8e066f45..7ac4c1ab9a 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ 
b/sei-tendermint/internal/autobahn/avail/state.go @@ -112,7 +112,7 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin return &State{ key: key, data: data, - inner: utils.NewWatch(newInner(data.Committee(), loaded, bp != nil)), + inner: utils.NewWatch(newInner(data.Committee(), loaded)), persister: p, blockPersist: bp, persistCh: persistCh, From 2f0bbad77136e621a366fa21b863939d73d5053b Mon Sep 17 00:00:00 2001 From: Wen Date: Wed, 18 Feb 2026 16:25:34 -0800 Subject: [PATCH 7/9] Move blockPersisted init next to block restoration in newInner blockPersisted is reconstructed from disk on restart, not persisted itself. Move its creation to just above the block restoration loop (past the loaded==nil early return) so the code reads top-down. Co-authored-by: Cursor --- sei-tendermint/internal/autobahn/avail/inner.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index 79eb0ac682..fc224584be 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -18,6 +18,8 @@ type inner struct { // blockPersisted tracks per-lane how far block persistence has progressed. // RecvBatch only yields blocks below this cursor for voting. // nil when persistence is disabled (testing); RecvBatch then uses q.next. + // blockPersisted itself is not persisted to disk: on restart it is + // reconstructed from the blocks already on disk (see newInner). 
blockPersisted map[types.LaneID]types.BlockNumber } @@ -37,14 +39,6 @@ func newInner(c *types.Committee, loaded *loadedAvailState) *inner { blocks[lane] = newQueue[types.BlockNumber, *types.Signed[*types.LaneProposal]]() } - var blockPersisted map[types.LaneID]types.BlockNumber - if loaded != nil { - blockPersisted = make(map[types.LaneID]types.BlockNumber, c.Lanes().Len()) - for _, lane := range c.Lanes().All() { - blockPersisted[lane] = 0 - } - } - i := &inner{ latestAppQC: utils.None[*types.AppQC](), latestCommitQC: utils.NewAtomicSend(utils.None[*types.CommitQC]()), @@ -52,7 +46,6 @@ func newInner(c *types.Committee, loaded *loadedAvailState) *inner { commitQCs: newQueue[types.RoadIndex, *types.CommitQC](), blocks: blocks, votes: votes, - blockPersisted: blockPersisted, } if loaded == nil { @@ -70,6 +63,12 @@ func newInner(c *types.Committee, loaded *loadedAvailState) *inner { i.appVotes.next = i.appVotes.first } + // Reconstruct blockPersisted from the blocks on disk. + i.blockPersisted = make(map[types.LaneID]types.BlockNumber, c.Lanes().Len()) + for _, lane := range c.Lanes().All() { + i.blockPersisted[lane] = 0 + } + // Restore persisted blocks into their lane queues. 
for lane, bs := range loaded.blocks { q, ok := i.blocks[lane] From 869066f38a185b2e76e5002177e320eb0f9cc79d Mon Sep 17 00:00:00 2001 From: Wen Date: Fri, 20 Feb 2026 10:48:27 -0800 Subject: [PATCH 8/9] Fix NewState call sites after rebase: add persistence dir arg Co-authored-by: Cursor --- sei-tendermint/internal/autobahn/avail/state_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/state_test.go b/sei-tendermint/internal/autobahn/avail/state_test.go index a7820c3de0..b17558075f 100644 --- a/sei-tendermint/internal/autobahn/avail/state_test.go +++ b/sei-tendermint/internal/autobahn/avail/state_test.go @@ -262,7 +262,7 @@ func TestPushBlockRejectsBadParentHash(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state := utils.OrPanic1(NewState(keys[0], ds, utils.None[string]())) // Produce a valid first block on our lane. _, err := state.ProduceBlock(ctx, types.GenPayload(rng)) @@ -287,7 +287,7 @@ func TestPushBlockRejectsWrongSigner(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state := utils.OrPanic1(NewState(keys[0], ds, utils.None[string]())) // Create a block on keys[0]'s lane but sign it with keys[1]. lane := keys[0].Public() From bed68249de0e34fc95115656214dffa23d198b3a Mon Sep 17 00:00:00 2001 From: Wen Date: Fri, 20 Feb 2026 13:16:08 -0800 Subject: [PATCH 9/9] Encapsulate async block persistence in BlockPersister Move persistCh, persistJob, and the writer loop from avail/State into BlockPersister.Queue + BlockPersister.Run, so callers just call Queue() and the persist layer owns the channel, buffer sizing, and drain loop. Queue blocks with context to avoid holes in the sequential blockPersisted cursor (which would permanently stall voting). Call sites use utils.IgnoreAfterCancel to swallow shutdown errors. 
Co-authored-by: Cursor --- .../internal/autobahn/avail/state.go | 67 +++++-------------- .../autobahn/consensus/persist/blocks.go | 44 +++++++++++- 2 files changed, 61 insertions(+), 50 deletions(-) diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index 7ac4c1ab9a..cc9ff1bf46 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -45,15 +45,6 @@ type State struct { persister utils.Option[persist.Persister] // blockPersist writes/deletes individual block files; nil when persistence is disabled. blockPersist *persist.BlockPersister - // persistCh carries blocks to the background writer for async fsync. - // nil when persistence is disabled (testing). - persistCh chan persistJob -} - -type persistJob struct { - lane types.LaneID - number types.BlockNumber - proposal *types.Signed[*types.LaneProposal] } // innerFile is the A/B file prefix for avail inner state persistence. @@ -104,18 +95,12 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin p = utils.Some(persister) } - var persistCh chan persistJob - if bp != nil { - persistCh = make(chan persistJob, BlocksPerLane) - } - return &State{ key: key, data: data, inner: utils.NewWatch(newInner(data.Committee(), loaded)), persister: p, blockPersist: bp, - persistCh: persistCh, }, nil } @@ -401,12 +386,10 @@ func (s *State) PushBlock(ctx context.Context, p *types.Signed[*types.LanePropos q.pushBack(p) ctrl.Updated() } - if s.persistCh != nil { - select { - case s.persistCh <- persistJob{lane: h.Lane(), number: h.BlockNumber(), proposal: p}: - case <-ctx.Done(): - return ctx.Err() - } + if s.blockPersist != nil { + // Blocking: called outside the inner lock so only this goroutine stalls. + // See Queue() for why we must not drop blocks. 
+ return utils.IgnoreAfterCancel(ctx, s.blockPersist.Queue(ctx, h.Lane(), h.BlockNumber(), p)) } return nil } @@ -566,11 +549,11 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload * q.next += 1 ctrl.Updated() } - if s.persistCh != nil { - select { - case s.persistCh <- persistJob{lane: lane, number: blockNum, proposal: result}: - case <-ctx.Done(): - return nil, ctx.Err() + if s.blockPersist != nil { + // Blocking: called outside the inner lock so only this goroutine stalls. + // See Queue() for why we must not drop blocks. + if err := utils.IgnoreAfterCancel(ctx, s.blockPersist.Queue(ctx, lane, blockNum, result)); err != nil { + return nil, err } } return result, nil @@ -579,9 +562,16 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload * // Run runs the background tasks of the state. func (s *State) Run(ctx context.Context) error { return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error { - if s.persistCh != nil { + if s.blockPersist != nil { scope.SpawnNamed("blockPersistWriter", func() error { - return s.runBlockPersistWriter(ctx) + return s.blockPersist.Run(ctx, func(lane types.LaneID, n types.BlockNumber) { + for inner, ctrl := range s.inner.Lock() { + if inner.blockPersisted[lane] == n { + inner.blockPersisted[lane] = n + 1 + } + ctrl.Updated() + } + }) }) } // Task inserting FullCommitQCs and local blocks to data state. @@ -619,24 +609,3 @@ func (s *State) Run(ctx context.Context) error { return nil }) } - -// runBlockPersistWriter drains persistCh and fsyncs each block to disk, -// then advances the per-lane blockPersisted cursor under the inner lock. 
-func (s *State) runBlockPersistWriter(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case job := <-s.persistCh: - if err := s.blockPersist.PersistBlock(job.lane, job.number, job.proposal); err != nil { - return fmt.Errorf("persist block %s/%d: %w", job.lane, job.number, err) - } - for inner, ctrl := range s.inner.Lock() { - if inner.blockPersisted[job.lane] == job.number { - inner.blockPersisted[job.lane] = job.number + 1 - } - ctrl.Updated() - } - } - } -} diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go index c90b9edbe4..01042c6d6f 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go @@ -6,6 +6,7 @@ package persist import ( + "context" "encoding/hex" "fmt" "maps" @@ -31,8 +32,20 @@ type LoadedBlock struct { // Each block is stored as <lane_hex>_<blocknum>.pb. type BlockPersister struct { dir string // full path to the blocks/ subdirectory + ch chan persistJob } +type persistJob struct { + lane types.LaneID + number types.BlockNumber + proposal *types.Signed[*types.LaneProposal] } + +// persistQueueSize is the buffer for async block persistence. With 40 validators, +// a 1/3 Byzantine burst produces up to ~13 lanes × 30 blocks = 390 blocks at once. +// 512 covers that with margin. +const persistQueueSize = 512 + // NewBlockPersister creates the blocks/ subdirectory if it doesn't exist and // returns a block persister. Loads all persisted blocks from disk as sorted, // contiguous slices per lane.
Gaps from corrupt or missing files are resolved @@ -43,7 +56,7 @@ func NewBlockPersister(stateDir string) (*BlockPersister, map[types.LaneID][]Loa return nil, nil, fmt.Errorf("create blocks dir %s: %w", dir, err) } - bp := &BlockPersister{dir: dir} + bp := &BlockPersister{dir: dir, ch: make(chan persistJob, persistQueueSize)} blocks, err := bp.loadAll() if err != nil { return nil, nil, err @@ -118,6 +131,35 @@ func (bp *BlockPersister) DeleteBefore(laneFirsts map[types.LaneID]types.BlockNu } } +// Queue enqueues a block for async persistence. Blocks if the queue is full +// until space is available or ctx is cancelled. We must not drop blocks because +// the blockPersisted cursor advances sequentially — a hole would permanently +// stall voting on the affected lane. +func (bp *BlockPersister) Queue(ctx context.Context, lane types.LaneID, n types.BlockNumber, proposal *types.Signed[*types.LaneProposal]) error { + select { + case bp.ch <- persistJob{lane: lane, number: n, proposal: proposal}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Run drains the internal queue, fsyncs each block to disk, and calls +// onPersisted after each successful write. Blocks until ctx is cancelled. +func (bp *BlockPersister) Run(ctx context.Context, onPersisted func(types.LaneID, types.BlockNumber)) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case job := <-bp.ch: + if err := bp.PersistBlock(job.lane, job.number, job.proposal); err != nil { + return fmt.Errorf("persist block %s/%d: %w", job.lane, job.number, err) + } + onPersisted(job.lane, job.number) + } + } +} + // loadAll loads all persisted blocks from the blocks/ directory. // Returns sorted, contiguous slices per lane (truncated at the first gap). func (bp *BlockPersister) loadAll() (map[types.LaneID][]LoadedBlock, error) {