diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index ec17fc983e..fc224584be 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -3,6 +3,7 @@ package avail import ( "fmt" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) @@ -14,16 +15,31 @@ type inner struct { commitQCs *queue[types.RoadIndex, *types.CommitQC] blocks map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]] votes map[types.LaneID]*queue[types.BlockNumber, blockVotes] + // blockPersisted tracks per-lane how far block persistence has progressed. + // RecvBatch only yields blocks below this cursor for voting. + // nil when persistence is disabled (testing); RecvBatch then uses q.next. + // blockPersisted itself is not persisted to disk: on restart it is + // reconstructed from the blocks already on disk (see newInner). + blockPersisted map[types.LaneID]types.BlockNumber } -func newInner(c *types.Committee) *inner { +// loadedAvailState holds data loaded from disk on restart. +// nil means fresh start (no persisted data). +// blocks are sorted by number and contiguous (gaps already resolved by loader). 
+type loadedAvailState struct { + appQC utils.Option[*types.AppQC] + blocks map[types.LaneID][]persist.LoadedBlock +} + +func newInner(c *types.Committee, loaded *loadedAvailState) *inner { votes := map[types.LaneID]*queue[types.BlockNumber, blockVotes]{} blocks := map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]]{} for _, lane := range c.Lanes().All() { votes[lane] = newQueue[types.BlockNumber, blockVotes]() blocks[lane] = newQueue[types.BlockNumber, *types.Signed[*types.LaneProposal]]() } - return &inner{ + + i := &inner{ latestAppQC: utils.None[*types.AppQC](), latestCommitQC: utils.NewAtomicSend(utils.None[*types.CommitQC]()), appVotes: newQueue[types.GlobalBlockNumber, appVotes](), @@ -31,6 +47,50 @@ func newInner(c *types.Committee) *inner { blocks: blocks, votes: votes, } + + if loaded == nil { + return i + } + + // Restore AppQC and advance queues past already-processed indices. + i.latestAppQC = loaded.appQC + if aq, ok := loaded.appQC.Get(); ok { + // CommitQCs through this index have been processed; skip them. + i.commitQCs.first = aq.Proposal().RoadIndex() + 1 + i.commitQCs.next = i.commitQCs.first + // AppVotes through this global block number have been processed. + i.appVotes.first = aq.Proposal().GlobalNumber() + 1 + i.appVotes.next = i.appVotes.first + } + + // Reconstruct blockPersisted from the blocks on disk. + i.blockPersisted = make(map[types.LaneID]types.BlockNumber, c.Lanes().Len()) + for _, lane := range c.Lanes().All() { + i.blockPersisted[lane] = 0 + } + + // Restore persisted blocks into their lane queues. + for lane, bs := range loaded.blocks { + q, ok := i.blocks[lane] + if !ok || len(bs) == 0 { + continue + } + first := bs[0].Number + q.first = first + q.next = first + for _, b := range bs { + q.q[q.next] = b.Proposal + q.next++ + } + i.blockPersisted[lane] = q.next + // Advance the votes queue to match so headers() returns ErrPruned + // for already-committed blocks instead of blocking forever. 
+ vq := i.votes[lane] + vq.first = first + vq.next = first + } + + return i } func (i *inner) laneQC(c *types.Committee, lane types.LaneID, n types.BlockNumber) (*types.LaneQC, bool) { diff --git a/sei-tendermint/internal/autobahn/avail/inner_test.go b/sei-tendermint/internal/autobahn/avail/inner_test.go index d5b619815c..e6bf2455de 100644 --- a/sei-tendermint/internal/autobahn/avail/inner_test.go +++ b/sei-tendermint/internal/autobahn/avail/inner_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -16,7 +17,8 @@ func TestPruneMismatchedIndices(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state, err := NewState(keys[0], ds, utils.None[string]()) + require.NoError(t, err) // Helper to create a CommitQC for a specific index makeQC := func(index types.RoadIndex, prev utils.Option[*types.CommitQC]) *types.CommitQC { @@ -45,7 +47,7 @@ func TestPruneMismatchedIndices(t *testing.T) { appQC1 := types.NewAppQC(makeAppVotes(keys, appProposal1)) // Now call PushAppQC with appQC1 (index 1) and qc0 (index 0) - err := state.PushAppQC(appQC1, qc0) + err = state.PushAppQC(appQC1, qc0) require.Error(t, err) require.Contains(t, err.Error(), "mismatched QCs") @@ -60,3 +62,258 @@ func TestPruneMismatchedIndices(t *testing.T) { require.False(t, inner.latestAppQC.IsPresent(), "latestAppQC should not have been updated") } } + +// testSignedBlock creates a signed lane proposal for a given lane, block number, and parent hash. 
+func testSignedBlock(key types.SecretKey, lane types.LaneID, n types.BlockNumber, parent types.BlockHeaderHash, rng utils.Rng) *types.Signed[*types.LaneProposal] { + block := types.NewBlock(lane, n, parent, types.GenPayload(rng)) + return types.Sign(key, types.NewLaneProposal(block)) +} + +func TestNewInnerFreshStart(t *testing.T) { + rng := utils.TestRng() + committee, _ := types.GenCommittee(rng, 4) + + i := newInner(committee, nil) + + require.False(t, i.latestAppQC.IsPresent()) + require.Equal(t, types.RoadIndex(0), i.commitQCs.first) + require.Equal(t, types.RoadIndex(0), i.commitQCs.next) + require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.first) + require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.next) + for _, lane := range committee.Lanes().All() { + require.Equal(t, types.BlockNumber(0), i.blocks[lane].first) + require.Equal(t, types.BlockNumber(0), i.blocks[lane].next) + require.Equal(t, types.BlockNumber(0), i.votes[lane].first) + require.Equal(t, types.BlockNumber(0), i.votes[lane].next) + } +} + +func TestNewInnerLoadedAppQCAdvancesQueues(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + + roadIdx := types.RoadIndex(5) + globalNum := types.GlobalBlockNumber(42) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + loaded := &loadedAvailState{ + appQC: utils.Some[*types.AppQC](appQC), + } + + i := newInner(committee, loaded) + + // latestAppQC should be restored. + aq, ok := i.latestAppQC.Get() + require.True(t, ok) + require.Equal(t, roadIdx, aq.Proposal().RoadIndex()) + require.Equal(t, globalNum, aq.Proposal().GlobalNumber()) + + // commitQCs queue should skip past the loaded AppQC's road index. + require.Equal(t, roadIdx+1, i.commitQCs.first) + require.Equal(t, roadIdx+1, i.commitQCs.next) + + // appVotes queue should skip past the loaded AppQC's global block number. 
+ require.Equal(t, globalNum+1, i.appVotes.first) + require.Equal(t, globalNum+1, i.appVotes.next) +} + +func TestNewInnerLoadedAppQCNone(t *testing.T) { + rng := utils.TestRng() + committee, _ := types.GenCommittee(rng, 4) + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + } + + i := newInner(committee, loaded) + + // No AppQC loaded, queues should start at 0. + require.False(t, i.latestAppQC.IsPresent()) + require.Equal(t, types.RoadIndex(0), i.commitQCs.first) + require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.first) +} + +func TestNewInnerLoadedBlocksContiguous(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + + // Build 3 contiguous blocks: 5, 6, 7. + var parent types.BlockHeaderHash + var bs []persist.LoadedBlock + for n := types.BlockNumber(5); n < 8; n++ { + b := testSignedBlock(keys[0], lane, n, parent, rng) + parent = b.Msg().Block().Header().Hash() + bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b}) + } + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, + } + + i := newInner(committee, loaded) + + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(5), q.first) + require.Equal(t, types.BlockNumber(8), q.next) + for j, b := range bs { + require.Equal(t, b.Proposal, q.q[types.BlockNumber(5)+types.BlockNumber(j)]) + } + + // Votes queue should be aligned. + vq := i.votes[lane] + require.Equal(t, types.BlockNumber(5), vq.first) + require.Equal(t, types.BlockNumber(5), vq.next) +} + +func TestNewInnerLoadedBlocksContiguousPrefix(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + + // Loader already resolved the gap: only contiguous prefix [3, 4] is passed. 
+ var parent types.BlockHeaderHash + var bs []persist.LoadedBlock + for _, n := range []types.BlockNumber{3, 4} { + b := testSignedBlock(keys[0], lane, n, parent, rng) + parent = b.Msg().Block().Header().Hash() + bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b}) + } + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, + } + + i := newInner(committee, loaded) + + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(3), q.first) + require.Equal(t, types.BlockNumber(5), q.next) + require.Equal(t, bs[0].Proposal, q.q[types.BlockNumber(3)]) + require.Equal(t, bs[1].Proposal, q.q[types.BlockNumber(4)]) +} + +func TestNewInnerLoadedBlocksEmptySlice(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID][]persist.LoadedBlock{lane: {}}, + } + + i := newInner(committee, loaded) + + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(0), q.first) + require.Equal(t, types.BlockNumber(0), q.next) +} + +func TestNewInnerLoadedBlocksUnknownLane(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + + unknownKey := types.GenSecretKey(rng) + unknownLane := unknownKey.Public() + + b := testSignedBlock(unknownKey, unknownLane, 0, types.BlockHeaderHash{}, rng) + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: map[types.LaneID][]persist.LoadedBlock{unknownLane: {{Number: 0, Proposal: b}}}, + } + + i := newInner(committee, loaded) + + for _, lane := range committee.Lanes().All() { + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(0), q.first) + require.Equal(t, types.BlockNumber(0), q.next) + } + _ = keys +} + +func TestNewInnerLoadedAppQCAndBlocks(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane := keys[0].Public() + 
+ roadIdx := types.RoadIndex(3) + globalNum := types.GlobalBlockNumber(10) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + var parent types.BlockHeaderHash + var bs []persist.LoadedBlock + for n := types.BlockNumber(7); n < 9; n++ { + b := testSignedBlock(keys[0], lane, n, parent, rng) + parent = b.Msg().Block().Header().Hash() + bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b}) + } + + loaded := &loadedAvailState{ + appQC: utils.Some[*types.AppQC](appQC), + blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs}, + } + + i := newInner(committee, loaded) + + aq, ok := i.latestAppQC.Get() + require.True(t, ok) + require.Equal(t, roadIdx, aq.Proposal().RoadIndex()) + + require.Equal(t, roadIdx+1, i.commitQCs.first) + require.Equal(t, globalNum+1, i.appVotes.first) + + q := i.blocks[lane] + require.Equal(t, types.BlockNumber(7), q.first) + require.Equal(t, types.BlockNumber(9), q.next) + + vq := i.votes[lane] + require.Equal(t, types.BlockNumber(7), vq.first) + require.Equal(t, types.BlockNumber(7), vq.next) +} + +func TestNewInnerLoadedBlocksMultipleLanes(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + lane0 := keys[0].Public() + lane1 := keys[1].Public() + + var parent0 types.BlockHeaderHash + var bs0 []persist.LoadedBlock + for n := types.BlockNumber(2); n < 4; n++ { + b := testSignedBlock(keys[0], lane0, n, parent0, rng) + parent0 = b.Msg().Block().Header().Hash() + bs0 = append(bs0, persist.LoadedBlock{Number: n, Proposal: b}) + } + + var parent1 types.BlockHeaderHash + var bs1 []persist.LoadedBlock + for n := types.BlockNumber(0); n < 3; n++ { + b := testSignedBlock(keys[1], lane1, n, parent1, rng) + parent1 = b.Msg().Block().Header().Hash() + bs1 = append(bs1, persist.LoadedBlock{Number: n, Proposal: b}) + } + + loaded := &loadedAvailState{ + appQC: utils.None[*types.AppQC](), + blocks: 
map[types.LaneID][]persist.LoadedBlock{lane0: bs0, lane1: bs1}, + } + + i := newInner(committee, loaded) + + q0 := i.blocks[lane0] + require.Equal(t, types.BlockNumber(2), q0.first) + require.Equal(t, types.BlockNumber(4), q0.next) + + q1 := i.blocks[lane1] + require.Equal(t, types.BlockNumber(0), q1.first) + require.Equal(t, types.BlockNumber(3), q1.next) + + require.Equal(t, types.BlockNumber(2), i.votes[lane0].first) + require.Equal(t, types.BlockNumber(0), i.votes[lane1].first) +} diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index 9c98327db8..cc9ff1bf46 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -6,7 +6,9 @@ import ( "fmt" "github.com/rs/zerolog/log" + "google.golang.org/protobuf/proto" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -38,15 +40,68 @@ type State struct { key types.SecretKey data *data.State inner utils.Watch[*inner] + + // persister writes avail inner state to disk using A/B files; None when persistence is disabled. + persister utils.Option[persist.Persister] + // blockPersist writes/deletes individual block files; nil when persistence is disabled. + blockPersist *persist.BlockPersister +} + +// innerFile is the A/B file prefix for avail inner state persistence. +const innerFile = "avail_inner" + +// loadPersistedState loads persisted avail state from disk and creates persisters for ongoing writes. 
+func loadPersistedState(dir string) (*loadedAvailState, persist.Persister, *persist.BlockPersister, error) { + persister, persistedData, err := persist.NewPersister(dir, innerFile) + if err != nil { + return nil, nil, nil, fmt.Errorf("NewPersister %s: %w", innerFile, err) + } + + var appQC utils.Option[*types.AppQC] + if bz, ok := persistedData.Get(); ok { + qc, err := types.AppQCConv.Unmarshal(bz) + if err != nil { + return nil, nil, nil, fmt.Errorf("unmarshal persisted AppQC: %w", err) + } + log.Info(). + Uint64("roadIndex", uint64(qc.Proposal().RoadIndex())). + Uint64("globalNumber", uint64(qc.Proposal().GlobalNumber())). + Msg("loaded persisted AppQC") + appQC = utils.Some(qc) + } + + bp, blocks, err := persist.NewBlockPersister(dir) + if err != nil { + return nil, nil, nil, fmt.Errorf("NewBlockPersister: %w", err) + } + + return &loadedAvailState{appQC: appQC, blocks: blocks}, persister, bp, nil } // NewState constructs a new availability state. -func NewState(key types.SecretKey, data *data.State) *State { - return &State{ - key: key, - data: data, - inner: utils.NewWatch(newInner(data.Committee())), +// stateDir is None when persistence is disabled (testing only). 
+func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[string]) (*State, error) { + var loaded *loadedAvailState + var p utils.Option[persist.Persister] + var bp *persist.BlockPersister + + if dir, ok := stateDir.Get(); ok { + var persister persist.Persister + var err error + loaded, persister, bp, err = loadPersistedState(dir) + if err != nil { + return nil, err + } + p = utils.Some(persister) } + + return &State{ + key: key, + data: data, + inner: utils.NewWatch(newInner(data.Committee(), loaded)), + persister: p, + blockPersist: bp, + }, nil } func (s *State) FirstCommitQC() types.RoadIndex { @@ -151,6 +206,7 @@ func (s *State) PushAppVote(ctx context.Context, v *types.Signed[*types.AppVote] if err := s.waitForCommitQC(ctx, idx); err != nil { return err } + var laneFirsts map[types.LaneID]types.BlockNumber for inner, ctrl := range s.inner.Lock() { // Early exit if not useful (we collect <=1 AppQC per road index). if idx < types.NextOpt(inner.latestAppQC) { @@ -176,9 +232,16 @@ func (s *State) PushAppVote(ctx context.Context, v *types.Signed[*types.AppVote] return err } if updated { + if err := s.persistAppQC(appQC); err != nil { + return err + } + laneFirsts = snapshotLaneFirsts(inner) ctrl.Updated() } } + if s.blockPersist != nil { + s.blockPersist.DeleteBefore(laneFirsts) + } return nil } @@ -200,18 +263,48 @@ func (s *State) PushAppQC(appQC *types.AppQC, commitQC *types.CommitQC) error { if appQC.Proposal().RoadIndex() != commitQC.Proposal().Index() { return fmt.Errorf("mismatched QCs: appQC index %v, commitQC index %v", appQC.Proposal().RoadIndex(), commitQC.Proposal().Index()) } + var laneFirsts map[types.LaneID]types.BlockNumber for inner, ctrl := range s.inner.Lock() { updated, err := inner.prune(appQC, commitQC) if err != nil { return err } if updated { + if err := s.persistAppQC(appQC); err != nil { + return err + } + laneFirsts = snapshotLaneFirsts(inner) ctrl.Updated() } } + if s.blockPersist != nil { + 
s.blockPersist.DeleteBefore(laneFirsts) + } return nil } +func snapshotLaneFirsts(inner *inner) map[types.LaneID]types.BlockNumber { + m := make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) + for lane, q := range inner.blocks { + m[lane] = q.first + } + return m +} + +// persistAppQC writes the AppQC to the A/B file. Called under the inner lock +// so the checkpoint is durable before ctrl.Updated() notifies watchers. +func (s *State) persistAppQC(appQC *types.AppQC) error { + p, ok := s.persister.Get() + if !ok { + return nil + } + bz, err := proto.Marshal(types.AppQCConv.Encode(appQC)) + if err != nil { + return fmt.Errorf("marshal AppQC: %w", err) + } + return p.Persist(bz) +} + // NextBlock returns the index of the next missing block in local storage for the given lane. func (s *State) NextBlock(lane types.LaneID) types.BlockNumber { for inner := range s.inner.Lock() { @@ -293,6 +386,11 @@ func (s *State) PushBlock(ctx context.Context, p *types.Signed[*types.LanePropos q.pushBack(p) ctrl.Updated() } + if s.blockPersist != nil { + // Blocking: called outside the inner lock so only this goroutine stalls. + // See Queue() for why we must not drop blocks. + return utils.IgnoreAfterCancel(ctx, s.blockPersist.Queue(ctx, h.Lane(), h.BlockNumber(), p)) + } return nil } @@ -431,6 +529,8 @@ func (s *State) ProduceBlock(ctx context.Context, payload *types.Payload) (*type // TODO: produceBlock is a separate function for testing - consider improving the tests to use ProduceBlock only. 
func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload *types.Payload) (*types.Signed[*types.LaneProposal], error) { lane := key.Public() + var result *types.Signed[*types.LaneProposal] + var blockNum types.BlockNumber for inner, ctrl := range s.inner.Lock() { q, ok := inner.blocks[lane] if !ok { @@ -443,18 +543,37 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload * if q.first < q.next { parent = q.q[q.next-1].Msg().Block().Header().Hash() } - p := types.Sign(key, types.NewLaneProposal(types.NewBlock(lane, q.next, parent, payload))) - q.q[q.next] = p + blockNum = q.next + result = types.Sign(key, types.NewLaneProposal(types.NewBlock(lane, blockNum, parent, payload))) + q.q[q.next] = result q.next += 1 ctrl.Updated() - return p, nil } - panic("unreachable") + if s.blockPersist != nil { + // Blocking: called outside the inner lock so only this goroutine stalls. + // See Queue() for why we must not drop blocks. + if err := utils.IgnoreAfterCancel(ctx, s.blockPersist.Queue(ctx, lane, blockNum, result)); err != nil { + return nil, err + } + } + return result, nil } // Run runs the background tasks of the state. func (s *State) Run(ctx context.Context) error { return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error { + if s.blockPersist != nil { + scope.SpawnNamed("blockPersistWriter", func() error { + return s.blockPersist.Run(ctx, func(lane types.LaneID, n types.BlockNumber) { + for inner, ctrl := range s.inner.Lock() { + if inner.blockPersisted[lane] == n { + inner.blockPersisted[lane] = n + 1 + } + ctrl.Updated() + } + }) + }) + } // Task inserting FullCommitQCs and local blocks to data state. 
scope.SpawnNamed("s.data.PushQC", func() error { c := s.data.Committee() diff --git a/sei-tendermint/internal/autobahn/avail/state_test.go b/sei-tendermint/internal/autobahn/avail/state_test.go index c00e090030..b17558075f 100644 --- a/sei-tendermint/internal/autobahn/avail/state_test.go +++ b/sei-tendermint/internal/autobahn/avail/state_test.go @@ -7,6 +7,9 @@ import ( "testing" "time" + "google.golang.org/protobuf/proto" + + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -88,7 +91,8 @@ func TestState(t *testing.T) { s.SpawnBgNamed("data.State.Run()", func() error { return utils.IgnoreCancel(ds.Run(ctx)) }) - state := NewState(keys[0], ds) + state, err := NewState(keys[0], ds, utils.None[string]()) + require.NoError(t, err) s.SpawnBgNamed("da.State.Run()", func() error { return utils.IgnoreCancel(state.Run(ctx)) }) @@ -200,7 +204,8 @@ func TestStateMismatchedQCs(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state, err := NewState(keys[0], ds, utils.None[string]()) + require.NoError(t, err) ctx := context.Background() // Helper to create a CommitQC for a specific index @@ -257,7 +262,7 @@ func TestPushBlockRejectsBadParentHash(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state := utils.OrPanic1(NewState(keys[0], ds, utils.None[string]())) // Produce a valid first block on our lane. 
_, err := state.ProduceBlock(ctx, types.GenPayload(rng)) @@ -282,7 +287,7 @@ func TestPushBlockRejectsWrongSigner(t *testing.T) { ds := data.NewState(&data.Config{ Committee: committee, }, utils.None[data.BlockStore]()) - state := NewState(keys[0], ds) + state := utils.OrPanic1(NewState(keys[0], ds, utils.None[string]())) // Create a block on keys[0]'s lane but sign it with keys[1]. lane := keys[0].Public() @@ -292,3 +297,134 @@ func TestPushBlockRejectsWrongSigner(t *testing.T) { err := state.PushBlock(ctx, prop) require.Error(t, err) } + +func TestNewStateWithPersistence(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + + t.Run("empty dir loads fresh state", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + // No persisted AppQC → None. + require.False(t, state.LastAppQC().IsPresent()) + // Queues start at 0. + require.Equal(t, types.RoadIndex(0), state.FirstCommitQC()) + }) + + t.Run("loads persisted AppQC", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + // Persist an AppQC to the A/B file. + roadIdx := types.RoadIndex(7) + globalNum := types.GlobalBlockNumber(50) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + persister, _, err := persist.NewPersister(dir, innerFile) + require.NoError(t, err) + bz, err := proto.Marshal(types.AppQCConv.Encode(appQC)) + require.NoError(t, err) + require.NoError(t, persister.Persist(bz)) + + // Now construct state — it should load the AppQC. 
+ state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + aq := state.LastAppQC() + got, ok := aq.Get() + require.True(t, ok) + require.Equal(t, roadIdx, got.Proposal().RoadIndex()) + require.Equal(t, globalNum, got.Proposal().GlobalNumber()) + + // commitQCs queue should be advanced past the AppQC's road index. + require.Equal(t, roadIdx+1, state.FirstCommitQC()) + }) + + t.Run("loads persisted blocks", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + lane := keys[0].Public() + + // Persist blocks using BlockPersister. + bp, _, err := persist.NewBlockPersister(dir) + require.NoError(t, err) + + var parent types.BlockHeaderHash + for n := types.BlockNumber(0); n < 3; n++ { + block := types.NewBlock(lane, n, parent, types.GenPayload(rng)) + signed := types.Sign(keys[0], types.NewLaneProposal(block)) + parent = block.Header().Hash() + require.NoError(t, bp.PersistBlock(lane, n, signed)) + } + + // Now construct state — it should load the blocks. + state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + require.Equal(t, types.BlockNumber(3), state.NextBlock(lane)) + }) + + t.Run("loads persisted AppQC and blocks together", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + lane := keys[0].Public() + + // Persist AppQC. + roadIdx := types.RoadIndex(2) + globalNum := types.GlobalBlockNumber(5) + appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + + persister, _, err := persist.NewPersister(dir, innerFile) + require.NoError(t, err) + bz, err := proto.Marshal(types.AppQCConv.Encode(appQC)) + require.NoError(t, err) + require.NoError(t, persister.Persist(bz)) + + // Persist blocks. 
+ bp, _, err := persist.NewBlockPersister(dir) + require.NoError(t, err) + + var parent types.BlockHeaderHash + for n := types.BlockNumber(10); n < 13; n++ { + block := types.NewBlock(lane, n, parent, types.GenPayload(rng)) + signed := types.Sign(keys[0], types.NewLaneProposal(block)) + parent = block.Header().Hash() + require.NoError(t, bp.PersistBlock(lane, n, signed)) + } + + // Construct state. + state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + // AppQC loaded. + got, ok := state.LastAppQC().Get() + require.True(t, ok) + require.Equal(t, roadIdx, got.Proposal().RoadIndex()) + + // Blocks loaded. + require.Equal(t, types.BlockNumber(13), state.NextBlock(lane)) + + // commitQCs advanced. + require.Equal(t, roadIdx+1, state.FirstCommitQC()) + }) + + t.Run("corrupt AppQC data returns error", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + // Write garbage data as the "AppQC". 
+ persister, _, err := persist.NewPersister(dir, innerFile) + require.NoError(t, err) + require.NoError(t, persister.Persist([]byte("not a valid protobuf"))) + + _, err = NewState(keys[0], ds, utils.Some(dir)) + require.Error(t, err) + require.Contains(t, err.Error(), "unmarshal persisted AppQC") + }) +} diff --git a/sei-tendermint/internal/autobahn/avail/subscriptions.go b/sei-tendermint/internal/autobahn/avail/subscriptions.go index c59a69e39d..93312061dd 100644 --- a/sei-tendermint/internal/autobahn/avail/subscriptions.go +++ b/sei-tendermint/internal/autobahn/avail/subscriptions.go @@ -48,10 +48,14 @@ func (r *LaneVotesRecv) RecvBatch(ctx context.Context) ([]*types.Signed[*types.L for inner, ctrl := range r.state.inner.Lock() { for { for lane, bq := range inner.blocks { - for i := max(bq.first, r.next[lane]); i < bq.next; i++ { + upperBound := bq.next + if inner.blockPersisted != nil { + upperBound = min(upperBound, inner.blockPersisted[lane]) + } + for i := max(bq.first, r.next[lane]); i < upperBound; i++ { batch = append(batch, bq.q[i].Msg().Block().Header()) } - r.next[lane] = bq.next + r.next[lane] = upperBound } if len(batch) > 0 { break diff --git a/sei-tendermint/internal/autobahn/consensus/inner.go b/sei-tendermint/internal/autobahn/consensus/inner.go index ab1268920c..143d801da7 100644 --- a/sei-tendermint/internal/autobahn/consensus/inner.go +++ b/sei-tendermint/internal/autobahn/consensus/inner.go @@ -1,3 +1,79 @@ +// Persistence for consensus state. +// +// # What We Persist +// +// All consensus state is persisted atomically in a single A/B file pair (inner_a.pb/inner_b.pb): +// - CommitQC: justified entering the current index +// - TimeoutQC: justified entering the current view number +// - PrepareQC: needed for timeoutVote on restart +// - PrepareVote, CommitVote, TimeoutVote: this node's votes for the current view +// +// # Why We Persist +// +// Safety: Votes prevent double-voting on restart - a critical safety property. 
+// +// Liveness: View justification (QCs) enables fast view synchronization after +// cluster-wide outages. Without persisted QCs, lagging validators would be stuck. +// +// Example failure case without QC persistence: +// - All validators have CommitQC for index 4 +// - Within index 5, validators timeout multiple times (view numbers 0→1→2→3) +// - A, B, C reach view (5, 3) via TimeoutQC for view (5, 2) +// - D, E are slower, only at view (5, 2) via TimeoutQC for view (5, 1) +// - Cluster crashes +// - Without persisted QCs, nodes only have their timeout VOTES +// - A, B, C have timeout votes for (5, 2), D, E have timeout votes for (5, 1) +// - On restart, D, E need TimeoutQC(5, 1) to justify being at (5, 2) +// - But TimeoutQC(5, 1) requires 2/3 votes for view (5, 1) +// - Only D, E have those votes - not enough for quorum +// - D, E are stuck at view (5, 1), cannot advance +// +// With QC persistence: +// - A, B, C have TimeoutQC(5, 2) persisted - justifies view (5, 3) +// - D, E have TimeoutQC(5, 1) persisted - justifies view (5, 2) +// - On restart, everyone rebroadcasts their persisted QCs +// - D, E are already at (5, 2), they broadcast TimeoutQC(5, 1) (helps no one) +// - A, B, C broadcast TimeoutQC(5, 2) +// - D, E receive TimeoutQC(5, 2), can now advance to (5, 3) +// - Everyone converges to the highest view +// +// # stateDir Configuration +// +// At config level, stateDir is an Option[string]. NewState creates a persister when +// stateDir is Some(path). When None, no persister is created (persistence +// disabled - DANGEROUS, may lead to SLASHING if the node restarts and double-votes; +// only use for testing). When Some(path), the path must already exist and be +// writable (verified by writing a temp file at startup); returns error otherwise. +// TODO: surface the None warning in CLI --help (e.g. stream command or config docs). 
+// +// # Recovery Behavior +// +// - Fresh start (files don't exist): Logged at INFO level, node starts clean +// - Successful restore: Logged at INFO level with state details +// - One file corrupt (e.g. crash during write): Uses the other file; logged at WARN +// - Both files corrupt or unreadable: Returns error to caller +// - Inconsistent state: Returns error to caller with message indicating which field is corrupt +// Examples of inconsistent state: +// - Vote from a future view (how could we vote for a view we haven't reached?) +// - TimeoutQC at index > 0 without CommitQC (how did we advance past index 0?) +// - TimeoutQC at index > CommitQC.Index + 1 (how did we skip intermediate commits?) +// +// # Write Behavior +// +// - State directory must already exist (we do not create it). +// - Writes are synchronous. +// - Writes are idempotent, so retries on next state change are safe +// +// # Rebroadcasting +// +// On restart, the consensus layer propagates loaded state to output watches, +// which triggers rebroadcasting to peers: +// - Votes (prepareVote, commitVote, timeoutVote): YES - rebroadcast via sendUpdates +// - TimeoutQC: YES - rebroadcast via myTimeoutQC watch +// - CommitQC: NO - used locally for view justification but not rebroadcast; +// CommitQCs are served via StreamCommitQCs from the data layer, not from +// the persisted viewSpec. TODO: consider rebroadcasting CommitQC on restart +// to help peers sync faster after cluster-wide outages. package consensus import ( @@ -10,14 +86,14 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) -// Persisted state file prefix. +// Persisted state file prefix for consensus inner state. const innerFile = "inner" type inner struct { persistedInner } -// newInner creates the inner state from persisted data loaded by newPersister. +// newInner creates the inner state from persisted data loaded by NewPersister. // data is None on fresh start (persistence disabled or no prior state). 
// Returns error if persisted state is corrupt (see persistedInner.validate). func newInner(data utils.Option[[]byte], committee *types.Committee) (inner, error) { diff --git a/sei-tendermint/internal/autobahn/consensus/inner_test.go b/sei-tendermint/internal/autobahn/consensus/inner_test.go index 8b8a604e5e..430df96451 100644 --- a/sei-tendermint/internal/autobahn/consensus/inner_test.go +++ b/sei-tendermint/internal/autobahn/consensus/inner_test.go @@ -1,8 +1,13 @@ package consensus import ( + "context" + "errors" "testing" + "time" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" @@ -23,7 +28,7 @@ func testCommittee(keys ...types.SecretKey) *types.Committee { // seedPersistedInner is a test helper that persists a persistedInner using the public API. func seedPersistedInner(dir string, state *persistedInner) { - p, _, err := newPersister(dir, innerFile) + p, _, err := persist.NewPersister(dir, innerFile) if err != nil { panic(err) } @@ -33,9 +38,9 @@ func seedPersistedInner(dir string, state *persistedInner) { } // loadInner is a test helper that loads persisted data and creates inner. -// Mirrors what NewState does: newPersister → newInner. +// Mirrors what NewState does: NewPersister → newInner. 
func loadInner(dir string, committee *types.Committee) (inner, error) { - _, data, err := newPersister(dir, innerFile) + _, data, err := persist.NewPersister(dir, innerFile) if err != nil { return inner{}, err } @@ -920,3 +925,34 @@ func TestPushTimeoutQCClearsStaleState(t *testing.T) { require.False(t, newInner.CommitVote.IsPresent(), "commitVote should be cleared") require.False(t, newInner.TimeoutVote.IsPresent(), "timeoutVote should be cleared") } + +// failPersister is a Persister that always returns an error. +type failPersister struct{ err error } + +func (f failPersister) Persist([]byte) error { return f.err } + +func TestRunOutputsPersistErrorPropagates(t *testing.T) { + // Verify that a persist error in runOutputs propagates + // and terminates the consensus component (instead of panicking). + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + cs, err := NewState(&Config{ + Key: keys[0], + ViewTimeout: func(types.View) time.Duration { return time.Hour }, + }, ds) + require.NoError(t, err) + + // Inject a persister that always fails. + wantErr := errors.New("disk on fire") + cs.persister = utils.Some[persist.Persister](failPersister{err: wantErr}) + + // runOutputs should fail on the first Iter callback when it tries to persist. + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + err = cs.runOutputs(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "persist inner") + require.ErrorIs(t, err, wantErr) +} diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go new file mode 100644 index 0000000000..01042c6d6f --- /dev/null +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go @@ -0,0 +1,234 @@ +// TODO: Block file persistence is a temporary solution. It does not handle many +// corner cases (e.g. 
disk full, partial directory listings, orphaned files after +// lane changes, no garbage collection on unclean shutdown). This will be replaced +// by a proper storage solution before launch. + +package persist + +import ( + "context" + "encoding/hex" + "fmt" + "maps" + "os" + "path/filepath" + "slices" + "strconv" + "strings" + + "github.com/rs/zerolog/log" + "google.golang.org/protobuf/proto" + + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" +) + +// LoadedBlock is a block loaded from disk during state restoration. +type LoadedBlock struct { + Number types.BlockNumber + Proposal *types.Signed[*types.LaneProposal] +} + +// BlockPersister manages individual block files in a blocks/ subdirectory. +// Each block is stored as <lane-hex>_<block-number>.pb. +type BlockPersister struct { + dir string // full path to the blocks/ subdirectory + ch chan persistJob +} + +type persistJob struct { + lane types.LaneID + number types.BlockNumber + proposal *types.Signed[*types.LaneProposal] +} + +// persistQueueSize is the buffer for async block persistence. With 40 validators, +// a 1/3 Byzantine burst produces up to ~13 lanes × 30 blocks = 390 blocks at once. +// 512 covers that with margin. +const persistQueueSize = 512 + +// NewBlockPersister creates the blocks/ subdirectory if it doesn't exist and +// returns a block persister. Loads all persisted blocks from disk as sorted, +// contiguous slices per lane. Gaps from corrupt or missing files are resolved +// by truncating at the first gap; blocks after the gap will be re-fetched.
+func NewBlockPersister(stateDir string) (*BlockPersister, map[types.LaneID][]LoadedBlock, error) { + dir := filepath.Join(stateDir, "blocks") + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, nil, fmt.Errorf("create blocks dir %s: %w", dir, err) + } + + bp := &BlockPersister{dir: dir, ch: make(chan persistJob, persistQueueSize)} + blocks, err := bp.loadAll() + if err != nil { + return nil, nil, err + } + return bp, blocks, nil +} + +func blockFilename(lane types.LaneID, n types.BlockNumber) string { + return hex.EncodeToString(lane.Bytes()) + "_" + strconv.FormatUint(uint64(n), 10) + ".pb" +} + +func parseBlockFilename(name string) (types.LaneID, types.BlockNumber, error) { + name = strings.TrimSuffix(name, ".pb") + parts := strings.SplitN(name, "_", 2) + if len(parts) != 2 { + return types.PublicKey{}, 0, fmt.Errorf("bad block filename %q", name) + } + keyBytes, err := hex.DecodeString(parts[0]) + if err != nil { + return types.PublicKey{}, 0, fmt.Errorf("bad lane hex in %q: %w", name, err) + } + lane, err := types.PublicKeyFromBytes(keyBytes) + if err != nil { + return types.PublicKey{}, 0, fmt.Errorf("bad lane key in %q: %w", name, err) + } + n, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return types.PublicKey{}, 0, fmt.Errorf("bad block number in %q: %w", name, err) + } + return lane, types.BlockNumber(n), nil +} + +// PersistBlock writes a signed lane proposal to its own file. +func (bp *BlockPersister) PersistBlock(lane types.LaneID, n types.BlockNumber, proposal *types.Signed[*types.LaneProposal]) error { + pb := types.SignedMsgConv[*types.LaneProposal]().Encode(proposal) + data, err := proto.Marshal(pb) + if err != nil { + return fmt.Errorf("marshal block %s/%d: %w", lane, n, err) + } + path := filepath.Join(bp.dir, blockFilename(lane, n)) + return WriteAndSync(path, data) +} + +// DeleteBefore removes all persisted block files for lanes in the given map +// where the block number is below the map value. 
Scans the directory once. +// Best-effort: logs warnings on individual failures. +func (bp *BlockPersister) DeleteBefore(laneFirsts map[types.LaneID]types.BlockNumber) { + if len(laneFirsts) == 0 { + return + } + entries, err := os.ReadDir(bp.dir) + if err != nil { + log.Warn().Err(err).Str("dir", bp.dir).Msg("failed to list blocks dir for cleanup") + return + } + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { + continue + } + lane, fileN, err := parseBlockFilename(entry.Name()) + if err != nil { + continue + } + first, ok := laneFirsts[lane] + if !ok || fileN >= first { + continue + } + path := filepath.Join(bp.dir, entry.Name()) + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + log.Warn().Err(err).Str("path", path).Msg("failed to delete block file") + } + } +} + +// Queue enqueues a block for async persistence. Blocks if the queue is full +// until space is available or ctx is cancelled. We must not drop blocks because +// the blockPersisted cursor advances sequentially — a hole would permanently +// stall voting on the affected lane. +func (bp *BlockPersister) Queue(ctx context.Context, lane types.LaneID, n types.BlockNumber, proposal *types.Signed[*types.LaneProposal]) error { + select { + case bp.ch <- persistJob{lane: lane, number: n, proposal: proposal}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Run drains the internal queue, fsyncs each block to disk, and calls +// onPersisted after each successful write. Blocks until ctx is cancelled. 
+func (bp *BlockPersister) Run(ctx context.Context, onPersisted func(types.LaneID, types.BlockNumber)) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case job := <-bp.ch: + if err := bp.PersistBlock(job.lane, job.number, job.proposal); err != nil { + return fmt.Errorf("persist block %s/%d: %w", job.lane, job.number, err) + } + onPersisted(job.lane, job.number) + } + } +} + +// loadAll loads all persisted blocks from the blocks/ directory. +// Returns sorted, contiguous slices per lane (truncated at the first gap). +func (bp *BlockPersister) loadAll() (map[types.LaneID][]LoadedBlock, error) { + entries, err := os.ReadDir(bp.dir) + if err != nil { + return nil, fmt.Errorf("read blocks dir %s: %w", bp.dir, err) + } + + raw := map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { + continue + } + lane, n, err := parseBlockFilename(entry.Name()) + if err != nil { + log.Warn().Err(err).Str("file", entry.Name()).Msg("skipping unrecognized block file") + continue + } + proposal, err := loadBlockFile(filepath.Join(bp.dir, entry.Name())) + if err != nil { + // Corrupt or partially-written file (e.g. crash mid-write). + // Skip it — the block will be re-received from peers or re-produced. + log.Warn().Err(err).Str("file", entry.Name()).Msg("skipping corrupt block file") + continue + } + // Verify the block's header matches the filename. + h := proposal.Msg().Block().Header() + if h.Lane() != lane || h.BlockNumber() != n { + log.Warn(). + Str("file", entry.Name()). + Stringer("headerLane", h.Lane()). + Uint64("headerNum", uint64(h.BlockNumber())). + Stringer("filenameLane", lane). + Uint64("filenameNum", uint64(n)). 
+ Msg("skipping block file with mismatched header") + continue + } + if raw[lane] == nil { + raw[lane] = map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + } + raw[lane][n] = proposal + log.Info().Str("lane", lane.String()).Uint64("block", uint64(n)).Msg("loaded persisted block") + } + + result := map[types.LaneID][]LoadedBlock{} + for lane, bs := range raw { + sorted := slices.Sorted(maps.Keys(bs)) + var contiguous []LoadedBlock + for i, n := range sorted { + if i > 0 && n != sorted[i-1]+1 { + log.Warn(). + Str("lane", lane.String()). + Uint64("gapAt", uint64(sorted[i-1]+1)). + Int("skipped", len(sorted)-i). + Msg("truncating loaded blocks at gap; remaining will be re-fetched") + break + } + contiguous = append(contiguous, LoadedBlock{Number: n, Proposal: bs[n]}) + } + result[lane] = contiguous + } + return result, nil +} + +func loadBlockFile(path string) (*types.Signed[*types.LaneProposal], error) { + data, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename; not user-controlled + if err != nil { + return nil, err + } + conv := types.SignedMsgConv[*types.LaneProposal]() + return conv.Unmarshal(data) +} diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go new file mode 100644 index 0000000000..1cb58415de --- /dev/null +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go @@ -0,0 +1,277 @@ +package persist + +import ( + "encoding/hex" + "os" + "path/filepath" + "testing" + + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" +) + +func testSignedProposal(rng utils.Rng, key types.SecretKey, n types.BlockNumber) *types.Signed[*types.LaneProposal] { + lane := key.Public() + block := types.NewBlock(lane, n, 
types.GenBlockHeaderHash(rng), types.GenPayload(rng)) + return types.Sign(key, types.NewLaneProposal(block)) +} + +func TestNewBlockPersisterEmptyDir(t *testing.T) { + dir := t.TempDir() + bp, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.NotNil(t, bp) + require.Equal(t, 0, len(blocks)) + // blocks/ subdirectory should exist + fi, err := os.Stat(filepath.Join(dir, "blocks")) + require.NoError(t, err) + require.True(t, fi.IsDir()) +} + +func TestPersistBlockAndLoad(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + b0 := testSignedProposal(rng, key, 0) + b1 := testSignedProposal(rng, key, 1) + require.NoError(t, bp.PersistBlock(lane, 0, b0)) + require.NoError(t, bp.PersistBlock(lane, 1, b1)) + + bp2, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.NotNil(t, bp2) + require.Equal(t, 1, len(blocks), "should have 1 lane") + require.Equal(t, 2, len(blocks[lane]), "should have 2 blocks") + require.Equal(t, types.BlockNumber(0), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(1), blocks[lane][1].Number) + require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) + require.NoError(t, utils.TestDiff(b1, blocks[lane][1].Proposal)) +} + +func TestPersistBlockMultipleLanes(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + b1 := testSignedProposal(rng, key1, 0) + b2 := testSignedProposal(rng, key2, 0) + require.NoError(t, bp.PersistBlock(lane1, 0, b1)) + require.NoError(t, bp.PersistBlock(lane2, 0, b2)) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 2, len(blocks), "should have 2 lanes") + require.Equal(t, 1, len(blocks[lane1])) + 
require.Equal(t, 1, len(blocks[lane2])) + require.NoError(t, utils.TestDiff(b1, blocks[lane1][0].Proposal)) + require.NoError(t, utils.TestDiff(b2, blocks[lane2][0].Proposal)) +} + +func TestLoadSkipsCorruptBlockFile(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Write a good block + b0 := testSignedProposal(rng, key, 0) + require.NoError(t, bp.PersistBlock(lane, 0, b0)) + + // Write a corrupt file with a valid filename + corruptName := blockFilename(lane, 1) + require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane]), "should only load the valid block") + require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) +} + +func TestLoadCorruptMidSequenceTruncatesAtGap(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Persist blocks 0, 2 (valid) and corrupt block 1. + // After skipping corrupt-1, raw has {0, 2} → gap at 1 → contiguous prefix [0]. 
+ b0 := testSignedProposal(rng, key, 0) + b2 := testSignedProposal(rng, key, 2) + require.NoError(t, bp.PersistBlock(lane, 0, b0)) + require.NoError(t, bp.PersistBlock(lane, 2, b2)) + corruptName := blockFilename(lane, 1) + require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane]), "corrupt mid-sequence creates gap; only block 0 survives") + require.Equal(t, types.BlockNumber(0), blocks[lane][0].Number) + require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) +} + +func TestLoadTruncatesAtGap(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Persist blocks 3, 4, 6, 7 (gap at 5). + for _, n := range []types.BlockNumber{3, 4, 6, 7} { + require.NoError(t, bp.PersistBlock(lane, n, testSignedProposal(rng, key, n))) + } + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 2, len(blocks[lane]), "should have contiguous prefix [3, 4]") + require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) +} + +func TestLoadSkipsMismatchedHeader(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Write block for lane1 but save it under lane2's filename + b := testSignedProposal(rng, key1, 5) + require.NoError(t, bp.PersistBlock(lane1, 5, b)) + + // Rename the file to use lane2 in the filename + oldPath := filepath.Join(dir, "blocks", blockFilename(lane1, 5)) + newPath := filepath.Join(dir, "blocks", blockFilename(lane2, 5)) + require.NoError(t, os.Rename(oldPath, newPath)) + + // 
Reload — should skip the mismatched file + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 0, len(blocks), "mismatched header should be skipped") +} + +func TestLoadSkipsUnrecognizedFilename(t *testing.T) { + dir := t.TempDir() + + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + _ = bp + + // Write files with bad names + blocksDir := filepath.Join(dir, "blocks") + require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "notablock.pb"), []byte("data"), 0600)) + require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "readme.txt"), []byte("hi"), 0600)) + + // Reload — should skip both + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 0, len(blocks)) +} + +func TestDeleteBeforeRemovesOldKeepsNew(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Persist blocks 0..4 + for i := types.BlockNumber(0); i < 5; i++ { + require.NoError(t, bp.PersistBlock(lane, i, testSignedProposal(rng, key, i))) + } + + // Delete blocks before 3 + bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 3}) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 2, len(blocks[lane]), "should have blocks 3 and 4") + require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) +} + +func TestDeleteBeforeMultipleLanes(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + // Lane1: blocks 0,1,2; Lane2: blocks 0,1,2 + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(lane1, i, testSignedProposal(rng, key1, i))) + require.NoError(t, 
bp.PersistBlock(lane2, i, testSignedProposal(rng, key2, i))) + } + + // Delete lane1 < 2, lane2 < 1 + bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 2, lane2: 1}) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane1]), "lane1 should have block 2") + require.Equal(t, types.BlockNumber(2), blocks[lane1][0].Number) + require.Equal(t, 2, len(blocks[lane2]), "lane2 should have blocks 1,2") + require.Equal(t, types.BlockNumber(1), blocks[lane2][0].Number) + require.Equal(t, types.BlockNumber(2), blocks[lane2][1].Number) +} + +func TestDeleteBeforeEmptyMap(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) + lane := key.Public() + bp, _, err := NewBlockPersister(dir) + require.NoError(t, err) + + require.NoError(t, bp.PersistBlock(lane, 0, testSignedProposal(rng, key, 0))) + + // Empty map — should not delete anything + bp.DeleteBefore(map[types.LaneID]types.BlockNumber{}) + + _, blocks, err := NewBlockPersister(dir) + require.NoError(t, err) + require.Equal(t, 1, len(blocks[lane])) +} + +func TestBlockFilenameRoundTrip(t *testing.T) { + rng := utils.TestRng() + lane := types.GenSecretKey(rng).Public() + n := types.BlockNumber(42) + + name := blockFilename(lane, n) + parsedLane, parsedN, err := parseBlockFilename(name) + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(lane.Bytes()), hex.EncodeToString(parsedLane.Bytes())) + require.Equal(t, n, parsedN) +} diff --git a/sei-tendermint/internal/autobahn/consensus/persist.go b/sei-tendermint/internal/autobahn/consensus/persist/persist.go similarity index 87% rename from sei-tendermint/internal/autobahn/consensus/persist.go rename to sei-tendermint/internal/autobahn/consensus/persist/persist.go index 16e72c693c..9a178607b8 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/persist.go @@ -1,14 +1,5 @@ // Crash-safe A/B file 
persistence. // -// # stateDir Configuration -// -// At config level, stateDir is an Option[string]. NewState creates a persister when -// stateDir is Some(path). When None, no persister is created (persistence -// disabled — DANGEROUS, may lead to SLASHING if the node restarts and double-votes; -// only use for testing). When Some(path), the path must already exist and be -// writable (verified by writing a temp file at startup); returns error otherwise. -// TODO: surface the None warning in CLI --help (e.g. stream command or config docs). -// // # A/B File Strategy // // We use an A/B file pair (_a.pb/_b.pb) instead of the traditional @@ -40,7 +31,7 @@ // - Writes are synchronous (fsync after each write). // - Writes are idempotent, so retries on next state change are safe. // - Seq is only advanced after a successful write (rollback on failure). -package consensus +package persist import ( "errors" @@ -55,9 +46,10 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) +// A/B file suffixes. const ( - suffixA = "_a.pb" - suffixB = "_b.pb" + SuffixA = "_a.pb" + SuffixB = "_b.pb" ) // ErrNoData is returned by loadPersisted when no persisted files exist for the prefix. @@ -84,11 +76,11 @@ type persister struct { seq uint64 } -// newPersister creates a crash-safe persister for the given directory and prefix. +// NewPersister creates a crash-safe persister for the given directory and prefix. // dir must already exist and be a directory (we do not create it); returns error otherwise. -// Also returns the loaded data (None on fresh start) for the caller to pass to newInner. +// Also returns the loaded data (None on fresh start) for the caller to decode. // This encapsulates all on-disk format details (A/B files, seq wrapper) in one place. 
-func newPersister(dir string, prefix string) (*persister, utils.Option[[]byte], error) { +func NewPersister(dir string, prefix string) (Persister, utils.Option[[]byte], error) { none := utils.None[[]byte]() fi, err := os.Stat(dir) @@ -116,7 +108,7 @@ func newPersister(dir string, prefix string) (*persister, utils.Option[[]byte], // Ensure both A/B files exist and are writable so Persist never creates new // directory entries. Empty files are treated as non-existent by loadWrapped, // so they won't interfere with loading on restart. - for _, suffix := range []string{suffixA, suffixB} { + for _, suffix := range []string{SuffixA, SuffixB} { path := filepath.Join(dir, prefix+suffix) f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled if err != nil { @@ -153,9 +145,9 @@ func (w *persister) Persist(data []byte) error { seq := w.seq + 1 // Odd seq → A, even seq → B. - suffix := suffixB + suffix := SuffixB if seq%2 == 1 { - suffix = suffixA + suffix = SuffixA } filename := w.prefix + suffix @@ -168,7 +160,7 @@ func (w *persister) Persist(data []byte) error { return fmt.Errorf("marshal wrapper: %w", err) } - if err := writeAndSync(w.dir, filename, bz); err != nil { + if err := WriteAndSync(filepath.Join(w.dir, filename), bz); err != nil { return fmt.Errorf("persist to %s: %w", filename, err) } w.seq = seq @@ -192,7 +184,7 @@ func loadWrapped(stateDir, filename string) (*pb.PersistedWrapper, error) { return nil, fmt.Errorf("read %s: %w", filename, err) } // Treat empty files as non-existent. A valid wrapper must contain at least - // a seq number. Empty files are created by newPersister to pre-populate + // a seq number. Empty files are created by NewPersister to pre-populate // directory entries so that Persist never needs to dir-sync. 
if len(bz) == 0 { return nil, os.ErrNotExist @@ -209,7 +201,7 @@ func loadWrapped(stateDir, filename string) (*pb.PersistedWrapper, error) { // so the validator can restart. Returns ErrNoData when no persisted files exist (use errors.Is). // Returns other error only when both files fail to load or state is inconsistent (same seq). func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) { - fileA, fileB := prefix+suffixA, prefix+suffixB + fileA, fileB := prefix+SuffixA, prefix+SuffixB wrapperA, errA := loadWrapped(dir, fileA) wrapperB, errB := loadWrapped(dir, fileB) @@ -254,10 +246,9 @@ func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) { } } -// writeAndSync writes data to file and fsyncs. No dir sync needed because -// newPersister pre-creates both A/B files at startup. -func writeAndSync(stateDir string, filename string, data []byte) error { - path := filepath.Join(stateDir, filename) +// WriteAndSync writes data to a file path and fsyncs the file. No dir sync is done: that is +// safe for the A/B files, which NewPersister pre-creates at startup, but not guaranteed for newly created files.
+func WriteAndSync(path string, data []byte) error { f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled if err != nil { return err diff --git a/sei-tendermint/internal/autobahn/consensus/persist_test.go b/sei-tendermint/internal/autobahn/consensus/persist/persist_test.go similarity index 75% rename from sei-tendermint/internal/autobahn/consensus/persist_test.go rename to sei-tendermint/internal/autobahn/consensus/persist/persist_test.go index 1e34bc0acb..25b558188c 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist_test.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/persist_test.go @@ -1,29 +1,24 @@ -package consensus +package persist import ( - "context" "errors" "os" "path/filepath" "runtime" "testing" - "time" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" ) func TestPersisterAlternates(t *testing.T) { dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) - // Both files should be pre-created (empty) by newPersister for dir-sync optimization. - _, errA := os.Stat(filepath.Join(dir, "test"+suffixA)) - _, errB := os.Stat(filepath.Join(dir, "test"+suffixB)) + // Both files should be pre-created (empty) by NewPersister for dir-sync optimization. 
+ _, errA := os.Stat(filepath.Join(dir, "test"+SuffixA)) + _, errB := os.Stat(filepath.Join(dir, "test"+SuffixB)) require.NoError(t, errA, "A should be pre-created") require.NoError(t, errB, "B should be pre-created") @@ -45,7 +40,7 @@ func TestPersisterAlternates(t *testing.T) { func TestPersisterPicksHigherSeq(t *testing.T) { dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) // Write three times: A(seq=1), B(seq=2), A(seq=3) @@ -63,13 +58,13 @@ func TestLoadPersistedOneCorruptFileSucceeds(t *testing.T) { dir := t.TempDir() // Write to both files: A(seq=1), B(seq=2) - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("first"))) // seq=1, A require.NoError(t, w.Persist([]byte("second"))) // seq=2, B // Corrupt B (the winner) — should fall back to A - err = os.WriteFile(filepath.Join(dir, "test"+suffixB), []byte("corrupt"), 0600) + err = os.WriteFile(filepath.Join(dir, "test"+SuffixB), []byte("corrupt"), 0600) require.NoError(t, err) wrapper, err := loadPersisted(dir, "test") @@ -81,12 +76,12 @@ func TestLoadPersistedBothCorruptError(t *testing.T) { dir := t.TempDir() // Write valid wrapped data to A only - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("valid"))) // seq=1, A // Corrupt A (B is empty, treated as non-existent) — both files fail - err = os.WriteFile(filepath.Join(dir, "test"+suffixA), []byte("corrupt"), 0600) + err = os.WriteFile(filepath.Join(dir, "test"+SuffixA), []byte("corrupt"), 0600) require.NoError(t, err) _, err = loadPersisted(dir, "test") @@ -98,16 +93,16 @@ func TestNewPersisterOneCorruptFileSucceeds(t *testing.T) { dir := t.TempDir() // Write to both files: A(seq=1), B(seq=2) - w1, _, err := newPersister(dir, "test") + w1, _, err := NewPersister(dir, "test") require.NoError(t, err) 
require.NoError(t, w1.Persist([]byte("first"))) // seq=1, A require.NoError(t, w1.Persist([]byte("second"))) // seq=2, B - // Corrupt B (the winner) — newPersister should still succeed using A - err = os.WriteFile(filepath.Join(dir, "test"+suffixB), []byte("corrupt"), 0600) + // Corrupt B (the winner) — NewPersister should still succeed using A + err = os.WriteFile(filepath.Join(dir, "test"+SuffixB), []byte("corrupt"), 0600) require.NoError(t, err) - w2, _, err := newPersister(dir, "test") + w2, _, err := NewPersister(dir, "test") require.NoError(t, err) // A won (seq=1), so next write goes to B (the corrupt/loser slot) @@ -128,7 +123,7 @@ func TestLoadPersistedEmptyDir(t *testing.T) { func TestNewPersisterInvalidDirError(t *testing.T) { // State dir must already exist; invalid (nonexistent or not a directory) returns error - _, _, err := newPersister("/nonexistent/path/that/does/not/exist", "test") + _, _, err := NewPersister("/nonexistent/path/that/does/not/exist", "test") require.Error(t, err) require.Contains(t, err.Error(), "invalid state dir") } @@ -140,12 +135,12 @@ func TestPersistWriteErrorReturnsError(t *testing.T) { t.Skip("chmod 000 on directory not reliable on Windows") } dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("data1"))) // Remove all permissions from dir so OpenFile fails with EACCES require.NoError(t, os.Chmod(dir, 0000)) - defer os.Chmod(dir, 0700) + defer os.Chmod(dir, 0700) //nolint:errcheck err = w.Persist([]byte("data2")) require.Error(t, err) } @@ -158,7 +153,7 @@ func TestPersistWriteErrorDoesNotAdvanceSeq(t *testing.T) { t.Skip("chmod 000 on directory not reliable on Windows") } dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) // Successful write: seq=1 → A @@ -191,15 +186,15 @@ func TestLoadPersistedOSErrorPropagates(t *testing.T) { dir := 
t.TempDir() // Write to both files: A(seq=1), B(seq=2) - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("first"))) // seq=1, A require.NoError(t, w.Persist([]byte("second"))) // seq=2, B // Make B unreadable (OS error, not corrupt data) - pathB := filepath.Join(dir, "test"+suffixB) + pathB := filepath.Join(dir, "test"+SuffixB) require.NoError(t, os.Chmod(pathB, 0000)) - defer os.Chmod(pathB, 0600) + defer os.Chmod(pathB, 0600) //nolint:errcheck // loadPersisted should fail — not silently fall back to A _, err = loadPersisted(dir, "test") @@ -213,13 +208,13 @@ func TestLoadPersistedCorruptFallsBack(t *testing.T) { // fall back to the other file — this is the crash recovery path. dir := t.TempDir() - w, _, err := newPersister(dir, "test") + w, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w.Persist([]byte("first"))) // seq=1, A require.NoError(t, w.Persist([]byte("second"))) // seq=2, B // Corrupt B (simulates crash mid-write) - err = os.WriteFile(filepath.Join(dir, "test"+suffixB), []byte("garbage"), 0600) + err = os.WriteFile(filepath.Join(dir, "test"+SuffixB), []byte("garbage"), 0600) require.NoError(t, err) // Should succeed using A, since B's error is ErrCorrupt (tolerable) @@ -232,7 +227,7 @@ func TestNewPersisterResumeSeq(t *testing.T) { dir := t.TempDir() // Create persister and write some data - w1, _, err := newPersister(dir, "test") + w1, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w1.Persist([]byte("data1"))) // seq=1, A require.NoError(t, w1.Persist([]byte("data2"))) // seq=2, B @@ -240,7 +235,7 @@ func TestNewPersisterResumeSeq(t *testing.T) { // Create new persister (simulates restart) // A has seq=3 (winner), so new persister should write to B first to preserve A - w2, _, err := newPersister(dir, "test") + w2, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, 
w2.Persist([]byte("data4"))) // seq=4, B (preserves A) @@ -254,13 +249,13 @@ func TestNewPersisterPreservesWinner(t *testing.T) { dir := t.TempDir() // Write to both files: A=seq1, B=seq2 (B wins) - w1, _, err := newPersister(dir, "test") + w1, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w1.Persist([]byte("old"))) // seq=1, A require.NoError(t, w1.Persist([]byte("winner"))) // seq=2, B (winner) // New persister should write to A first (preserve B) - w2, _, err := newPersister(dir, "test") + w2, _, err := NewPersister(dir, "test") require.NoError(t, err) require.NoError(t, w2.Persist([]byte("new"))) // seq=3, A (preserves B) @@ -269,34 +264,3 @@ func TestNewPersisterPreservesWinner(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte("new"), wrapper.GetData()) } - -// failPersister is a Persister that always returns an error. -type failPersister struct{ err error } - -func (f failPersister) Persist([]byte) error { return f.err } - -func TestRunOutputsPersistErrorPropagates(t *testing.T) { - // Verify that a persist error in runOutputs propagates - // and terminates the consensus component (instead of panicking). - rng := utils.TestRng() - committee, keys := types.GenCommittee(rng, 4) - ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) - - cs, err := NewState(&Config{ - Key: keys[0], - ViewTimeout: func(types.View) time.Duration { return time.Hour }, - }, ds) - require.NoError(t, err) - - // Inject a persister that always fails. - wantErr := errors.New("disk on fire") - cs.persister = utils.Some[Persister](failPersister{err: wantErr}) - - // runOutputs should fail on the first Iter callback when it tries to persist. 
- ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) - defer cancel() - err = cs.runOutputs(ctx) - require.Error(t, err) - require.Contains(t, err.Error(), "persist inner") - require.ErrorIs(t, err, wantErr) -} diff --git a/sei-tendermint/internal/autobahn/consensus/state.go b/sei-tendermint/internal/autobahn/consensus/state.go index a2e0930d8f..26290f07e9 100644 --- a/sei-tendermint/internal/autobahn/consensus/state.go +++ b/sei-tendermint/internal/autobahn/consensus/state.go @@ -8,6 +8,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/avail" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -44,7 +45,7 @@ type State struct { innerRecv utils.AtomicRecv[inner] // persister writes inner's persistedInner to disk when PersistentStateDir is set; None when disabled. - persister utils.Option[Persister] + persister utils.Option[persist.Persister] timeoutVotes utils.Mutex[*timeoutVotes] prepareVotes utils.Mutex[*prepareVotes] @@ -79,14 +80,14 @@ func (s *State) SubscribeTimeoutQC() utils.AtomicRecv[utils.Option[*types.Timeou func NewState(cfg *Config, data *data.State) (*State, error) { // Create persister first so newInner can receive the loaded data // instead of reading the files directly. 
- var pers utils.Option[Persister] + var pers utils.Option[persist.Persister] var persistedData utils.Option[[]byte] if dir, ok := cfg.PersistentStateDir.Get(); ok { - p, d, err := newPersister(dir, innerFile) + p, d, err := persist.NewPersister(dir, innerFile) if err != nil { - return nil, fmt.Errorf("newPersister: %w", err) + return nil, fmt.Errorf("NewPersister: %w", err) } - pers = utils.Some[Persister](p) + pers = utils.Some[persist.Persister](p) persistedData = d } @@ -95,11 +96,16 @@ func NewState(cfg *Config, data *data.State) (*State, error) { return nil, fmt.Errorf("newInner: %w", err) } + availState, err := avail.NewState(cfg.Key, data, cfg.PersistentStateDir) + if err != nil { + return nil, fmt.Errorf("avail.NewState: %w", err) + } + innerSend := utils.Alloc(utils.NewAtomicSend(initialInner)) s := &State{ cfg: cfg, // metrics: NewMetrics(), - avail: avail.NewState(cfg.Key, data), + avail: availState, inner: utils.NewMutex(innerSend), innerRecv: innerSend.Subscribe(), persister: pers,