Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions sei-tendermint/internal/autobahn/avail/inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"fmt"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
)
Expand All @@ -14,23 +15,82 @@
commitQCs *queue[types.RoadIndex, *types.CommitQC]
blocks map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]]
votes map[types.LaneID]*queue[types.BlockNumber, blockVotes]
// blockPersisted tracks per-lane how far block persistence has progressed.
// RecvBatch only yields blocks below this cursor for voting.
// nil when persistence is disabled (testing); RecvBatch then uses q.next.
// blockPersisted itself is not persisted to disk: on restart it is
// reconstructed from the blocks already on disk (see newInner).
blockPersisted map[types.LaneID]types.BlockNumber
}

func newInner(c *types.Committee) *inner {
// loadedAvailState holds data loaded from disk on restart.
// nil means fresh start (no persisted data).
// blocks are sorted by number and contiguous (gaps already resolved by loader).
type loadedAvailState struct {
appQC utils.Option[*types.AppQC]
blocks map[types.LaneID][]persist.LoadedBlock
}

func newInner(c *types.Committee, loaded *loadedAvailState) *inner {
votes := map[types.LaneID]*queue[types.BlockNumber, blockVotes]{}
blocks := map[types.LaneID]*queue[types.BlockNumber, *types.Signed[*types.LaneProposal]]{}
for _, lane := range c.Lanes().All() {
votes[lane] = newQueue[types.BlockNumber, blockVotes]()
blocks[lane] = newQueue[types.BlockNumber, *types.Signed[*types.LaneProposal]]()
}
return &inner{

i := &inner{
latestAppQC: utils.None[*types.AppQC](),
latestCommitQC: utils.NewAtomicSend(utils.None[*types.CommitQC]()),
appVotes: newQueue[types.GlobalBlockNumber, appVotes](),
commitQCs: newQueue[types.RoadIndex, *types.CommitQC](),
blocks: blocks,
votes: votes,
}

if loaded == nil {
return i
}

// Restore AppQC and advance queues past already-processed indices.
i.latestAppQC = loaded.appQC
if aq, ok := loaded.appQC.Get(); ok {
// CommitQCs through this index have been processed; skip them.
i.commitQCs.first = aq.Proposal().RoadIndex() + 1
i.commitQCs.next = i.commitQCs.first
// AppVotes through this global block number have been processed.
i.appVotes.first = aq.Proposal().GlobalNumber() + 1
i.appVotes.next = i.appVotes.first
}

// Reconstruct blockPersisted from the blocks on disk.
i.blockPersisted = make(map[types.LaneID]types.BlockNumber, c.Lanes().Len())
for _, lane := range c.Lanes().All() {
i.blockPersisted[lane] = 0
}

// Restore persisted blocks into their lane queues.
for lane, bs := range loaded.blocks {
q, ok := i.blocks[lane]
if !ok || len(bs) == 0 {
continue
}
first := bs[0].Number
q.first = first
q.next = first
for _, b := range bs {
q.q[q.next] = b.Proposal
q.next++
}
i.blockPersisted[lane] = q.next
// Advance the votes queue to match so headers() returns ErrPruned
// for already-committed blocks instead of blocking forever.
vq := i.votes[lane]
vq.first = first
vq.next = first
}

return i
}

func (i *inner) laneQC(c *types.Committee, lane types.LaneID, n types.BlockNumber) (*types.LaneQC, bool) {
Expand Down
261 changes: 259 additions & 2 deletions sei-tendermint/internal/autobahn/avail/inner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
Expand All @@ -16,7 +17,8 @@ func TestPruneMismatchedIndices(t *testing.T) {
ds := data.NewState(&data.Config{
Committee: committee,
}, utils.None[data.BlockStore]())
state := NewState(keys[0], ds)
state, err := NewState(keys[0], ds, utils.None[string]())
require.NoError(t, err)

// Helper to create a CommitQC for a specific index
makeQC := func(index types.RoadIndex, prev utils.Option[*types.CommitQC]) *types.CommitQC {
Expand Down Expand Up @@ -48,7 +50,7 @@ func TestPruneMismatchedIndices(t *testing.T) {
appQC1 := types.NewAppQC(makeAppVotes(keys, appProposal1))

// Now call PushAppQC with appQC1 (index 1) and qc0 (index 0)
err := state.PushAppQC(appQC1, qc0)
err = state.PushAppQC(appQC1, qc0)
require.Error(t, err)
require.Contains(t, err.Error(), "mismatched QCs")

Expand All @@ -63,3 +65,258 @@ func TestPruneMismatchedIndices(t *testing.T) {
require.False(t, inner.latestAppQC.IsPresent(), "latestAppQC should not have been updated")
}
}

// testSignedBlock creates a signed lane proposal for a given lane, block number, and parent hash.
func testSignedBlock(key types.SecretKey, lane types.LaneID, n types.BlockNumber, parent types.BlockHeaderHash, rng utils.Rng) *types.Signed[*types.LaneProposal] {
block := types.NewBlock(lane, n, parent, types.GenPayload(rng))
return types.Sign(key, types.NewLaneProposal(block))
}

func TestNewInnerFreshStart(t *testing.T) {
rng := utils.TestRng()
committee, _ := types.GenCommittee(rng, 4)

i := newInner(committee, nil)

require.False(t, i.latestAppQC.IsPresent())
require.Equal(t, types.RoadIndex(0), i.commitQCs.first)
require.Equal(t, types.RoadIndex(0), i.commitQCs.next)
require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.first)
require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.next)
for _, lane := range committee.Lanes().All() {
require.Equal(t, types.BlockNumber(0), i.blocks[lane].first)
require.Equal(t, types.BlockNumber(0), i.blocks[lane].next)
require.Equal(t, types.BlockNumber(0), i.votes[lane].first)
require.Equal(t, types.BlockNumber(0), i.votes[lane].next)
}
}

func TestNewInnerLoadedAppQCAdvancesQueues(t *testing.T) {
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)

roadIdx := types.RoadIndex(5)
globalNum := types.GlobalBlockNumber(42)
appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng))
appQC := types.NewAppQC(makeAppVotes(keys, appProposal))

loaded := &loadedAvailState{
appQC: utils.Some[*types.AppQC](appQC),
}

i := newInner(committee, loaded)

// latestAppQC should be restored.
aq, ok := i.latestAppQC.Get()
require.True(t, ok)
require.Equal(t, roadIdx, aq.Proposal().RoadIndex())
require.Equal(t, globalNum, aq.Proposal().GlobalNumber())

// commitQCs queue should skip past the loaded AppQC's road index.
require.Equal(t, roadIdx+1, i.commitQCs.first)
require.Equal(t, roadIdx+1, i.commitQCs.next)

// appVotes queue should skip past the loaded AppQC's global block number.
require.Equal(t, globalNum+1, i.appVotes.first)
require.Equal(t, globalNum+1, i.appVotes.next)
}

func TestNewInnerLoadedAppQCNone(t *testing.T) {
rng := utils.TestRng()
committee, _ := types.GenCommittee(rng, 4)

loaded := &loadedAvailState{
appQC: utils.None[*types.AppQC](),
}

i := newInner(committee, loaded)

// No AppQC loaded, queues should start at 0.
require.False(t, i.latestAppQC.IsPresent())
require.Equal(t, types.RoadIndex(0), i.commitQCs.first)
require.Equal(t, types.GlobalBlockNumber(0), i.appVotes.first)
}

func TestNewInnerLoadedBlocksContiguous(t *testing.T) {
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)
lane := keys[0].Public()

// Build 3 contiguous blocks: 5, 6, 7.
var parent types.BlockHeaderHash
var bs []persist.LoadedBlock
for n := types.BlockNumber(5); n < 8; n++ {
b := testSignedBlock(keys[0], lane, n, parent, rng)
parent = b.Msg().Block().Header().Hash()
bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b})
}

loaded := &loadedAvailState{
appQC: utils.None[*types.AppQC](),
blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs},
}

i := newInner(committee, loaded)

q := i.blocks[lane]
require.Equal(t, types.BlockNumber(5), q.first)
require.Equal(t, types.BlockNumber(8), q.next)
for j, b := range bs {
require.Equal(t, b.Proposal, q.q[types.BlockNumber(5)+types.BlockNumber(j)])
}

// Votes queue should be aligned.
vq := i.votes[lane]
require.Equal(t, types.BlockNumber(5), vq.first)
require.Equal(t, types.BlockNumber(5), vq.next)
}

func TestNewInnerLoadedBlocksContiguousPrefix(t *testing.T) {
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)
lane := keys[0].Public()

// Loader already resolved the gap: only contiguous prefix [3, 4] is passed.
var parent types.BlockHeaderHash
var bs []persist.LoadedBlock
for _, n := range []types.BlockNumber{3, 4} {
b := testSignedBlock(keys[0], lane, n, parent, rng)
parent = b.Msg().Block().Header().Hash()
bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b})
}

loaded := &loadedAvailState{
appQC: utils.None[*types.AppQC](),
blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs},
}

i := newInner(committee, loaded)

q := i.blocks[lane]
require.Equal(t, types.BlockNumber(3), q.first)
require.Equal(t, types.BlockNumber(5), q.next)
require.Equal(t, bs[0].Proposal, q.q[types.BlockNumber(3)])
require.Equal(t, bs[1].Proposal, q.q[types.BlockNumber(4)])
}

func TestNewInnerLoadedBlocksEmptySlice(t *testing.T) {
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)
lane := keys[0].Public()

loaded := &loadedAvailState{
appQC: utils.None[*types.AppQC](),
blocks: map[types.LaneID][]persist.LoadedBlock{lane: {}},
}

i := newInner(committee, loaded)

q := i.blocks[lane]
require.Equal(t, types.BlockNumber(0), q.first)
require.Equal(t, types.BlockNumber(0), q.next)
}

func TestNewInnerLoadedBlocksUnknownLane(t *testing.T) {
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)

unknownKey := types.GenSecretKey(rng)
unknownLane := unknownKey.Public()

b := testSignedBlock(unknownKey, unknownLane, 0, types.BlockHeaderHash{}, rng)
loaded := &loadedAvailState{
appQC: utils.None[*types.AppQC](),
blocks: map[types.LaneID][]persist.LoadedBlock{unknownLane: {{Number: 0, Proposal: b}}},
}

i := newInner(committee, loaded)

for _, lane := range committee.Lanes().All() {
q := i.blocks[lane]
require.Equal(t, types.BlockNumber(0), q.first)
require.Equal(t, types.BlockNumber(0), q.next)
}
_ = keys
}

func TestNewInnerLoadedAppQCAndBlocks(t *testing.T) {
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)
lane := keys[0].Public()

roadIdx := types.RoadIndex(3)
globalNum := types.GlobalBlockNumber(10)
appProposal := types.NewAppProposal(globalNum, roadIdx, types.GenAppHash(rng))
appQC := types.NewAppQC(makeAppVotes(keys, appProposal))

var parent types.BlockHeaderHash
var bs []persist.LoadedBlock
for n := types.BlockNumber(7); n < 9; n++ {
b := testSignedBlock(keys[0], lane, n, parent, rng)
parent = b.Msg().Block().Header().Hash()
bs = append(bs, persist.LoadedBlock{Number: n, Proposal: b})
}

loaded := &loadedAvailState{
appQC: utils.Some[*types.AppQC](appQC),
blocks: map[types.LaneID][]persist.LoadedBlock{lane: bs},
}

i := newInner(committee, loaded)

aq, ok := i.latestAppQC.Get()
require.True(t, ok)
require.Equal(t, roadIdx, aq.Proposal().RoadIndex())

require.Equal(t, roadIdx+1, i.commitQCs.first)
require.Equal(t, globalNum+1, i.appVotes.first)

q := i.blocks[lane]
require.Equal(t, types.BlockNumber(7), q.first)
require.Equal(t, types.BlockNumber(9), q.next)

vq := i.votes[lane]
require.Equal(t, types.BlockNumber(7), vq.first)
require.Equal(t, types.BlockNumber(7), vq.next)
}

func TestNewInnerLoadedBlocksMultipleLanes(t *testing.T) {
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)
lane0 := keys[0].Public()
lane1 := keys[1].Public()

var parent0 types.BlockHeaderHash
var bs0 []persist.LoadedBlock
for n := types.BlockNumber(2); n < 4; n++ {
b := testSignedBlock(keys[0], lane0, n, parent0, rng)
parent0 = b.Msg().Block().Header().Hash()
bs0 = append(bs0, persist.LoadedBlock{Number: n, Proposal: b})
}

var parent1 types.BlockHeaderHash
var bs1 []persist.LoadedBlock
for n := types.BlockNumber(0); n < 3; n++ {
b := testSignedBlock(keys[1], lane1, n, parent1, rng)
parent1 = b.Msg().Block().Header().Hash()
bs1 = append(bs1, persist.LoadedBlock{Number: n, Proposal: b})
}

loaded := &loadedAvailState{
appQC: utils.None[*types.AppQC](),
blocks: map[types.LaneID][]persist.LoadedBlock{lane0: bs0, lane1: bs1},
}

i := newInner(committee, loaded)

q0 := i.blocks[lane0]
require.Equal(t, types.BlockNumber(2), q0.first)
require.Equal(t, types.BlockNumber(4), q0.next)

q1 := i.blocks[lane1]
require.Equal(t, types.BlockNumber(0), q1.first)
require.Equal(t, types.BlockNumber(3), q1.next)

require.Equal(t, types.BlockNumber(2), i.votes[lane0].first)
require.Equal(t, types.BlockNumber(0), i.votes[lane1].first)
}
Loading
Loading