diff --git a/sei-tendermint/abci/types/types.go b/sei-tendermint/abci/types/types.go index d4411ce7b0..cb4edbb6ee 100644 --- a/sei-tendermint/abci/types/types.go +++ b/sei-tendermint/abci/types/types.go @@ -232,12 +232,12 @@ type ResponseCheckTxV2 struct { *ResponseCheckTx // helper properties for prioritization in mempool + IsEVM bool EVMNonce uint64 // EVM and sei addresses are both derived from the sender's public key. // TODO(gprusak): include just the secp256k1 public key and let the CheckTx caller derive evm/sei address on their own. EVMSenderAddress common.Address SeiSenderAddress []byte - IsEVM bool EVMRequiredBalance *big.Int } diff --git a/sei-tendermint/cmd/tendermint/commands/gen_autobahn_config.go b/sei-tendermint/cmd/tendermint/commands/gen_autobahn_config.go index c269e91ca0..b7e7c88215 100644 --- a/sei-tendermint/cmd/tendermint/commands/gen_autobahn_config.go +++ b/sei-tendermint/cmd/tendermint/commands/gen_autobahn_config.go @@ -82,13 +82,11 @@ Output is written to the file specified by --output.`, } cfg := config.AutobahnFileConfig{ - Validators: validators, - MaxTxsPerBlock: 5_000, - MempoolSize: 5_000, - BlockInterval: utils.Duration(400 * time.Millisecond), - AllowEmptyBlocks: true, - ViewTimeout: utils.Duration(1500 * time.Millisecond), - DialInterval: utils.Duration(10 * time.Second), + Validators: validators, + MaxTxsPerBlock: 5_000, + BlockInterval: utils.Duration(400 * time.Millisecond), + ViewTimeout: utils.Duration(1500 * time.Millisecond), + DialInterval: utils.Duration(10 * time.Second), } // The flag defaults to "data/autobahn" so persistence is on without // operator action. node/setup.go rootifies the relative path against diff --git a/sei-tendermint/config/autobahn.go b/sei-tendermint/config/autobahn.go index ea92919f86..483af4dbfd 100644 --- a/sei-tendermint/config/autobahn.go +++ b/sei-tendermint/config/autobahn.go @@ -45,9 +45,7 @@ type AutobahnFileConfig struct { Validators []AutobahnValidator `json:"validators"` MaxTxsPerBlock uint64 `json:"max_txs_per_block"` MaxTxsPerSecond utils.Option[uint64] `json:"max_txs_per_second"` - MempoolSize uint64 `json:"mempool_size"` BlockInterval utils.Duration `json:"block_interval"` - AllowEmptyBlocks bool `json:"allow_empty_blocks"` ViewTimeout utils.Duration `json:"view_timeout"` PersistentStateDir utils.Option[string] `json:"persistent_state_dir"` DialInterval utils.Duration `json:"dial_interval"` @@ -61,9 +59,6 @@ func (fc *AutobahnFileConfig) Validate() error { if fc.MaxTxsPerBlock == 0 { return errors.New("max_txs_per_block must be > 0") } - if fc.MempoolSize == 0 { - return errors.New("mempool_size must be > 0") - } if fc.BlockInterval <= 0 { return errors.New("block_interval must be > 0") } diff --git a/sei-tendermint/internal/autobahn/autobahn.proto b/sei-tendermint/internal/autobahn/autobahn.proto index c64fdc119a..9d4f052feb 100644 --- a/sei-tendermint/internal/autobahn/autobahn.proto +++ b/sei-tendermint/internal/autobahn/autobahn.proto @@ -74,12 +74,11 @@ message BlockHeader { } message Payload { + reserved "edge_count", "coinbase", "basefee"; + reserved 3, 4, 5; option (hashable.hashable) = true; optional Timestamp created_at = 1; // required optional uint64 total_gas = 2; // required - optional int64 edge_count = 3; // required - optional bytes coinbase = 4; // required - optional int64 basefee = 5; // required repeated bytes txs = 6; } diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index b4c122c063..356b776106 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -40,6 +40,10 @@ type State struct { persisters persisters } +func (s *State) PublicKey() types.PublicKey { + return s.key.Public() +} + // persisters holds all disk persistence components. Either all are present // (real I/O) or all are no-op (testing). It is a pure I/O struct — all inner // state access goes through State methods. @@ -521,11 +525,11 @@ func (s *State) fullCommitQC(ctx context.Context, n types.RoadIndex) (*types.Ful } // WaitForCapacity waits until the given lane has enough capacity for a new block. -func (s *State) WaitForCapacity(ctx context.Context, lane types.LaneID) error { +func (s *State) WaitForCapacity(ctx context.Context, toProduce types.BlockNumber) error { + lane := s.key.Public() for inner, ctrl := range s.inner.Lock() { - q := inner.blocks[lane] if err := ctrl.WaitUntil(ctx, func() bool { - return q.next < inner.persistedBlockStart[lane]+BlocksPerLane + return toProduce < inner.persistedBlockStart[lane]+BlocksPerLane }); err != nil { return err } @@ -564,12 +568,12 @@ func (s *State) WaitForLaneQCs( // ProduceBlock appends a new block to the producers lane. // Blocks until the lane has enough capacity for the new block. -func (s *State) ProduceBlock(ctx context.Context, payload *types.Payload) (*types.Signed[*types.LaneProposal], error) { - return s.produceBlock(ctx, s.key, payload) +func (s *State) ProduceBlock(n types.BlockNumber, payload *types.Payload) (*types.Signed[*types.LaneProposal], error) { + return s.produceBlock(n, s.key, payload) } // TODO: produceBlock is a separate function for testing - consider improving the tests to use ProduceBlock only. -func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload *types.Payload) (*types.Signed[*types.LaneProposal], error) { +func (s *State) produceBlock(n types.BlockNumber, key types.SecretKey, payload *types.Payload) (*types.Signed[*types.LaneProposal], error) { lane := key.Public() var result *types.Signed[*types.LaneProposal] for inner, ctrl := range s.inner.Lock() { @@ -577,10 +581,11 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload * if !ok { return nil, ErrBadLane } - if err := ctrl.WaitUntil(ctx, func() bool { - return q.next < inner.persistedBlockStart[lane]+BlocksPerLane - }); err != nil { - return nil, err + if n >= inner.persistedBlockStart[lane]+BlocksPerLane { + return nil, fmt.Errorf("lane full") + } + if q.next != n { + return nil, fmt.Errorf("unexpected block number: got %v, want %v", n, q.next) } var parent types.BlockHeaderHash if q.first < q.next { diff --git a/sei-tendermint/internal/autobahn/avail/state_test.go b/sei-tendermint/internal/autobahn/avail/state_test.go index 0529d14605..324f86f074 100644 --- a/sei-tendermint/internal/autobahn/avail/state_test.go +++ b/sei-tendermint/internal/autobahn/avail/state_test.go @@ -128,7 +128,7 @@ func testState(t *testing.T, stateDir utils.Option[string]) { lane := key.Public() p := types.GenPayload(rng) want[lane] = append(want[lane], p.Hash()) - b, err := state.produceBlock(ctx, key, p) + b, err := state.produceBlock(state.NextBlock(lane), key, p) if err != nil { return fmt.Errorf("state.ProduceBlock(): %w", err) } @@ -254,7 +254,7 @@ func TestStateRestartFromPersisted(t *testing.T) { for range 5 { key := keys[rng.Intn(len(keys))] - if _, err := state.produceBlock(ctx, key, types.GenPayload(rng)); err != nil { + if _, err := state.produceBlock(state.NextBlock(key.Public()), key, types.GenPayload(rng)); err != nil { return fmt.Errorf("produceBlock: %w", err) } } @@ -340,7 +340,6 @@ func TestStateMismatchedQCs(t *testing.T) { }, utils.OrPanic1(data.NewDataWAL(utils.None[string](), committee)))) state, err := NewState(keys[0], ds, utils.None[string]()) require.NoError(t, err) - ctx := t.Context() // Helper to create a CommitQC for a specific index makeQC := func(prev utils.Option[*types.CommitQC], laneQCs map[types.LaneID]*types.LaneQC) *types.CommitQC { @@ -364,7 +363,7 @@ func TestStateMismatchedQCs(t *testing.T) { // 1. Produce a block so we have a non-empty range lane := keys[0].Public() p := types.GenPayload(rng) - b, err := state.ProduceBlock(ctx, p) + b, err := state.ProduceBlock(state.NextBlock(lane), p) require.NoError(t, err) // 2. Form a LaneQC for it @@ -398,7 +397,7 @@ func TestPushBlockRejectsBadParentHash(t *testing.T) { state := utils.OrPanic1(NewState(keys[0], ds, utils.None[string]())) // Produce a valid first block on our lane. - _, err := state.ProduceBlock(ctx, types.GenPayload(rng)) + _, err := state.ProduceBlock(state.NextBlock(keys[0].Public()), types.GenPayload(rng)) require.NoError(t, err) // Create a second block with a fake parentHash. diff --git a/sei-tendermint/internal/autobahn/consensus/state.go b/sei-tendermint/internal/autobahn/consensus/state.go index 0790ee9548..ed92d9ff03 100644 --- a/sei-tendermint/internal/autobahn/consensus/state.go +++ b/sei-tendermint/internal/autobahn/consensus/state.go @@ -154,18 +154,6 @@ func (s *State) commitQC() utils.AtomicRecv[utils.Option[*types.CommitQC]] { panic("unreachable") } -// WaitForCapacity waits until a new block can be produced by this node. -func (s *State) WaitForCapacity(ctx context.Context) error { - return s.avail.WaitForCapacity(ctx, s.cfg.Key.Public()) -} - -// ProduceBlock produces a new block with the given payload. -// Returns ErrNoCapacity if there is currently no capacity for the next block. -// Run WaitForCapacity before calling ProduceBlock. -func (s *State) ProduceBlock(ctx context.Context, payload *types.Payload) (*types.Signed[*types.LaneProposal], error) { - return s.avail.ProduceBlock(ctx, payload) -} - // PushProposal processes an unverified FullProposal message. func (s *State) PushProposal(ctx context.Context, proposal *types.FullProposal) error { return s.pushProposal(ctx, proposal) diff --git a/sei-tendermint/internal/autobahn/data/state.go b/sei-tendermint/internal/autobahn/data/state.go index 89b3b537bb..c971386b48 100644 --- a/sei-tendermint/internal/autobahn/data/state.go +++ b/sei-tendermint/internal/autobahn/data/state.go @@ -145,8 +145,9 @@ type appProposalWithTimestamp struct { } type inner struct { - qcs map[types.GlobalBlockNumber]*types.FullCommitQC // [first,nextQC) - blocks map[types.GlobalBlockNumber]*types.Block // [first,nextBlock) + subset of [nextBlock,nextQC) + qcs map[types.GlobalBlockNumber]*types.FullCommitQC // [first,nextQC) + blocks map[types.GlobalBlockNumber]*types.Block // [first,nextBlock) + subset of [nextBlock,nextQC) + // appProposal[n] contains appProposal block >=n. appProposals map[types.GlobalBlockNumber]appProposalWithTimestamp // [first,nextAppProposal) // blockHashes is a hash → height index mirroring blocks. Maintained @@ -565,6 +566,9 @@ func (s *State) PushAppHash(ctx context.Context, n types.GlobalBlockNumber, hash proposal: proposal, timestamp: t, } + // TODO(gprusak): this will be problematic on restart, + // nextAppProposal should be initiated wrt current application height, + // so that we don't iterate over all blocks in storage on startup. for inner.nextAppProposal <= n { b := inner.blocks[inner.nextAppProposal] latency := t.Sub(b.Payload().CreatedAt()).Seconds() @@ -594,6 +598,43 @@ func (s *State) AppProposal(ctx context.Context, n types.GlobalBlockNumber) (*ty panic("unreachable") } +func (i *inner) nextToExecute(lane types.LaneID) types.BlockNumber { + // TODO(gprusak): decide whether 0 is a good result in this case in general. + if i.first == i.nextAppProposal { + return 0 + } + n := i.nextAppProposal - 1 + r := i.qcs[n].QC().LaneRange(lane) + // TODO: this header can be actually extracted from FullCommitQC, so consider moving all this logic there. + h := i.blocks[n].Header() + x := lane.Compare(h.Lane()) + // NOTE: here we assume the specific ordering of lane blocks in the CommitQC: + // TODO(gprusak): move this logic closer to CommitQC + switch { + case x < 0: + return r.Next() + case x > 0: + return r.First() + default: + return h.BlockNumber() + 1 + } +} + +// Waits until lane block n is executed, returns the next block of this lane to be executed (>n) +func (s *State) WaitUntilExecuted(ctx context.Context, lane types.LaneID, n types.BlockNumber) (types.BlockNumber, error) { + for inner, ctrl := range s.inner.Lock() { + for { + if next := inner.nextToExecute(lane); n < next { + return next, nil + } + if err := ctrl.Wait(ctx); err != nil { + return 0, err + } + } + } + panic("unreachable") +} + // PruneBefore removes blocks, QCs, and AppProposals before retainFrom. // Blocks at retainFrom and above are kept. Per-block pruning may split // a QC range; this is handled on recovery (NewState skips partial QC prefixes). diff --git a/sei-tendermint/internal/autobahn/pb/autobahn.pb.go b/sei-tendermint/internal/autobahn/pb/autobahn.pb.go index 6085e0b2bb..aa7d59194c 100644 --- a/sei-tendermint/internal/autobahn/pb/autobahn.pb.go +++ b/sei-tendermint/internal/autobahn/pb/autobahn.pb.go @@ -542,11 +542,8 @@ func (x *BlockHeader) GetPayloadHash() []byte { type Payload struct { state protoimpl.MessageState `protogen:"open.v1"` - CreatedAt *Timestamp `protobuf:"bytes,1,opt,name=created_at,json=createdAt,proto3,oneof" json:"created_at,omitempty"` // required - TotalGas *uint64 `protobuf:"varint,2,opt,name=total_gas,json=totalGas,proto3,oneof" json:"total_gas,omitempty"` // required - EdgeCount *int64 `protobuf:"varint,3,opt,name=edge_count,json=edgeCount,proto3,oneof" json:"edge_count,omitempty"` // required - Coinbase []byte `protobuf:"bytes,4,opt,name=coinbase,proto3,oneof" json:"coinbase,omitempty"` // required - Basefee *int64 `protobuf:"varint,5,opt,name=basefee,proto3,oneof" json:"basefee,omitempty"` // required + CreatedAt *Timestamp `protobuf:"bytes,1,opt,name=created_at,json=createdAt,proto3,oneof" json:"created_at,omitempty"` // required + TotalGas *uint64 `protobuf:"varint,2,opt,name=total_gas,json=totalGas,proto3,oneof" json:"total_gas,omitempty"` // required Txs [][]byte `protobuf:"bytes,6,rep,name=txs,proto3" json:"txs,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -596,27 +593,6 @@ func (x *Payload) GetTotalGas() uint64 { return 0 } -func (x *Payload) GetEdgeCount() int64 { - if x != nil && x.EdgeCount != nil { - return *x.EdgeCount - } - return 0 -} - -func (x *Payload) GetCoinbase() []byte { - if x != nil { - return x.Coinbase - } - return nil -} - -func (x *Payload) GetBasefee() int64 { - if x != nil && x.Basefee != nil { - return *x.Basefee - } - return 0 -} - func (x *Payload) GetTxs() [][]byte { if x != nil { return x.Txs @@ -1963,23 +1939,16 @@ const file_autobahn_autobahn_proto_rawDesc = "" + "\x05_laneB\x0f\n" + "\r_block_numberB\x0e\n" + "\f_parent_hashB\x0f\n" + - "\r_payload_hash\"\xa7\x02\n" + + "\r_payload_hash\"\xcc\x01\n" + "\aPayload\x127\n" + "\n" + "created_at\x18\x01 \x01(\v2\x13.autobahn.TimestampH\x00R\tcreatedAt\x88\x01\x01\x12 \n" + - "\ttotal_gas\x18\x02 \x01(\x04H\x01R\btotalGas\x88\x01\x01\x12\"\n" + - "\n" + - "edge_count\x18\x03 \x01(\x03H\x02R\tedgeCount\x88\x01\x01\x12\x1f\n" + - "\bcoinbase\x18\x04 \x01(\fH\x03R\bcoinbase\x88\x01\x01\x12\x1d\n" + - "\abasefee\x18\x05 \x01(\x03H\x04R\abasefee\x88\x01\x01\x12\x10\n" + + "\ttotal_gas\x18\x02 \x01(\x04H\x01R\btotalGas\x88\x01\x01\x12\x10\n" + "\x03txs\x18\x06 \x03(\fR\x03txs:\x06Ȉ\xe2\xab\f\x01B\r\n" + "\v_created_atB\f\n" + "\n" + - "_total_gasB\r\n" + - "\v_edge_countB\v\n" + - "\t_coinbaseB\n" + - "\n" + - "\b_basefee\"\x8c\x01\n" + + "_total_gasJ\x04\b\x03\x10\x04J\x04\b\x04\x10\x05J\x04\b\x05\x10\x06R\n" + + "edge_countR\bcoinbaseR\abasefee\"\x8c\x01\n" + "\x05Block\x122\n" + "\x06header\x18\x01 \x01(\v2\x15.autobahn.BlockHeaderH\x00R\x06header\x88\x01\x01\x120\n" + "\apayload\x18\x02 \x01(\v2\x11.autobahn.PayloadH\x01R\apayload\x88\x01\x01:\x06Ȉ\xe2\xab\f\x01B\t\n" + diff --git a/sei-tendermint/internal/autobahn/producer/mempool.go b/sei-tendermint/internal/autobahn/producer/mempool.go new file mode 100644 index 0000000000..ea95aaf2e6 --- /dev/null +++ b/sei-tendermint/internal/autobahn/producer/mempool.go @@ -0,0 +1,174 @@ +package producer + +import ( + "context" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common" + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" + tmtypes "github.com/sei-protocol/sei-chain/sei-tendermint/types" +) + +var errTooLarge = errors.New("transaction too large") +var errBadNonce = errors.New("bad nonce") + +type blockSpec struct { + gasEstimated uint64 + gasWanted uint64 + sizeBytes uint64 + txs [][]byte + // nonces of accounts which are expected to be bumped by this block. + // They are checked against the app state after the block is executed. + evmNonces map[common.Address]uint64 +} + +type mempool struct { + capacity uint64 + first types.BlockNumber + next types.BlockNumber + blocks map[types.BlockNumber]*blockSpec + nextBlock *blockSpec + evmNonces map[common.Address]uint64 +} + +func (m *mempool) IsFull() bool { + return uint64(m.next-m.first) >= m.capacity && len(m.nextBlock.txs) > 0 +} + +func (m *mempool) CanPushBlock() bool { + return uint64(m.next-m.first) < m.capacity && len(m.nextBlock.txs) > 0 +} + +func (m *mempool) PushBlock() { + m.blocks[m.next] = m.nextBlock + m.next += 1 + m.nextBlock = &blockSpec{ + evmNonces: map[common.Address]uint64{}, + } +} + +// TODO(gprusak): this rpc is probably unused, but if it is +// consider whether unsequenced/unexecuted lane txs should be included here. +func (s *State) UnconfirmedTxs() [][]byte { + for m := range s.mempool.Lock() { + return m.nextBlock.txs + } + panic("uneachable") +} + +func (s *State) EvmNextPendingNonce(addr common.Address) uint64 { + for m := range s.mempool.Lock() { + if nonce, ok := m.evmNonces[addr]; ok { + return nonce + } + } + return s.cfg.App.EvmNonce(addr) +} + +func (s *State) mempoolFirst() types.BlockNumber { + for m := range s.mempool.Lock() { + return m.first + } + panic("unreachable") +} + +// Removes txs from mempool assigned to lane blocks m.next shouldn't really happen, + // because local mempool is the only source of local lane blocks, + // but we handle it gracefully anyway. + m.next = max(m.next, n) + } +} + +// Inserts transaction. Blocks until there is capacity in the mempool. +func (s *State) InsertTx(ctx context.Context, tx tmtypes.Tx) (*abci.ResponseCheckTx, error) { + if uint64(len(tx)) > types.MaxTxsBytesPerBlock { + return nil, errTooLarge + } + resp, err := s.cfg.App.CheckTxSafe(ctx, &abci.RequestCheckTxV2{Tx: tx}) + if err != nil { + return nil, err + } + if !resp.IsOK() { + return resp.ResponseCheckTx, nil + } + gasWanted := utils.Clamp[uint64](resp.GasWanted) + if gasWanted > s.cfg.MaxGasPerBlock { + return nil, errTooLarge + } + + for m, ctrl := range s.mempool.Lock() { + // mempool is constructed as a FIFO - we do not delay insertions of large txs (going over cap) + // in favor of waiting for smaller txs. This simple algorithm allows us to cap + // pending txs to size of a single block. We can refine this rule later if needed. + if err := ctrl.WaitUntil(ctx, func() bool { return !m.IsFull() }); err != nil { + return nil, err + } + if resp.IsEVM { + addr := resp.EVMSenderAddress + nonce, ok := m.evmNonces[addr] + if !ok { + nonce = s.cfg.App.EvmNonce(addr) + } + if nonce != resp.EVMNonce { + return nil, fmt.Errorf("%w: got %v, want %v", errBadNonce, resp.EVMNonce, nonce) + } + m.evmNonces[addr] = nonce + 1 + } + // If any limit would be exceeded, then construct a payload. + ok := uint64(len(m.nextBlock.txs))+1 <= s.cfg.maxTxsPerBlock() + ok = ok && m.nextBlock.sizeBytes+uint64(len(tx)) <= types.MaxTxsBytesPerBlock + ok = ok && m.nextBlock.gasWanted+gasWanted <= s.cfg.MaxGasPerBlock + if !ok { + m.PushBlock() + } + if len(m.nextBlock.txs) == 0 { + // We notify that we start a new block. + ctrl.Updated() + } + + // Normalize the gas estimate. + gasEstimated := resp.GasEstimated + if gasEstimated < minTxGas || gasEstimated > resp.GasWanted { + gasEstimated = resp.GasWanted + } + b := m.nextBlock + b.gasEstimated += utils.Clamp[uint64](gasEstimated) + b.gasWanted += utils.Clamp[uint64](resp.GasWanted) + b.sizeBytes += uint64(len(tx)) + b.txs = append(b.txs, tx) + if resp.IsEVM { + addr := resp.EVMSenderAddress + b.evmNonces[addr] = m.evmNonces[addr] + } + } + return resp.ResponseCheckTx, nil +} diff --git a/sei-tendermint/internal/autobahn/producer/mempool_test.go b/sei-tendermint/internal/autobahn/producer/mempool_test.go new file mode 100644 index 0000000000..284ac3d92a --- /dev/null +++ b/sei-tendermint/internal/autobahn/producer/mempool_test.go @@ -0,0 +1,548 @@ +package producer + +import ( + "context" + "encoding/binary" + "errors" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" + "github.com/stretchr/testify/require" +) + +type sealTrigger uint8 + +const ( + sealByCount sealTrigger = iota + sealByBytes + sealByGas +) + +type txSpec struct { + tx []byte + gasWanted int64 + gasEstimated int64 +} + +type evmTxSpec struct { + tx []byte + sender common.Address + nonce uint64 +} + +type overflowSpec struct { + count bool + bytes bool + gas bool +} + +type sealScenario struct { + countLimit uint64 + gasLimit uint64 + sealed [][]txSpec + overflow []overflowSpec + open []txSpec + allTxs [][]byte + specsByTx map[string]txSpec +} + +type gasWantedApp struct { + abci.BaseApplication + gasWanted int64 +} + +func (a gasWantedApp) CheckTx(_ context.Context, _ *abci.RequestCheckTxV2) *abci.ResponseCheckTxV2 { + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{ + Code: abci.CodeTypeOK, + GasWanted: a.gasWanted, + }, + } +} + +type rejectingApp struct { + abci.BaseApplication + resp *abci.ResponseCheckTx +} + +func (a rejectingApp) CheckTx(_ context.Context, _ *abci.RequestCheckTxV2) *abci.ResponseCheckTxV2 { + return &abci.ResponseCheckTxV2{ResponseCheckTx: a.resp} +} + +type acceptingApp struct { + abci.BaseApplication + resp *abci.ResponseCheckTx +} + +func (a acceptingApp) CheckTx(_ context.Context, _ *abci.RequestCheckTxV2) *abci.ResponseCheckTxV2 { + return &abci.ResponseCheckTxV2{ResponseCheckTx: a.resp} +} + +type txSpecApp struct { + abci.BaseApplication + specs map[string]txSpec +} + +func (a txSpecApp) CheckTx(_ context.Context, req *abci.RequestCheckTxV2) *abci.ResponseCheckTxV2 { + spec := a.specs[string(req.Tx)] + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{ + Code: abci.CodeTypeOK, + GasWanted: spec.gasWanted, + GasEstimated: spec.gasEstimated, + }, + } +} + +type evmTxSpecApp struct { + abci.BaseApplication + baseNonces map[common.Address]uint64 +} + +func (a evmTxSpecApp) CheckTx(_ context.Context, req *abci.RequestCheckTxV2) *abci.ResponseCheckTxV2 { + spec := decodeEvmTxSpec(req.Tx) + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{ + Code: abci.CodeTypeOK, + GasWanted: 50, + GasEstimated: 40, + }, + IsEVM: true, + EVMNonce: spec.nonce, + EVMSenderAddress: spec.sender, + SeiSenderAddress: []byte("sender"), + EVMRequiredBalance: big.NewInt(1), + } +} + +func (a evmTxSpecApp) EvmNonce(addr common.Address) uint64 { + return a.baseNonces[addr] +} + +func encodeEvmTx(sender common.Address, nonce uint64) []byte { + tx := make([]byte, common.AddressLength+8) + copy(tx, sender.Bytes()) + binary.BigEndian.PutUint64(tx[common.AddressLength:], nonce) + return tx +} + +func decodeEvmTxSpec(tx []byte) evmTxSpec { + return evmTxSpec{ + tx: tx, + sender: common.BytesToAddress(tx[:common.AddressLength]), + nonce: binary.BigEndian.Uint64(tx[common.AddressLength:]), + } +} + +func newSealScenario(t *testing.T, rng utils.Rng) sealScenario { + const ( + wantSealedBlocks = 24 + minTriggerCoverage = 3 + maxAttempts = 64 + gasScale = 5_000 + unitMax = 16 + ) + for range maxAttempts { + countLimit := uint64(8 + rng.Intn(5)) + avgUnitsPerTx := (unitMax + 1) / 2 + byteBudgetUnits := max(unitMax+1, int(countLimit)*avgUnitsPerTx+rng.Intn(2*int(countLimit)+1)-int(countLimit)) + gasBudgetUnits := max(unitMax+1, int(countLimit)*avgUnitsPerTx+rng.Intn(2*int(countLimit)+1)-int(countLimit)) + byteScale := max(1, int(types.MaxTxsBytesPerBlock)/byteBudgetUnits) + gasLimit := uint64(gasScale * gasBudgetUnits) + var seq uint64 + specs := make([]txSpec, 0, wantSealedBlocks*int(countLimit)) + for len(specs) < wantSealedBlocks*int(countLimit)*4 { + specs = append(specs, randomTxSpec(rng, &seq, byteScale, gasScale, unitMax)) + } + scenario := simulateSealScenario(countLimit, gasLimit, specs) + if len(scenario.sealed) < wantSealedBlocks { + continue + } + scenario = truncateSealScenario(scenario, wantSealedBlocks) + countHits, byteHits, gasHits := 0, 0, 0 + for _, overflow := range scenario.overflow { + if overflow.count { + countHits += 1 + } + if overflow.bytes { + byteHits += 1 + } + if overflow.gas { + gasHits += 1 + } + } + if countHits >= minTriggerCoverage && byteHits >= minTriggerCoverage && gasHits >= minTriggerCoverage { + return scenario + } + } + t.Fatal("failed to generate seal scenario with sufficient trigger coverage") + return sealScenario{} +} + +func truncateSealScenario(scenario sealScenario, wantSealedBlocks int) sealScenario { + sealed := append([][]txSpec(nil), scenario.sealed[:wantSealedBlocks]...) + overflow := append([]overflowSpec(nil), scenario.overflow[:wantSealedBlocks]...) + open := scenario.open + if wantSealedBlocks < len(scenario.sealed) { + open = scenario.sealed[wantSealedBlocks] + } + allTxs := make([][]byte, 0) + specsByTx := map[string]txSpec{} + for _, block := range sealed { + for _, spec := range block { + allTxs = append(allTxs, spec.tx) + specsByTx[string(spec.tx)] = spec + } + } + for _, spec := range open { + allTxs = append(allTxs, spec.tx) + specsByTx[string(spec.tx)] = spec + } + return sealScenario{ + countLimit: scenario.countLimit, + gasLimit: scenario.gasLimit, + sealed: sealed, + overflow: overflow, + open: append([]txSpec(nil), open...), + allTxs: allTxs, + specsByTx: specsByTx, + } +} + +func randomTxSpec(rng utils.Rng, seq *uint64, byteScale, gasScale, unitMax int) txSpec { + sizeUnits := 1 + rng.Intn(unitMax) + gasUnits := 1 + rng.Intn(unitMax) + size := sizeUnits * byteScale + gasWanted := int64(gasUnits * gasScale) + tx := make([]byte, size) + binary.BigEndian.PutUint64(tx[:8], *seq) + copy(tx[8:], utils.GenBytes(rng, size-8)) + *seq += 1 + gasEstimated := gasWanted + if gasWanted > 1 { + gasEstimated = gasWanted - int64(rng.Intn(int(gasWanted-1))) + } + return txSpec{tx: tx, gasWanted: gasWanted, gasEstimated: gasEstimated} +} + +func simulateSealScenario(countLimit, gasLimit uint64, specs []txSpec) sealScenario { + current := make([]txSpec, 0, countLimit) + sealed := make([][]txSpec, 0) + overflow := make([]overflowSpec, 0) + allTxs := make([][]byte, 0, len(specs)) + specsByTx := make(map[string]txSpec, len(specs)) + + for _, spec := range specs { + allTxs = append(allTxs, spec.tx) + specsByTx[string(spec.tx)] = spec + if len(current) > 0 { + o := blockOverflow(current, spec, countLimit, gasLimit) + if o.count || o.bytes || o.gas { + sealed = append(sealed, append([]txSpec(nil), current...)) + overflow = append(overflow, o) + current = current[:0] + } + } + current = append(current, spec) + } + return sealScenario{ + countLimit: countLimit, + gasLimit: gasLimit, + sealed: sealed, + overflow: overflow, + open: append([]txSpec(nil), current...), + allTxs: allTxs, + specsByTx: specsByTx, + } +} + +func blockOverflow(block []txSpec, next txSpec, countLimit, gasLimit uint64) overflowSpec { + size, gas := blockTotals(block) + return overflowSpec{ + count: uint64(len(block))+1 > countLimit, + bytes: size+uint64(len(next.tx)) > uint64(types.MaxTxsBytesPerBlock), + gas: gas+uint64(next.gasWanted) > gasLimit, + } +} + +func blockTotals(block []txSpec) (size uint64, gas uint64) { + for _, spec := range block { + size += uint64(len(spec.tx)) + gas += uint64(spec.gasWanted) + } + return size, gas +} + +func txsOf(block []txSpec) [][]byte { + txs := make([][]byte, len(block)) + for i, spec := range block { + txs[i] = spec.tx + } + return txs +} + +func assertSealScenario( + t *testing.T, + state *State, + firstBlock types.BlockNumber, + scenario sealScenario, +) { + t.Helper() + + lane := state.consensus.Avail().PublicKey() + for i := range scenario.sealed { + block, err := state.consensus.Avail().Block(t.Context(), lane, firstBlock+types.BlockNumber(i)) + require.NoError(t, err) + require.Equal(t, txsOf(scenario.sealed[i]), block.Msg().Block().Payload().Txs()) + } + require.Equal(t, txsOf(scenario.open), state.UnconfirmedTxs()) + require.Len(t, scenario.overflow, len(scenario.sealed)) + for i, sealed := range scenario.sealed { + size, gas := blockTotals(sealed) + require.LessOrEqual(t, uint64(len(sealed)), scenario.countLimit) + require.LessOrEqual(t, size, uint64(types.MaxTxsBytesPerBlock)) + require.LessOrEqual(t, gas, scenario.gasLimit) + var nextFirst txSpec + if i+1 < len(scenario.sealed) { + nextFirst = scenario.sealed[i+1][0] + } else { + nextFirst = scenario.open[0] + } + require.Equal(t, blockOverflow(sealed, nextFirst, scenario.countLimit, scenario.gasLimit), scenario.overflow[i]) + require.True(t, scenario.overflow[i].count || scenario.overflow[i].bytes || scenario.overflow[i].gas) + } + + for m := range state.mempool.Lock() { + require.Equal(t, firstBlock, m.first) + require.Equal(t, firstBlock+types.BlockNumber(len(scenario.sealed)), m.next) + require.Len(t, m.blocks, len(scenario.sealed)) + for i := range scenario.sealed { + require.Equal(t, txsOf(scenario.sealed[i]), m.blocks[firstBlock+types.BlockNumber(i)].txs) + } + require.Equal(t, txsOf(scenario.open), m.nextBlock.txs) + return + } + t.Fatal("unreachable") +} + +func newTestState(t *testing.T, app abci.Application) *State { + t.Helper() + + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 3) + dataState := utils.OrPanic1(data.NewState( + &data.Config{Committee: committee}, + utils.OrPanic1(data.NewDataWAL(utils.None[string](), committee)), + )) + consensusState := utils.OrPanic1(consensus.NewState(&consensus.Config{ + Key: keys[0], + ViewTimeout: func(types.View) time.Duration { return time.Hour }, + PersistentStateDir: utils.None[string](), + }, dataState)) + + return NewState(&Config{ + MaxGasPerBlock: types.MaxTxsBytesPerBlock, + MaxTxsPerBlock: types.MaxTxsPerBlock, + BlockInterval: time.Hour, + MaxTxsPerSecond: utils.None[uint64](), + App: proxy.New(app, proxy.NopMetrics()), + }, consensusState) +} + +func TestInsertTxRejectsTooLargeTransaction(t *testing.T) { + state := newTestState(t, abci.BaseApplication{}) + tx := make([]byte, types.MaxTxsBytesPerBlock+1) + + resp, err := state.InsertTx(t.Context(), tx) + + require.Nil(t, resp) + require.ErrorIs(t, err, errTooLarge) + require.True(t, errors.Is(err, errTooLarge)) +} + +func TestInsertTxRejectsGasWantedAboveBlockLimit(t *testing.T) { + state := newTestState(t, gasWantedApp{gasWanted: 101}) + state.cfg.MaxGasPerBlock = 100 + + resp, err := state.InsertTx(t.Context(), []byte("tx")) + + require.Nil(t, resp) + require.ErrorIs(t, err, errTooLarge) +} + +func TestInsertTxReturnsRejectedCheckTxWithoutEnqueueing(t *testing.T) { + wantResp := &abci.ResponseCheckTx{ + Code: 1, + Log: "rejected", + } + state := newTestState(t, rejectingApp{resp: wantResp}) + + gotResp, err := state.InsertTx(t.Context(), []byte("tx")) + + require.NoError(t, err) + require.Same(t, wantResp, gotResp) + require.Empty(t, state.UnconfirmedTxs()) +} + +func TestInsertTxAppendsAcceptedTransactionToOpenBlock(t *testing.T) { + wantResp := &abci.ResponseCheckTx{ + Code: abci.CodeTypeOK, + GasWanted: 50, + GasEstimated: 40, + } + state := newTestState(t, acceptingApp{resp: wantResp}) + tx := []byte("tx1") + + gotResp, err := state.InsertTx(t.Context(), tx) + + require.NoError(t, err) + require.Same(t, wantResp, gotResp) + require.Equal(t, [][]byte{tx}, state.UnconfirmedTxs()) +} + +func TestInsertTxSealsCurrentBlockWhenTxCountWouldOverflow(t *testing.T) { + rng := utils.TestRng() + scenario := newSealScenario(t, rng) + state := newTestState(t, txSpecApp{specs: scenario.specsByTx}) + state.cfg.MaxTxsPerBlock = scenario.countLimit + state.cfg.MaxGasPerBlock = scenario.gasLimit + lane := state.consensus.Avail().PublicKey() + firstBlock := state.consensus.Avail().NextBlock(lane) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + s.SpawnBg(func() error { return utils.IgnoreCancel(state.Run(ctx)) }) + + for _, tx := range scenario.allTxs { + resp, err := state.InsertTx(ctx, tx) + require.NoError(t, err) + require.Equal(t, scenario.specsByTx[string(tx)].gasWanted, resp.GasWanted) + } + assertSealScenario(t, state, firstBlock, scenario) + return nil + }) + require.NoError(t, err) +} + +func TestInsertTxRequiresEVMNonceOrderAcrossAccountsAndBlocks(t *testing.T) { + rng := utils.TestRng() + accountCount := 3 + rng.Intn(2) + blockSize := 2 + rng.Intn(2) + goodCount := 2*blockSize + 1 + rng.Intn(blockSize+1) + + accounts := make([]common.Address, accountCount) + baseNonces := make(map[common.Address]uint64, accountCount) + expectedNonces := make(map[common.Address]uint64, accountCount) + perAccountAccepted := make(map[common.Address]int, accountCount) + for i := range accountCount { + accounts[i] = common.BytesToAddress(utils.GenBytes(rng, common.AddressLength)) + baseNonces[accounts[i]] = uint64(rng.Intn(20)) + expectedNonces[accounts[i]] = baseNonces[accounts[i]] + } + + type attempt struct { + spec evmTxSpec + isBad bool + } + attempts := make([]attempt, 0, 2*goodCount) + good := make([]evmTxSpec, 0, goodCount) + + newTx := func(sender common.Address, nonce uint64, label byte) evmTxSpec { + _ = label + return evmTxSpec{ + tx: encodeEvmTx(sender, nonce), + sender: sender, + nonce: nonce, + } + } + badNonce := func(sender common.Address, want uint64) uint64 { + switch rng.Intn(3) { + case 0: + return want + 1 + uint64(rng.Intn(3)) + case 1: + if want == 0 { + return 1 + uint64(rng.Intn(3)) + } + return want - 1 + default: + if want > baseNonces[sender] { + return baseNonces[sender] + uint64(rng.Intn(int(want-baseNonces[sender]))) + } + return want + 2 + uint64(rng.Intn(2)) + } + } + + for i := range goodCount { + sender := accounts[rng.Intn(len(accounts))] + want := expectedNonces[sender] + if i > 0 || want > 0 { + bad := newTx(sender, badNonce(sender, want), 'b') + attempts = append(attempts, attempt{spec: bad, isBad: true}) + } + ok := newTx(sender, want, 'g') + attempts = append(attempts, attempt{spec: ok}) + good = append(good, ok) + expectedNonces[sender] = want + 1 + perAccountAccepted[sender] += 1 + } + + state := newTestState(t, evmTxSpecApp{baseNonces: baseNonces}) + state.cfg.MaxTxsPerBlock = uint64(blockSize) + + currentExpected := make(map[common.Address]uint64, len(baseNonces)) + for addr, nonce := range baseNonces { + currentExpected[addr] = nonce + } + assertPendingNonces := func() { + t.Helper() + for addr, nonce := range currentExpected { + require.Equal(t, nonce, state.EvmNextPendingNonce(addr)) + } + } + for _, x := range attempts { + resp, err := state.InsertTx(t.Context(), x.spec.tx) + if x.isBad { + require.Nil(t, resp) + require.ErrorIs(t, err, errBadNonce) + assertPendingNonces() + continue + } + require.NoError(t, err) + require.EqualValues(t, 50, resp.GasWanted) + currentExpected[x.spec.sender] += 1 + assertPendingNonces() + } + + assertPendingNonces() + + sealedBlocks := (len(good) - 1) / blockSize + openStart := sealedBlocks * blockSize + require.Equal(t, txsOfEVM(good[openStart:]), state.UnconfirmedTxs()) + + for m := range state.mempool.Lock() { + require.Equal(t, sealedBlocks, len(m.blocks)) + for i := range sealedBlocks { + from := i * blockSize + to := from + blockSize + require.Equal(t, txsOfEVM(good[from:to]), m.blocks[m.first+types.BlockNumber(i)].txs) + } + require.Equal(t, txsOfEVM(good[openStart:]), m.nextBlock.txs) + return + } + t.Fatal("unreachable") +} + +func txsOfEVM(specs []evmTxSpec) [][]byte { + txs := make([][]byte, len(specs)) + for i, spec := range specs { + txs[i] = spec.tx + } + return txs +} diff --git a/sei-tendermint/internal/autobahn/producer/state.go b/sei-tendermint/internal/autobahn/producer/state.go index 07fff9e134..d2617e1e70 100644 --- a/sei-tendermint/internal/autobahn/producer/state.go +++ b/sei-tendermint/internal/autobahn/producer/state.go @@ -5,9 +5,11 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/avail" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" "golang.org/x/time/rate" @@ -15,125 +17,137 @@ import ( // Config is the config of the block scope. type Config struct { - MaxGasPerBlock uint64 - MaxTxsPerBlock uint64 - MaxTxsPerSecond utils.Option[uint64] - MempoolSize uint64 - BlockInterval time.Duration - AllowEmptyBlocks bool + MaxGasPerBlock uint64 + MaxTxsPerBlock uint64 + // Delay after which a non-full block can be produced. + BlockInterval time.Duration + // TESTONLY: max rate at which lane is produced. It can be used to do + // benchmarks with stable throughput, in case execution performance degrades + // when overloaded. + MaxTxsPerSecond utils.Option[uint64] + App *proxy.Proxy } -// minTxGas is the minimum gas cost of an evm tx. const minTxGas = 21000 func (c *Config) maxTxsPerBlock() uint64 { - return min(c.MaxTxsPerBlock, c.MaxGasPerBlock/minTxGas) -} - -// MaxGasPerBlockI64 returns MaxGasPerBlock clamped to the int64 range. -// Config validation only enforces > 0 (sei-tendermint/config/autobahn.go), -// so a misconfigured chain with a value above math.MaxInt64 can't silently -// overflow when consumed by APIs that take int64 (the mempool's ReapLimits, -// the RPC layer's ConsensusParamUpdates.Block.MaxGas). Centralizing the -// clamp here means callers pick this up by name instead of repeating -// utils.Clamp[int64] at every site, and any future change to the clamp -// rule (or the underlying field type) lives in one place. -func (c *Config) MaxGasPerBlockI64() int64 { - return utils.Clamp[int64](c.MaxGasPerBlock) + return min(types.MaxTxsPerBlock, c.MaxTxsPerBlock) } // State is the block producer state. type State struct { - cfg *Config - txMempool *mempool.TxMempool + cfg *Config + mempool utils.Watch[*mempool] // consensus state to which published blocks will be reported. consensus *consensus.State } // NewState constructs a new block producer state. // Returns an error if the current node is NOT a producer. -func NewState(cfg *Config, txMempool *mempool.TxMempool, consensus *consensus.State) *State { +func NewState(cfg *Config, consensus *consensus.State) *State { + lane := consensus.Avail().PublicKey() + n := consensus.Avail().NextBlock(lane) return &State{ - cfg: cfg, - txMempool: txMempool, + cfg: cfg, + mempool: utils.NewWatch(&mempool{ + capacity: avail.BlocksPerLane, + first: n, + next: n, + blocks: map[types.BlockNumber]*blockSpec{}, + nextBlock: &blockSpec{evmNonces: map[common.Address]uint64{}}, + evmNonces: map[common.Address]uint64{}, + }), consensus: consensus, } } -// makePayload constructs payload for the next produced block. -// It waits for any transactions OR until `cfg.BlockInterval` passes. -func (s *State) makePayload(ctx context.Context) (*types.Payload, error) { - // Wait for transactions. We give up and produce an empty block if mempool is empty for - // cfg.BlockInterval. - _ = utils.WithTimeout(ctx, s.cfg.BlockInterval, func(ctx context.Context) error { - return s.txMempool.WaitForTxs(ctx) - }) - // If the context has been cancelled though, we just fail. - if err := ctx.Err(); err != nil { - return nil, err - } - - txs, gasEstimated := s.txMempool.ReapTxs(mempool.ReapLimits{ - MaxTxs: utils.Some(min(types.MaxTxsPerBlock, s.cfg.maxTxsPerBlock())), - MaxBytes: utils.Some(utils.Clamp[int64](types.MaxTxsBytesPerBlock)), - MaxGasWanted: utils.Some(s.cfg.MaxGasPerBlockI64()), - MaxGasEstimated: utils.Some(s.cfg.MaxGasPerBlockI64()), - }, true) - payloadTxs := make([][]byte, 0, len(txs)) - for _, tx := range txs { - payloadTxs = append(payloadTxs, tx) - } - payload, err := types.PayloadBuilder{ - CreatedAt: time.Now(), - TotalGas: uint64(gasEstimated), // nolint:gosec // always non-negative - Txs: payloadTxs, - }.Build() - // This should never happen: we construct the payload from correctly sized data. - if err != nil { - panic(fmt.Errorf("PayloadBuilder{}.Build(): %w", err)) - } - return payload, nil -} - -// nextPayload constructs the payload for the next block. -// Wrapper of makePayload which ensures that the block is not empty (if required). -func (s *State) nextPayload(ctx context.Context) (*types.Payload, error) { - for { - payload, err := s.makePayload(ctx) - if err != nil { - return nil, err - } - if len(payload.Txs()) > 0 || s.cfg.AllowEmptyBlocks { - return payload, nil - } - } -} - -// Run runs the background tasks of the producer state. +// Run runs the background tasks of the producer state: +// * prunes executed lane blocks from mempool +// * pushes new lane blocks from mempool to avail state +// Note that mempool capacity bounds the number of unexecuted blocks of the local lane. +// This is needed so that we can track the evm nonces of sequenced txs - mempool admits txs +// sequentially in the nonce order. func (s *State) Run(ctx context.Context) error { return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error { - // Construct blocks from mempool. - limit := rate.Inf - burst := 1 - if l, ok := s.cfg.MaxTxsPerSecond.Get(); ok { - limit = rate.Limit(l) - burst = int(l + s.cfg.MaxTxsPerBlock) // nolint:gosec - } - limiter := rate.NewLimiter(limit, burst) - for { - if err := s.consensus.WaitForCapacity(ctx); err != nil { - return fmt.Errorf("s.Data().WaitForCapacity(): %w", err) - } - payload, err := s.nextPayload(ctx) - if err != nil { - return fmt.Errorf("s.nextPayload(): %w", err) + availState := s.consensus.Avail() + lane := availState.PublicKey() + firstBlock := s.mempoolFirst() + scope.Spawn(func() error { + // Task pruning executed lane blocks from the mempool + dataState := s.consensus.Data() + var err error + for toExecute := firstBlock; ; { + if toExecute, err = dataState.WaitUntilExecuted(ctx, lane, toExecute); err != nil { + return err + } + s.pruneMempool(toExecute) } - if _, err := s.consensus.ProduceBlock(ctx, payload); err != nil { - return fmt.Errorf("s.Data().PushBlock(): %w", err) + }) + scope.Spawn(func() error { + // Task pushing blocks from mempool to avail state. + limit := rate.Inf + burst := 1 + if l, ok := s.cfg.MaxTxsPerSecond.Get(); ok { + limit = rate.Limit(l) + burst = int(l + s.cfg.MaxTxsPerBlock) // nolint:gosec } - if err := limiter.WaitN(ctx, len(payload.Txs())); err != nil { - return fmt.Errorf("limiter(): %w", err) + limiter := rate.NewLimiter(limit, burst) + lastBlockTime := time.Now() + for toProduce := firstBlock; ; toProduce += 1 { + if err := availState.WaitForCapacity(ctx, toProduce); err != nil { + return fmt.Errorf("availState.WaitForCapacity(): %w", err) + } + var payload *types.Payload + // Wait until either + // * there is a full proposal in mempool + // * BlockInterval since the last block passed AND mempool is non-empty + for m, ctrl := range s.mempool.Lock() { + // Wait for full payload with timeout. + if err := utils.WithDeadline(ctx, utils.Some(lastBlockTime.Add(s.cfg.BlockInterval)), func(ctx context.Context) error { + return ctrl.WaitUntil(ctx, func() bool { return toProduce < m.next }) + }); err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + // Wait for non-empty payload. + if err := ctrl.WaitUntil(ctx, func() bool { + return toProduce < m.next || (toProduce == m.next && m.CanPushBlock()) + }); err != nil { + return err + } + // Seal the payload if needed. + if toProduce == m.next { + m.PushBlock() + } + } + b, ok := m.blocks[toProduce] + if !ok { + // Block number tracking should always be in sync between avail state and mempool: + // * mempool keeps blocks until they are executed. + // * blocks can be executed only after they are included in the lane. + // * lane is populated from the mempool. + return fmt.Errorf("mempool mismatched block production") + } + var err error + payload, err = types.PayloadBuilder{ + CreatedAt: time.Now(), + TotalGas: b.gasEstimated, + Txs: b.txs, + }.Build() + if err != nil { + // This should never happen: we construct the payload from correctly sized data. + panic(fmt.Errorf("PayloadBuilder{}.Build(): %w", err)) + } + } + if _, err := availState.ProduceBlock(toProduce, payload); err != nil { + return fmt.Errorf("availState.ProduceBlock(): %w", err) + } + lastBlockTime = time.Now() + if err := limiter.WaitN(ctx, len(payload.Txs())); err != nil { + return fmt.Errorf("limiter(): %w", err) + } } - } + }) + return nil }) } diff --git a/sei-tendermint/internal/autobahn/types/block.go b/sei-tendermint/internal/autobahn/types/block.go index 8f39301bb9..58a6eb2041 100644 --- a/sei-tendermint/internal/autobahn/types/block.go +++ b/sei-tendermint/internal/autobahn/types/block.go @@ -160,9 +160,6 @@ type PayloadHash hashable.Hash[*pb.Payload] type PayloadBuilder struct { CreatedAt time.Time TotalGas uint64 - EdgeCount int64 - Coinbase []byte - Basefee int64 Txs [][]byte } @@ -196,15 +193,6 @@ func (p *Payload) CreatedAt() time.Time { return p.p.CreatedAt } // TotalGas . func (p *Payload) TotalGas() uint64 { return p.p.TotalGas } -// EdgeCount . -func (p *Payload) EdgeCount() int64 { return p.p.EdgeCount } - -// Coinbase . -func (p *Payload) Coinbase() []byte { return p.p.Coinbase } - -// Basefee . -func (p *Payload) Basefee() int64 { return p.p.Basefee } - // Txs . func (p *Payload) Txs() [][]byte { return p.p.Txs } @@ -254,9 +242,6 @@ var PayloadConv = protoutils.Conv[*Payload, *pb.Payload]{ return &pb.Payload{ CreatedAt: TimeConv.Encode(p.p.CreatedAt), TotalGas: utils.Alloc(p.p.TotalGas), - EdgeCount: utils.Alloc(p.p.EdgeCount), - Coinbase: p.p.Coinbase, - Basefee: utils.Alloc(p.p.Basefee), Txs: p.p.Txs, } }, @@ -268,18 +253,9 @@ var PayloadConv = protoutils.Conv[*Payload, *pb.Payload]{ if p.TotalGas == nil { return nil, fmt.Errorf("TotalGas: missing") } - if p.EdgeCount == nil { - return nil, fmt.Errorf("EdgeCount: missing") - } - if p.Basefee == nil { - return nil, fmt.Errorf("Basefee: missing") - } return PayloadBuilder{ CreatedAt: createdAt, TotalGas: *p.TotalGas, - EdgeCount: *p.EdgeCount, - Coinbase: p.Coinbase, - Basefee: *p.Basefee, Txs: p.Txs, }.Build() }, diff --git a/sei-tendermint/internal/autobahn/types/testonly.go b/sei-tendermint/internal/autobahn/types/testonly.go index 11a126d659..5044fac52e 100644 --- a/sei-tendermint/internal/autobahn/types/testonly.go +++ b/sei-tendermint/internal/autobahn/types/testonly.go @@ -93,9 +93,6 @@ func GenPayload(rng utils.Rng) *Payload { return utils.OrPanic1(PayloadBuilder{ CreatedAt: utils.GenTimestamp(rng), TotalGas: rng.Uint64(), - EdgeCount: rng.Int63(), - Coinbase: utils.GenBytes(rng, 10), - Basefee: rng.Int63(), Txs: utils.GenSlice(rng, func(rng utils.Rng) []byte { return utils.GenBytes(rng, 10) }), }.Build()) } diff --git a/sei-tendermint/internal/p2p/giga/avail_test.go b/sei-tendermint/internal/p2p/giga/avail_test.go index 2141bf949f..4e614af244 100644 --- a/sei-tendermint/internal/p2p/giga/avail_test.go +++ b/sei-tendermint/internal/p2p/giga/avail_test.go @@ -38,15 +38,22 @@ func TestAvailClientServer(t *testing.T) { } s.SpawnBgNamed("fakeNode0", func() error { return utils.IgnoreCancel(fakeNode0.Run(ctx)) }) for range min(avail.BlocksPerLane, 4) { - b := utils.OrPanic1(fakeNode0.ProduceBlock(ctx, types.GenPayload(rng))) + a := fakeNode0.Avail() + b := utils.OrPanic1(a.ProduceBlock(a.NextBlock(a.PublicKey()), types.GenPayload(rng))) utils.OrPanic(nodes[2].consensus.Avail().PushBlock(ctx, b)) } t.Logf("Run block production") for _, node := range nodes { rng := rng.Split() s.Spawn(func() error { + a := node.consensus.Avail() + lane := a.PublicKey() for range totalBlocks { - if _, err := node.consensus.ProduceBlock(ctx, types.GenPayload(rng)); err != nil { + n := a.NextBlock(lane) + if err := a.WaitForCapacity(ctx, n); err != nil { + return fmt.Errorf("waitForCapacity(): %w", err) + } + if _, err := a.ProduceBlock(n, types.GenPayload(rng)); err != nil { return fmt.Errorf("produceBlock(): %w", err) } } diff --git a/sei-tendermint/internal/p2p/giga/consensus_test.go b/sei-tendermint/internal/p2p/giga/consensus_test.go index 1f431504a6..3a8ce052f3 100644 --- a/sei-tendermint/internal/p2p/giga/consensus_test.go +++ b/sei-tendermint/internal/p2p/giga/consensus_test.go @@ -27,7 +27,9 @@ func TestConsensusClientServer(t *testing.T) { for offset := range types.GlobalBlockNumber(20) { idx := firstBlock + offset t.Logf("[%v] Push a block.", idx) - b, err := nodes[rng.Intn(len(env.nodes))].consensus.ProduceBlock(ctx, types.GenPayload(rng)) + node := nodes[rng.Intn(len(env.nodes))] + a := node.consensus.Avail() + b, err := a.ProduceBlock(a.NextBlock(a.PublicKey()), types.GenPayload(rng)) if err != nil { return fmt.Errorf("ds.ProduceBlock(): %w", err) } diff --git a/sei-tendermint/internal/p2p/giga_router.go b/sei-tendermint/internal/p2p/giga_router.go index d04b4e0f01..3b15a941f4 100644 --- a/sei-tendermint/internal/p2p/giga_router.go +++ b/sei-tendermint/internal/p2p/giga_router.go @@ -16,7 +16,6 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/producer" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/giga" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/rpc" tmbytes "github.com/sei-protocol/sei-chain/sei-tendermint/libs/bytes" @@ -43,7 +42,6 @@ type GigaRouterConfig struct { ValidatorAddrs map[atypes.PublicKey]GigaNodeAddr Consensus *consensus.Config Producer *producer.Config - TxMempool *mempool.TxMempool GenDoc *types.GenesisDoc } @@ -105,7 +103,7 @@ func NewGigaRouter(cfg *GigaRouterConfig, key NodeSecretKey) (*GigaRouter, error if err != nil { return nil, fmt.Errorf("consensus.NewState(): %w", err) } - producerState := producer.NewState(cfg.Producer, cfg.TxMempool, consensusState) + producerState := producer.NewState(cfg.Producer, consensusState) logger.Info("GigaRouter initialized", "validators", len(cfg.ValidatorAddrs), "dial_interval", cfg.DialInterval) return &GigaRouter{ cfg: cfg, @@ -123,6 +121,16 @@ func NewGigaRouter(cfg *GigaRouterConfig, key NodeSecretKey) (*GigaRouter, error }, nil } +func (r *GigaRouter) InsertTx(ctx context.Context, tx types.Tx) (*abci.ResponseCheckTx, error) { + return r.producer.InsertTx(ctx, tx) +} + +// Mempool exposes Autobahn's producer-backed mempool surface to callers that +// need features not shared with CometBFT's TxMempool. +func (r *GigaRouter) Mempool() *producer.State { + return r.producer +} + // LastCommittedBlockNumber returns the highest global block number finalized // by consensus (derived from the latest CommitQC). When no CommitQC has been // recorded yet, atypes.GlobalRangeOpt returns the committee's empty default @@ -142,8 +150,8 @@ func (r *GigaRouter) LastCommittedBlockNumber() int64 { // ResultBlockResults.ConsensusParamUpdates under Autobahn (where // FinalizeBlock responses are not stored on disk) without reaching into // the unexported router.cfg. -func (r *GigaRouter) MaxGasPerBlock() int64 { - return r.cfg.Producer.MaxGasPerBlockI64() +func (r *GigaRouter) MaxGasPerBlock() uint64 { + return r.cfg.Producer.MaxGasPerBlock } // BlockByNumber returns the finalized global block at height n translated @@ -255,7 +263,7 @@ func (r *GigaRouter) translateGlobalBlock(gb *atypes.GlobalBlock) *coretypes.Res } func (r *GigaRouter) executeBlock(ctx context.Context, b *atypes.GlobalBlock) (*abci.ResponseCommit, error) { - app := r.cfg.TxMempool.App() + app := r.cfg.Producer.App hash := b.Header.Hash() var proposerAddress types.Address if vals := app.GetValidators(); len(vals) > 0 { @@ -270,9 +278,6 @@ func (r *GigaRouter) executeBlock(ctx context.Context, b *atypes.GlobalBlock) (* } // TODO: add metrics to understand execution latency. - r.cfg.TxMempool.Lock() - defer r.cfg.TxMempool.Unlock() - resp, err := app.FinalizeBlock(ctx, &abci.RequestFinalizeBlock{ Txs: b.Payload.Txs(), // Empty DecidedLastCommit does not indicate missing votes. @@ -293,37 +298,18 @@ func (r *GigaRouter) executeBlock(ctx context.Context, b *atypes.GlobalBlock) (* if err != nil { return nil, fmt.Errorf("r.cfg.App.FinalizeBlock(): %w", err) } - if err := r.data.PushAppHash(ctx, b.GlobalNumber, resp.AppHash); err != nil { - return nil, fmt.Errorf("r.data.PushAppHash(%v): %w", b.GlobalNumber, err) - } commitResp, err := app.Commit(ctx) if err != nil { return nil, fmt.Errorf("r.cfg.App.Commit(): %w", err) } - blockTxs := make(types.Txs, len(b.Payload.Txs())) - for i, tx := range b.Payload.Txs() { - blockTxs[i] = tx - } - err = r.cfg.TxMempool.Update( - ctx, - int64(b.GlobalNumber), // nolint:gosec // autobahn block numbers fit in int64. - blockTxs, - resp.TxResults, - // TODO: We need the constraints to be fixed per epoch, because we don't know where the lane blocks will be sequenced. - // Therefore we disable constraints for now, until epochs are supported AND - // chain state understands that consensus parameters can change only at the epoch boundary. - mempool.NopTxConstraints(), - // recheck=false; see TxMempool.Update doc for why. - false, - ) - if err != nil { - return nil, fmt.Errorf("r.cfg.TxMempool.Update(%v): %w", b.GlobalNumber, err) + if err := r.data.PushAppHash(ctx, b.GlobalNumber, resp.AppHash); err != nil { + return nil, fmt.Errorf("r.data.PushAppHash(%v): %w", b.GlobalNumber, err) } return commitResp, nil } func (r *GigaRouter) runExecute(ctx context.Context) error { - app := r.cfg.TxMempool.App() + app := r.cfg.Producer.App info, err := app.Info(ctx, &version.RequestInfo) if err != nil { diff --git a/sei-tendermint/internal/p2p/giga_router_test.go b/sei-tendermint/internal/p2p/giga_router_test.go index 2c68c7f5b3..bb32f2ba2f 100644 --- a/sei-tendermint/internal/p2p/giga_router_test.go +++ b/sei-tendermint/internal/p2p/giga_router_test.go @@ -21,7 +21,6 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/producer" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -287,9 +286,6 @@ func TestGigaRouter_FinalizeBlocks(t *testing.T) { e := Endpoint{AddrPort: cfg.addr} app := newTestApp() proxyApp := proxy.New(app, proxy.NopMetrics()) - // In giga mode the CometBFT handshaker is skipped; the router's - // runExecute calls InitChain itself on fresh start. - txMempool := mempool.NewTxMempool(mempool.TestConfig(), proxyApp, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) router, err := NewRouter( NopMetrics(), cfg.nodeKey, @@ -312,15 +308,13 @@ func TestGigaRouter_FinalizeBlocks(t *testing.T) { PersistentStateDir: utils.None[string](), }, Producer: &producer.Config{ - MaxGasPerBlock: txGasUsed * maxTxsPerBlock, - MaxTxsPerBlock: maxTxsPerBlock, - MaxTxsPerSecond: utils.None[uint64](), - MempoolSize: 100, - BlockInterval: 100 * time.Millisecond, - AllowEmptyBlocks: false, + App: proxyApp, + MaxGasPerBlock: txGasUsed * maxTxsPerBlock, + MaxTxsPerBlock: maxTxsPerBlock, + MaxTxsPerSecond: utils.None[uint64](), + BlockInterval: 100 * time.Millisecond, }, - TxMempool: txMempool, - GenDoc: genDoc, + GenDoc: genDoc, }), }, ) @@ -335,8 +329,9 @@ func TestGigaRouter_FinalizeBlocks(t *testing.T) { allTxs = append(allTxs, tx) } s.SpawnNamed(fmt.Sprintf("producer[%v]", i), func() error { - for _, payload := range txs { - if _, err := txMempool.CheckTx(ctx, payload); err != nil { + giga := router.Giga().OrPanic("non-giga router") + for _, tx := range txs { + if _, err := giga.InsertTx(ctx, tx); err != nil { return fmt.Errorf("txMempool.CheckTx(): %w", err) } } @@ -359,8 +354,7 @@ func TestGigaRouter_FinalizeBlocks(t *testing.T) { // blocks have been finalized every node should report a non-zero // consensus-committed height through the new accessors used by /status. for i, r := range routers { - giga, ok := r.Giga().Get() - require.True(t, ok, "router[%v].Giga()", i) + giga := r.Giga().OrPanic("non-giga router") committed := giga.LastCommittedBlockNumber() require.Positive(t, committed, "router[%v].LastCommittedBlockNumber()", i) // Covers GigaRouter.BlockByNumber — the accessor used by the @@ -446,7 +440,6 @@ func TestGigaRouter_EvmProxy(t *testing.T) { } require.NoError(t, genDoc.ValidateAndComplete()) - txMempool := mempool.NewTxMempool(mempool.TestConfig(), proxy.New(newTestApp(), proxy.NopMetrics()), mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) router, err := NewGigaRouter(&GigaRouterConfig{ DialInterval: time.Second, ValidatorAddrs: addrs, @@ -456,15 +449,13 @@ func TestGigaRouter_EvmProxy(t *testing.T) { PersistentStateDir: utils.None[string](), }, Producer: &producer.Config{ - MaxGasPerBlock: 1, - MaxTxsPerBlock: 1, - MaxTxsPerSecond: utils.None[uint64](), - MempoolSize: 1, - BlockInterval: time.Second, - AllowEmptyBlocks: false, + App: proxy.New(newTestApp(), proxy.NopMetrics()), + MaxGasPerBlock: 1, + MaxTxsPerBlock: 1, + MaxTxsPerSecond: utils.None[uint64](), + BlockInterval: time.Second, }, - TxMempool: txMempool, - GenDoc: genDoc, + GenDoc: genDoc, }, nodeKeys[0]) require.NoError(t, err) diff --git a/sei-tendermint/internal/p2p/router_test.go b/sei-tendermint/internal/p2p/router_test.go index f833fe976b..04a7906559 100644 --- a/sei-tendermint/internal/p2p/router_test.go +++ b/sei-tendermint/internal/p2p/router_test.go @@ -19,7 +19,6 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/producer" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -326,7 +325,6 @@ func TestRouter_GigaSetWhenConfigured(t *testing.T) { // Use intentionally non-default values to ensure config actually propagates. opts := makeRouterOptions() proxyApp := proxy.New(abci.BaseApplication{}, proxy.NopMetrics()) - txMempool := mempool.NewTxMempool(mempool.TestConfig(), proxyApp, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) opts.Giga = utils.Some(&GigaRouterConfig{ DialInterval: 7 * time.Second, ValidatorAddrs: validatorAddrs, @@ -336,14 +334,12 @@ func TestRouter_GigaSetWhenConfigured(t *testing.T) { PersistentStateDir: utils.None[string](), }, Producer: &producer.Config{ - MaxGasPerBlock: 77_000_000, - MaxTxsPerBlock: 7_777, - MaxTxsPerSecond: utils.Some(uint64(999)), - MempoolSize: 3_333, - BlockInterval: 777 * time.Millisecond, - AllowEmptyBlocks: true, + App: proxyApp, + MaxGasPerBlock: 77_000_000, + MaxTxsPerBlock: 7_777, + MaxTxsPerSecond: utils.Some(uint64(999)), + BlockInterval: 777 * time.Millisecond, }, - TxMempool: txMempool, GenDoc: &types.GenesisDoc{ ChainID: "giga-e2e-test", InitialHeight: 42, @@ -375,9 +371,7 @@ func TestRouter_GigaSetWhenConfigured(t *testing.T) { maxTps, tpsOk := giga.cfg.Producer.MaxTxsPerSecond.Get() require.True(t, tpsOk) require.Equal(t, uint64(999), maxTps) - require.Equal(t, uint64(3_333), giga.cfg.Producer.MempoolSize) require.Equal(t, 777*time.Millisecond, giga.cfg.Producer.BlockInterval) - require.True(t, giga.cfg.Producer.AllowEmptyBlocks) // Verify genesis doc. require.Equal(t, "giga-e2e-test", giga.cfg.GenDoc.ChainID) diff --git a/sei-tendermint/internal/rpc/core/blocks.go b/sei-tendermint/internal/rpc/core/blocks.go index ebf24a99c2..14e77a9878 100644 --- a/sei-tendermint/internal/rpc/core/blocks.go +++ b/sei-tendermint/internal/rpc/core/blocks.go @@ -269,7 +269,9 @@ func (env *Environment) BlockResults(ctx context.Context, req *coretypes.Request return &coretypes.ResultBlockResults{ Height: height, ConsensusParamUpdates: &tmproto.ConsensusParams{ - Block: &tmproto.BlockParams{MaxGas: giga.MaxGasPerBlock()}, + Block: &tmproto.BlockParams{ + MaxGas: utils.Clamp[int64](giga.MaxGasPerBlock()), + }, }, }, nil } diff --git a/sei-tendermint/internal/rpc/core/consensus.go b/sei-tendermint/internal/rpc/core/consensus.go index 6958766883..1c5f4c171f 100644 --- a/sei-tendermint/internal/rpc/core/consensus.go +++ b/sei-tendermint/internal/rpc/core/consensus.go @@ -91,13 +91,21 @@ func (env *Environment) Validators(ctx context.Context, req *coretypes.RequestVa // More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state func (env *Environment) DumpConsensusState(ctx context.Context) (*coretypes.ResultDumpConsensusState, error) { // Get Peer consensus states. + reactor, err := env.requireConsensusReactor() + if err != nil { + return nil, err + } + consensusState, err := env.requireConsensusState() + if err != nil { + return nil, err + } peerStates := map[types.NodeID]coretypes.PeerStateInfo{} for _, info := range env.Router.ConnInfos() { if _, ok := peerStates[info.ID]; ok { continue } - peerState, ok := env.ConsensusReactor.GetPeerState(info.ID) + peerState, ok := reactor.GetPeerState(info.ID) if !ok { continue } @@ -122,7 +130,7 @@ func (env *Environment) DumpConsensusState(ctx context.Context) (*coretypes.Resu } // Get self round state. - roundState, err := env.ConsensusState.GetRoundStateJSON() + roundState, err := consensusState.GetRoundStateJSON() if err != nil { return nil, err } @@ -136,8 +144,12 @@ func (env *Environment) DumpConsensusState(ctx context.Context) (*coretypes.Resu // UNSTABLE // More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state func (env *Environment) GetConsensusState(ctx context.Context) (*coretypes.ResultConsensusState, error) { + consensusState, err := env.requireConsensusState() + if err != nil { + return nil, err + } // Get self round state. - bz, err := env.ConsensusState.GetRoundStateSimpleJSON() + bz, err := consensusState.GetRoundStateSimpleJSON() return &coretypes.ResultConsensusState{RoundState: bz}, err } diff --git a/sei-tendermint/internal/rpc/core/dev.go b/sei-tendermint/internal/rpc/core/dev.go index 8f90497bda..374c2142f2 100644 --- a/sei-tendermint/internal/rpc/core/dev.go +++ b/sei-tendermint/internal/rpc/core/dev.go @@ -2,12 +2,20 @@ package core import ( "context" + "errors" "github.com/sei-protocol/sei-chain/sei-tendermint/rpc/coretypes" ) // UnsafeFlushMempool removes all transactions from the mempool. func (env *Environment) UnsafeFlushMempool(ctx context.Context) (*coretypes.ResultUnsafeFlushMempool, error) { - env.Mempool.Flush() + if _, ok := env.gigaRouter().Get(); ok { + return nil, errors.New("unsafe_flush_mempool is not supported with autobahn mempool") + } + mp, err := env.requireMempool() + if err != nil { + return nil, err + } + mp.Flush() return &coretypes.ResultUnsafeFlushMempool{}, nil } diff --git a/sei-tendermint/internal/rpc/core/env.go b/sei-tendermint/internal/rpc/core/env.go index 972ba9110d..f539cb7d0b 100644 --- a/sei-tendermint/internal/rpc/core/env.go +++ b/sei-tendermint/internal/rpc/core/env.go @@ -52,7 +52,7 @@ var logger = seilog.NewLogger("tendermint", "internal", "rpc", "core") //---------------------------------------------- // These interfaces are used by RPC and must be thread safe -type consensusState interface { +type ConsensusState interface { GetState() sm.State GetValidators() (int64, []*types.Validator) GetLastHeight() int64 @@ -70,10 +70,10 @@ type Environment struct { // interfaces defined in types and above StateStore sm.Store BlockStore sm.BlockStore - EvidencePool sm.EvidencePool - ConsensusState consensusState - ConsensusReactor *consensus.Reactor - BlockSyncReactor *blocksync.Reactor + EvidencePool utils.Option[sm.EvidencePool] + ConsensusState utils.Option[ConsensusState] + ConsensusReactor utils.Option[*consensus.Reactor] + BlockSyncReactor utils.Option[*blocksync.Reactor] IsListening bool Listeners []string @@ -86,9 +86,9 @@ type Environment struct { GenDoc *types.GenesisDoc // cache the genesis structure EventSinks []indexer.EventSink EventBus *eventbus.EventBus // thread safe - EventLog *eventlog.Log - Mempool *mempool.TxMempool - StateSyncReactor *statesync.Reactor + EventLog utils.Option[*eventlog.Log] + Mempool utils.Option[*mempool.TxMempool] + StateSyncReactor utils.Option[statesync.Reactor] Config config.RPCConfig @@ -211,10 +211,10 @@ func (env *Environment) getHeight(latestHeight int64, heightPtr *int64) (int64, } func (env *Environment) latestUncommittedHeight() int64 { - if env.ConsensusReactor != nil { + if reactor, ok := env.ConsensusReactor.Get(); ok { // consensus reactor can be nil in inspect mode. - nodeIsSyncing := env.ConsensusReactor.WaitSync() + nodeIsSyncing := reactor.WaitSync() if nodeIsSyncing { return env.BlockStore.Height() } @@ -222,6 +222,41 @@ func (env *Environment) latestUncommittedHeight() int64 { return env.BlockStore.Height() + 1 } +func (env *Environment) requireMempool() (*mempool.TxMempool, error) { + if mp, ok := env.Mempool.Get(); ok { + return mp, nil + } + return nil, fmt.Errorf("mempool is not available") +} + +func (env *Environment) requireEventLog() (*eventlog.Log, error) { + if lg, ok := env.EventLog.Get(); ok { + return lg, nil + } + return nil, fmt.Errorf("event log is not enabled") +} + +func (env *Environment) requireEvidencePool() (sm.EvidencePool, error) { + if pool, ok := env.EvidencePool.Get(); ok { + return pool, nil + } + return nil, fmt.Errorf("evidence pool is not available") +} + +func (env *Environment) requireConsensusState() (ConsensusState, error) { + if state, ok := env.ConsensusState.Get(); ok { + return state, nil + } + return nil, fmt.Errorf("consensus state is not available") +} + +func (env *Environment) requireConsensusReactor() (*consensus.Reactor, error) { + if reactor, ok := env.ConsensusReactor.Get(); ok { + return reactor, nil + } + return nil, fmt.Errorf("consensus reactor is not available") +} + // StartService constructs and starts listeners for the RPC service // according to the config object, returning an error if the service // cannot be constructed or started. The listeners, which provide @@ -258,7 +293,7 @@ func (env *Environment) StartService(ctx context.Context, conf *config.Config) ( // If the event log is enabled, subscribe to all events published to the // event bus, and forward them to the event log. - if lg := env.EventLog; lg != nil { + if lg, ok := env.EventLog.Get(); ok { // TODO(creachadair): This is kind of a hack, ideally we'd share the // observer with the indexer, but it's tricky to plumb them together. // For now, use a "normal" subscription with a big buffer allowance. diff --git a/sei-tendermint/internal/rpc/core/events.go b/sei-tendermint/internal/rpc/core/events.go index 4323581e2e..3cf83d301a 100644 --- a/sei-tendermint/internal/rpc/core/events.go +++ b/sei-tendermint/internal/rpc/core/events.go @@ -149,8 +149,9 @@ func (env *Environment) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUn // of maxItems and waitTime may be capped to sensible internal maxima without // reporting an error to the caller. func (env *Environment) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) { - if env.EventLog == nil { - return nil, errors.New("the event log is not enabled") + eventLog, err := env.requireEventLog() + if err != nil { + return nil, err } // Parse and validate parameters. @@ -190,7 +191,6 @@ func (env *Environment) Events(ctx context.Context, req *coretypes.RequestEvents var info eventlog.Info var items []*eventlog.Item - var err error accept := func(itm *eventlog.Item) error { // N.B. We accept up to one item more than requested, so we can tell how // to set the "more" flag in the response. @@ -211,7 +211,7 @@ func (env *Environment) Events(ctx context.Context, req *coretypes.RequestEvents // and we want to keep waiting until we have relevant results (or time out). cur := after for len(items) == 0 { - info, err = env.EventLog.WaitScan(ctx, cur, accept) + info, err = eventLog.WaitScan(ctx, cur, accept) if err != nil { // Don't report a timeout as a request failure. if errors.Is(err, context.DeadlineExceeded) { @@ -223,7 +223,7 @@ func (env *Environment) Events(ctx context.Context, req *coretypes.RequestEvents } } else { // Quick poll, return only what is already available. - info, err = env.EventLog.Scan(accept) + info, err = eventLog.Scan(accept) } if err != nil { return nil, err diff --git a/sei-tendermint/internal/rpc/core/evidence.go b/sei-tendermint/internal/rpc/core/evidence.go index 78032599b1..66015b110f 100644 --- a/sei-tendermint/internal/rpc/core/evidence.go +++ b/sei-tendermint/internal/rpc/core/evidence.go @@ -16,7 +16,11 @@ func (env *Environment) BroadcastEvidence(ctx context.Context, req *coretypes.Re if err := req.Evidence.ValidateBasic(); err != nil { return nil, fmt.Errorf("evidence.ValidateBasic failed: %w", err) } - if err := env.EvidencePool.AddEvidence(ctx, req.Evidence); err != nil { + pool, err := env.requireEvidencePool() + if err != nil { + return nil, err + } + if err := pool.AddEvidence(ctx, req.Evidence); err != nil { return nil, fmt.Errorf("failed to add evidence: %w", err) } return &coretypes.ResultBroadcastEvidence{Hash: req.Evidence.Hash()}, nil diff --git a/sei-tendermint/internal/rpc/core/lag_status.go b/sei-tendermint/internal/rpc/core/lag_status.go index 7d57ba1b18..8761bb3095 100644 --- a/sei-tendermint/internal/rpc/core/lag_status.go +++ b/sei-tendermint/internal/rpc/core/lag_status.go @@ -9,7 +9,10 @@ import ( // LagStatus returns Tendermint lag status, if lag is over a certain threshold func (env *Environment) LagStatus(ctx context.Context) (*coretypes.ResultLagStatus, error) { currentHeight := env.BlockStore.Height() - maxPeerBlockHeight := env.BlockSyncReactor.GetMaxPeerBlockHeight() + maxPeerBlockHeight := int64(0) + if reactor, ok := env.BlockSyncReactor.Get(); ok { + maxPeerBlockHeight = reactor.GetMaxPeerBlockHeight() + } lag := int64(0) // Calculate lag diff --git a/sei-tendermint/internal/rpc/core/mempool.go b/sei-tendermint/internal/rpc/core/mempool.go index b61cdae14c..78b15e6606 100644 --- a/sei-tendermint/internal/rpc/core/mempool.go +++ b/sei-tendermint/internal/rpc/core/mempool.go @@ -15,6 +15,7 @@ import ( tmmath "github.com/sei-protocol/sei-chain/sei-tendermint/libs/math" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/rpc/coretypes" + "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) // EvmProxy returns the EVM RPC URL of the autobahn validator that owns the @@ -36,7 +37,15 @@ func (env *Environment) EvmProxy(sender common.Address) (*url.URL, bool) { // https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async // Deprecated and should be removed in 0.37 func (env *Environment) BroadcastTxAsync(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTx, error) { - go func() { _, _ = env.Mempool.CheckTx(ctx, req.Tx) }() + if giga, ok := env.gigaRouter().Get(); ok { + go func() { _, _ = giga.Mempool().InsertTx(ctx, req.Tx) }() + return &coretypes.ResultBroadcastTx{Hash: req.Tx.Hash().Bytes()}, nil + } + mp, err := env.requireMempool() + if err != nil { + return nil, err + } + go func() { _, _ = mp.CheckTx(ctx, req.Tx) }() return &coretypes.ResultBroadcastTx{Hash: req.Tx.Hash().Bytes()}, nil } @@ -50,7 +59,24 @@ func (env *Environment) BroadcastTxSync(ctx context.Context, req *coretypes.Requ // DeliverTx result. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync func (env *Environment) BroadcastTx(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTx, error) { - r, err := env.Mempool.CheckTx(ctx, req.Tx) + if giga, ok := env.gigaRouter().Get(); ok { + r, err := giga.Mempool().InsertTx(ctx, req.Tx) + if err != nil { + return nil, err + } + return &coretypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Codespace: r.Codespace, + Hash: req.Tx.Hash().Bytes(), + Log: r.Log, + }, nil + } + mp, err := env.requireMempool() + if err != nil { + return nil, err + } + r, err := mp.CheckTx(ctx, req.Tx) if err != nil { return nil, err } @@ -66,10 +92,31 @@ func (env *Environment) BroadcastTx(ctx context.Context, req *coretypes.RequestB // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTxCommit, error) { - r, err := env.Mempool.CheckTx(ctx, req.Tx) + if giga, ok := env.gigaRouter().Get(); ok { + r, err := giga.Mempool().InsertTx(ctx, req.Tx) + if err != nil { + return nil, err + } + if r.Code != abci.CodeTypeOK { + return &coretypes.ResultBroadcastTxCommit{ + CheckTx: *r, + Hash: req.Tx.Hash().Bytes(), + }, nil + } + return env.broadcastTxCommitFromCheckTx(ctx, req, r) + } + mp, err := env.requireMempool() + if err != nil { + return nil, err + } + r, err := mp.CheckTx(ctx, req.Tx) if err != nil { return nil, err } + return env.broadcastTxCommitFromCheckTx(ctx, req, r) +} + +func (env *Environment) broadcastTxCommitFromCheckTx(ctx context.Context, req *coretypes.RequestBroadcastTx, r *abci.ResponseCheckTx) (*coretypes.ResultBroadcastTxCommit, error) { if r.Code != abci.CodeTypeOK { return &coretypes.ResultBroadcastTxCommit{ CheckTx: *r, @@ -96,7 +143,7 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.Re case <-ctx.Done(): logger.Error("error on broadcastTxCommit", "duration", time.Since(startAt), - "err", err) + "err", ctx.Err()) return &coretypes.ResultBroadcastTxCommit{ CheckTx: *r, Hash: req.Tx.Hash().Bytes(), @@ -127,16 +174,45 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.Re // UnconfirmedTxs gets unconfirmed transactions from the mempool in order of priority // More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.RequestUnconfirmedTxs) (*coretypes.ResultUnconfirmedTxs, error) { - totalCount := env.Mempool.Size() + if giga, ok := env.gigaRouter().Get(); ok { + // NOTE: this pagination seems to be useless, given that the mempool content is + // constantly changing and we don't have any snapshot marker in the request. + rawTxs := giga.Mempool().UnconfirmedTxs() + perPage := env.validatePerPage(req.PerPage.IntPtr()) + page, err := validatePage(req.Page.IntPtr(), perPage, len(rawTxs)) + if err != nil { + return nil, err + } + skipCount := validateSkipCount(page, perPage) + + sizeBytes := 0 + for _, tx := range rawTxs { + sizeBytes += len(tx) + } + var txs types.Txs + for _, tx := range rawTxs[skipCount:min(skipCount+perPage, len(rawTxs))] { + txs = append(txs, tx) + } + return &coretypes.ResultUnconfirmedTxs{ + Count: len(txs), + Total: len(rawTxs), + TotalBytes: int64(sizeBytes), + Txs: txs, + }, nil + } + mp, err := env.requireMempool() + if err != nil { + return nil, err + } + totalCount := mp.Size() perPage := env.validatePerPage(req.PerPage.IntPtr()) page, err := validatePage(req.Page.IntPtr(), perPage, totalCount) if err != nil { return nil, err } - skipCount := validateSkipCount(page, perPage) - txs, _ := env.Mempool.ReapTxs(mempool.ReapLimits{ + txs, _ := mp.ReapTxs(mempool.ReapLimits{ MaxTxs: utils.Some(uint64(skipCount + tmmath.MinInt(perPage, totalCount-skipCount))), //nolint:gosec // guaranteed to be non-negative }, false) if skipCount > len(txs) { @@ -147,7 +223,7 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque return &coretypes.ResultUnconfirmedTxs{ Count: len(result), Total: totalCount, - TotalBytes: utils.Clamp[int64](env.Mempool.SizeBytes()), + TotalBytes: utils.Clamp[int64](mp.SizeBytes()), Txs: result, }, nil } @@ -155,10 +231,26 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque // NumUnconfirmedTxs gets number of unconfirmed transactions. // More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) { + if giga, ok := env.gigaRouter().Get(); ok { + rawTxs := giga.Mempool().UnconfirmedTxs() + sizeBytes := 0 + for _, tx := range rawTxs { + sizeBytes += len(tx) + } + return &coretypes.ResultUnconfirmedTxs{ + Count: len(rawTxs), + Total: len(rawTxs), + TotalBytes: int64(sizeBytes), + }, nil + } + mp, err := env.requireMempool() + if err != nil { + return nil, err + } return &coretypes.ResultUnconfirmedTxs{ - Count: env.Mempool.Size(), - Total: env.Mempool.Size(), - TotalBytes: utils.Clamp[int64](env.Mempool.SizeBytes()), + Count: mp.Size(), + Total: mp.Size(), + TotalBytes: utils.Clamp[int64](mp.SizeBytes()), }, nil } diff --git a/sei-tendermint/internal/rpc/core/status.go b/sei-tendermint/internal/rpc/core/status.go index 5fc69b0ea3..712f110ede 100644 --- a/sei-tendermint/internal/rpc/core/status.go +++ b/sei-tendermint/internal/rpc/core/status.go @@ -131,24 +131,24 @@ func (env *Environment) Status(ctx context.Context) (*coretypes.ResultStatus, er ValidatorInfo: validatorInfo, } - if env.ConsensusReactor != nil { - result.SyncInfo.CatchingUp = env.ConsensusReactor.WaitSync() + if reactor, ok := env.ConsensusReactor.Get(); ok { + result.SyncInfo.CatchingUp = reactor.WaitSync() } - if env.BlockSyncReactor != nil { - result.SyncInfo.MaxPeerBlockHeight = env.BlockSyncReactor.GetMaxPeerBlockHeight() - result.SyncInfo.TotalSyncedTime = env.BlockSyncReactor.GetTotalSyncedTime() - result.SyncInfo.RemainingTime = env.BlockSyncReactor.GetRemainingSyncTime() + if reactor, ok := env.BlockSyncReactor.Get(); ok { + result.SyncInfo.MaxPeerBlockHeight = reactor.GetMaxPeerBlockHeight() + result.SyncInfo.TotalSyncedTime = reactor.GetTotalSyncedTime() + result.SyncInfo.RemainingTime = reactor.GetRemainingSyncTime() } - if env.StateSyncReactor != nil { - result.SyncInfo.TotalSnapshots = env.StateSyncReactor.TotalSnapshots() - result.SyncInfo.ChunkProcessAvgTime = env.StateSyncReactor.ChunkProcessAvgTime() - result.SyncInfo.SnapshotHeight = env.StateSyncReactor.SnapshotHeight() - result.SyncInfo.SnapshotChunksCount = env.StateSyncReactor.SnapshotChunksCount() - result.SyncInfo.SnapshotChunksTotal = env.StateSyncReactor.SnapshotChunksTotal() - result.SyncInfo.BackFilledBlocks = env.StateSyncReactor.BackFilledBlocks() - result.SyncInfo.BackFillBlocksTotal = env.StateSyncReactor.BackFillBlocksTotal() + if reactor, ok := env.StateSyncReactor.Get(); ok { + result.SyncInfo.TotalSnapshots = reactor.TotalSnapshots() + result.SyncInfo.ChunkProcessAvgTime = reactor.ChunkProcessAvgTime() + result.SyncInfo.SnapshotHeight = reactor.SnapshotHeight() + result.SyncInfo.SnapshotChunksCount = reactor.SnapshotChunksCount() + result.SyncInfo.SnapshotChunksTotal = reactor.SnapshotChunksTotal() + result.SyncInfo.BackFilledBlocks = reactor.BackFilledBlocks() + result.SyncInfo.BackFillBlocksTotal = reactor.BackFillBlocksTotal() } return result, nil @@ -164,17 +164,14 @@ func (env *Environment) validatorAtHeight(h int64) utils.Option[*types.Validator if err != nil { return none } - if env.ConsensusState == nil { - return none - } privValAddress := k.Address() // Skip the in-memory consensus-state lookup under Autobahn: the CometBFT // consensus State is never advanced, so GetValidators would nil-deref // on an unpopulated validator set. The state-store lookup below is kept // in sync under both engines. - if !env.gigaRouter().IsPresent() { - lastBlockHeight, vals := env.ConsensusState.GetValidators() + if consensusState, ok := env.ConsensusState.Get(); ok && !env.gigaRouter().IsPresent() { + lastBlockHeight, vals := consensusState.GetValidators() if lastBlockHeight == h { for _, val := range vals { if bytes.Equal(val.Address, privValAddress) { diff --git a/sei-tendermint/node/node.go b/sei-tendermint/node/node.go index 761cccd68d..b358a29120 100644 --- a/sei-tendermint/node/node.go +++ b/sei-tendermint/node/node.go @@ -58,7 +58,7 @@ type nodeImpl struct { // network router *p2p.Router - ServiceRestartCh chan []string + ServiceRestartCh utils.Option[chan []string] nodeInfo types.NodeInfo nodeKey types.NodeKey // our node privkey @@ -67,14 +67,14 @@ type nodeImpl struct { initialState sm.State stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk - mempool *mempool.TxMempool - evPool *evidence.Pool + mempool utils.Option[*mempool.TxMempool] + evPool utils.Option[*evidence.Pool] indexerService *indexer.Service services []service.Service rpcListeners []net.Listener // rpc servers shutdownOps closer rpcEnv *rpccore.Environment - prometheusSrv *http.Server + prometheusSrv utils.Option[*http.Server] } // makeNode returns a new, ready to go, Tendermint Node. @@ -158,6 +158,10 @@ func makeNode( } pubKey = utils.Some(key) } + eventLogOpt := utils.None[*eventlog.Log]() + if eventLog != nil { + eventLogOpt = utils.Some(eventLog) + } // TODO construct node here: node := &nodeImpl{ config: cfg, @@ -183,7 +187,7 @@ func makeNode( GenDoc: genDoc, EventSinks: eventSinks, EventBus: eventBus, - EventLog: eventLog, + EventLog: eventLogOpt, Config: *cfg.RPC, }, } @@ -193,14 +197,13 @@ func makeNode( return nil, fmt.Errorf("autobahn does not support remote validator signers (priv-validator.laddr is set)") } gigaEnabled := cfg.AutobahnConfigFile != "" - mp := mempool.NewTxMempool(cfg.Mempool.ToMempoolConfig(), proxyApp, nodeMetrics.mempool, sm.TxConstraintsFetcherFromStore(stateStore)) router, peerCloser, err := createRouter( nodeMetrics.p2p, node.NodeInfo, nodeKey, utils.Some(atypes.SecretKeyFromED25519(filePrivval.Key.PrivKey)), cfg, - utils.Some(mp), + utils.Some(proxyApp), genDoc, dbProvider, ) @@ -209,20 +212,8 @@ func makeNode( return nil, fmt.Errorf("failed to create router: %w", err) } node.router = router - node.mempool = mp node.rpcEnv.Router = router - // Mempool gossiping is not compatible with Giga, - // so we disable the mempool reactor. - if !gigaEnabled { - mpReactor, err := mempoolreactor.NewReactor(cfg.Mempool, mp, router) - if err != nil { - return nil, fmt.Errorf("mempoolreactor.NewReactor(): %w", err) - } - mpReactor.MarkReadyToStart() - node.services = append(node.services, mpReactor) - } - evReactor, evPool, edbCloser, err := createEvidenceReactor(cfg, dbProvider, stateStore, blockStore, node.router, nodeMetrics.evidence, eventBus) closers = append(closers, edbCloser) @@ -230,68 +221,77 @@ func makeNode( return nil, fmt.Errorf("createEvidenceReactor(): %w", err) } node.services = append(node.services, evReactor) - node.rpcEnv.EvidencePool = evPool - node.evPool = evPool - - node.rpcEnv.Mempool = mp - - // make block executor for consensus and blockchain reactors to execute blocks - blockExec := sm.NewBlockExecutor( - stateStore, - proxyApp, - mp, - evPool, - blockStore, - eventBus, - nodeMetrics.state, - consensusPolicy, - ) + node.rpcEnv.EvidencePool = utils.Some[sm.EvidencePool](evPool) + node.evPool = utils.Some(evPool) - // Determine whether we should attempt state sync. - stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey) - if stateSync && state.LastBlockHeight > 0 { - logger.Info("Found local state with non-zero height, skipping state sync") - stateSync = false + if cfg.P2P.PexReactor { + pxReactor, err := pex.NewReactor( + node.router, + pex.DefaultSendInterval, + ) + if err != nil { + return nil, fmt.Errorf("pex.NewReactor(): %w", err) + } + node.services = append(node.services, pxReactor) } - // Determine whether we should do block sync. This must happen after the handshake, since the - // app may modify the validator set, specifying ourself as the only validator. - blockSync := !onlyValidatorIsUs(state, pubKey) - if gigaEnabled { - // TODO(autobahn-recovery): handles only restart with local disk intact. - // A node that lost its WAL + app CMS (new validator, disk wipe) needs both - // app state sync and an autobahn WAL sync to catch up. Not yet supported. - stateSync = false - blockSync = false - } - waitSync := stateSync || blockSync + if !gigaEnabled { + mp := mempool.NewTxMempool(cfg.Mempool.ToMempoolConfig(), proxyApp, nodeMetrics.mempool, sm.TxConstraintsFetcherFromStore(stateStore)) + node.mempool = utils.Some(mp) + node.rpcEnv.Mempool = utils.Some(mp) + mpReactor, err := mempoolreactor.NewReactor(cfg.Mempool, mp, router) + if err != nil { + return nil, fmt.Errorf("mempoolreactor.NewReactor(): %w", err) + } + mpReactor.MarkReadyToStart() + node.services = append(node.services, mpReactor) - consensusWAL, err := consensus.OpenWAL(cfg.Consensus.WalFile()) - if err != nil { - return nil, fmt.Errorf("consensus.OpenWAL(): %w", err) - } - closers = append(closers, func() error { - consensusWAL.Close() - return nil - }) + // make block executor for consensus and blockchain reactors to execute blocks + blockExec := sm.NewBlockExecutor( + stateStore, + proxyApp, + mp, + evPool, + blockStore, + eventBus, + nodeMetrics.state, + consensusPolicy, + ) - csState := consensus.NewState( - cfg.Consensus, - consensusWAL, - stateStore, - blockExec, - blockStore, - mp, - evPool, - eventBus, - tracerProviderOptions, - nodeMetrics.consensus, - ) - node.rpcEnv.ConsensusState = csState + // Determine whether we should attempt state sync. + stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey) + if stateSync && state.LastBlockHeight > 0 { + logger.Info("Found local state with non-zero height, skipping state sync") + stateSync = false + } + // Determine whether we should do block sync. This must happen after the handshake, since the + // app may modify the validator set, specifying ourself as the only validator. + blockSync := !onlyValidatorIsUs(state, pubKey) + waitSync := stateSync || blockSync - var csReactor *consensus.Reactor - if !gigaEnabled { - csReactor, err = consensus.NewReactor( + consensusWAL, err := consensus.OpenWAL(cfg.Consensus.WalFile()) + if err != nil { + return nil, fmt.Errorf("consensus.OpenWAL(): %w", err) + } + closers = append(closers, func() error { + consensusWAL.Close() + return nil + }) + csState := consensus.NewState( + cfg.Consensus, + consensusWAL, + stateStore, + blockExec, + blockStore, + mp, + evPool, + eventBus, + tracerProviderOptions, + nodeMetrics.consensus, + ) + node.rpcEnv.ConsensusState = utils.Some[rpccore.ConsensusState](csState) + + csReactor, err := consensus.NewReactor( csState, node.router, eventBus, @@ -304,78 +304,66 @@ func makeNode( } node.services = append(node.services, csReactor) - node.rpcEnv.ConsensusReactor = csReactor - } - - // Create the blockchain reactor. Note, we do not start block sync if we're - // doing a state sync first. - bcReactor, err := blocksync.NewReactor( - stateStore, - blockStore, - node.router, - utils.Some(blocksync.SyncerConfig{ - BlockExec: blockExec, - ConsReactor: csReactor, - BlockSync: blockSync && !stateSync, - Metrics: nodeMetrics.consensus, - EventBus: eventBus, - RestartEvent: restartEvent, - SelfRemediationConfig: cfg.SelfRemediation, - }), - ) - if err != nil { - return nil, fmt.Errorf("blocksync.NewReactor(): %w", err) - } - node.services = append(node.services, bcReactor) - node.rpcEnv.BlockSyncReactor = bcReactor - - // Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first. - // FIXME We need to update metrics here, since other reactors don't have access to them. - if stateSync { - nodeMetrics.consensus.StateSyncing.Set(1) - } else if blockSync { - nodeMetrics.consensus.BlockSyncing.Set(1) - } + node.rpcEnv.ConsensusReactor = utils.Some(csReactor) - if cfg.P2P.PexReactor { - pxReactor, err := pex.NewReactor( + // Create the blockchain reactor. Note, we do not start block sync if we're + // doing a state sync first. + bcReactor, err := blocksync.NewReactor( + stateStore, + blockStore, node.router, - pex.DefaultSendInterval, + utils.Some(blocksync.SyncerConfig{ + BlockExec: blockExec, + ConsReactor: csReactor, + BlockSync: blockSync && !stateSync, + Metrics: nodeMetrics.consensus, + EventBus: eventBus, + RestartEvent: restartEvent, + SelfRemediationConfig: cfg.SelfRemediation, + }), ) if err != nil { - return nil, fmt.Errorf("pex.NewReactor(): %w", err) + return nil, fmt.Errorf("blocksync.NewReactor(): %w", err) + } + node.services = append(node.services, bcReactor) + node.rpcEnv.BlockSyncReactor = utils.Some(bcReactor) + + // Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first. + // FIXME We need to update metrics here, since other reactors don't have access to them. + if stateSync { + nodeMetrics.consensus.StateSyncing.Set(1) + } else if blockSync { + nodeMetrics.consensus.BlockSyncing.Set(1) } - node.services = append(node.services, pxReactor) - } - postSyncHook := func(ctx context.Context, state sm.State) error { - csReactor.SetStateSyncingMetrics(0) + postSyncHook := func(ctx context.Context, state sm.State) error { + csReactor.SetStateSyncingMetrics(0) + + // TODO: Some form of orchestrator is needed here between the state + // advancing reactors to be able to control which one of the three + // is running + // FIXME Very ugly to have these metrics bleed through here. + csReactor.SetBlockSyncingMetrics(1) + if err := bcReactor.SwitchToBlockSync(state); err != nil { + logger.Error("failed to switch to block sync", "err", err) + return err + } - // TODO: Some form of orchestrator is needed here between the state - // advancing reactors to be able to control which one of the three - // is running - // FIXME Very ugly to have these metrics bleed through here. - csReactor.SetBlockSyncingMetrics(1) - if err := bcReactor.SwitchToBlockSync(state); err != nil { - logger.Error("failed to switch to block sync", "err", err) - return err + return nil } - return nil - } - // Set up state sync reactor, and schedule a sync if requested. - // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, - // we should clean this whole thing up. See: - // https://github.com/tendermint/tendermint/issues/4644 - // The CometBFT handshaker reconciles the block store and state store with the app - // by replaying blocks and calling InitChain at genesis. Autobahn (giga) maintains - // its own data WAL and does not update the CometBFT block/state stores, so on - // restart the handshaker would observe storeHeight=0 < appHeight=N and fail with - // ErrAppBlockHeightTooHigh. We skip the handshaker in giga mode; instead the - // giga router's runExecute owns InitChain on fresh start (appHeight==0) and - // relies on the app's committed CMS to rebuild deliverState on restart. - node.shouldHandshake = !stateSync && !gigaEnabled - if !gigaEnabled { + // Set up state sync reactor, and schedule a sync if requested. + // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, + // we should clean this whole thing up. See: + // https://github.com/tendermint/tendermint/issues/4644 + // The CometBFT handshaker reconciles the block store and state store with the app + // by replaying blocks and calling InitChain at genesis. Autobahn (giga) maintains + // its own data WAL and does not update the CometBFT block/state stores, so on + // restart the handshaker would observe storeHeight=0 < appHeight=N and fail with + // ErrAppBlockHeightTooHigh. We skip the handshaker in giga mode; instead the + // giga router's runExecute owns InitChain on fresh start (appHeight==0) and + // relies on the app's committed CMS to rebuild deliverState on restart. + node.shouldHandshake = !stateSync ssReactor, err := statesync.NewReactor( genDoc.ChainID, genDoc.InitialHeight, @@ -397,13 +385,26 @@ func makeNode( return nil, fmt.Errorf("statesync.NewReactor(): %w", err) } node.services = append(node.services, ssReactor) - } - if cfg.Mode == config.ModeValidator { - if privValidator != nil { - csState.SetPrivValidator(ctx, utils.Some(privValidator)) + if cfg.Mode == config.ModeValidator { + if privValidator != nil { + csState.SetPrivValidator(ctx, utils.Some(privValidator)) + } + } + } else { + bcReactor, err := blocksync.NewReactor( + stateStore, + blockStore, + node.router, + utils.None[blocksync.SyncerConfig](), + ) + if err != nil { + return nil, fmt.Errorf("blocksync.NewReactor(): %w", err) } + node.rpcEnv.BlockSyncReactor = utils.Some(bcReactor) + node.services = append(node.services, bcReactor) } + node.rpcEnv.PubKey = pubKey node.BaseService = *service.NewBaseService("Node", node) @@ -502,12 +503,14 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { if err != nil { return err } - if err := n.evPool.Start(state); err != nil { - return err + if evPool, ok := n.evPool.Get(); ok { + if err := evPool.Start(state); err != nil { + return err + } } if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" { - n.prometheusSrv = n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr) + n.prometheusSrv = utils.Some(n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr)) } // Start the transport. @@ -515,7 +518,9 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { return err } n.rpcEnv.IsListening = true - n.SpawnCritical("mempool", n.mempool.Run) + if m, ok := n.mempool.Get(); ok { + n.SpawnCritical("mempool", m.Run) + } for _, reactor := range n.services { if err := reactor.Start(ctx); err != nil { @@ -567,12 +572,11 @@ func (n *nodeImpl) OnStop() { pvsc.Wait() } - if n.prometheusSrv != nil { - if err := n.prometheusSrv.Shutdown(context.Background()); err != nil { + if srv, ok := n.prometheusSrv.Get(); ok { + if err := srv.Shutdown(context.Background()); err != nil { // Error from closing listeners, or context timeout: logger.Error("Prometheus HTTP server Shutdown", "err", err) } - } if err := n.shutdownOps(); err != nil { if strings.TrimSpace(err.Error()) != "" { diff --git a/sei-tendermint/node/seed.go b/sei-tendermint/node/seed.go index 7f7a077cec..f372d58e8a 100644 --- a/sei-tendermint/node/seed.go +++ b/sei-tendermint/node/seed.go @@ -12,7 +12,6 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/config" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/eventbus" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/pex" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" @@ -85,7 +84,7 @@ func makeSeedNode( nodeKey, utils.None[atypes.SecretKey](), cfg, - utils.None[*mempool.TxMempool](), + utils.None[*proxy.Proxy](), genDoc, dbProvider, ) diff --git a/sei-tendermint/node/setup.go b/sei-tendermint/node/setup.go index a13d23df75..380e8b759d 100644 --- a/sei-tendermint/node/setup.go +++ b/sei-tendermint/node/setup.go @@ -21,11 +21,11 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/consensus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/eventbus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/evidence" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" mempoolreactor "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool/reactor" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/pex" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state/indexer" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/statesync" @@ -195,7 +195,7 @@ func buildGigaConfig( autobahnConfigFile string, nodeKey types.NodeKey, validatorKey atypes.SecretKey, - txMempool *mempool.TxMempool, + app *proxy.Proxy, genDoc *types.GenesisDoc, ) (*p2p.GigaRouterConfig, error) { if autobahnConfigFile == "" { @@ -253,15 +253,13 @@ func buildGigaConfig( PersistentStateDir: fc.PersistentStateDir, }, Producer: &producer.Config{ - MaxGasPerBlock: maxGasPerBlock, - MaxTxsPerBlock: fc.MaxTxsPerBlock, - MaxTxsPerSecond: fc.MaxTxsPerSecond, - MempoolSize: fc.MempoolSize, - BlockInterval: time.Duration(fc.BlockInterval), - AllowEmptyBlocks: fc.AllowEmptyBlocks, + App: app, + MaxGasPerBlock: maxGasPerBlock, + MaxTxsPerBlock: fc.MaxTxsPerBlock, + MaxTxsPerSecond: fc.MaxTxsPerSecond, + BlockInterval: time.Duration(fc.BlockInterval), }, - TxMempool: txMempool, - GenDoc: genDoc, + GenDoc: genDoc, }, nil } @@ -271,7 +269,7 @@ func createRouter( nodeKey types.NodeKey, validatorKey utils.Option[atypes.SecretKey], cfg *config.Config, - txMempool utils.Option[*mempool.TxMempool], + app utils.Option[*proxy.Proxy], genDoc *types.GenesisDoc, dbProvider config.DBProvider, ) (*p2p.Router, closer, error) { @@ -363,11 +361,11 @@ func createRouter( if !ok { return nil, closer, fmt.Errorf("autobahn non-validator nodes are not supported yet; a local validator key is required") } - mp, ok := txMempool.Get() + app, ok := app.Get() if !ok { - return nil, closer, errors.New("autobahn requires a tx mempool") + return nil, closer, fmt.Errorf("autobahn requires app") } - gigaCfg, err := buildGigaConfig(cfg.AutobahnConfigFile, nodeKey, valKey, mp, genDoc) + gigaCfg, err := buildGigaConfig(cfg.AutobahnConfigFile, nodeKey, valKey, app, genDoc) if err != nil { return nil, closer, fmt.Errorf("buildGigaConfig: %w", err) } diff --git a/sei-tendermint/node/setup_test.go b/sei-tendermint/node/setup_test.go index 091117038c..0f57c7198c 100644 --- a/sei-tendermint/node/setup_test.go +++ b/sei-tendermint/node/setup_test.go @@ -14,8 +14,8 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/config" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto/ed25519" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/tcp" "github.com/sei-protocol/sei-chain/sei-tendermint/types" @@ -57,9 +57,7 @@ func defaultFileConfig(validators []config.AutobahnValidator) *config.AutobahnFi Validators: validators, MaxTxsPerBlock: 5_000, MaxTxsPerSecond: utils.None[uint64](), - MempoolSize: 5_000, BlockInterval: utils.Duration(400 * time.Millisecond), - AllowEmptyBlocks: false, ViewTimeout: utils.Duration(1500 * time.Millisecond), PersistentStateDir: utils.None[string](), DialInterval: utils.Duration(10 * time.Second), @@ -69,13 +67,8 @@ func defaultFileConfig(validators []config.AutobahnValidator) *config.AutobahnFi // testGenesisMaxGas is the gas limit baked into the test genesis doc. const testGenesisMaxGas int64 = 50_000_000 -func makeTestGigaDeps() (*mempool.TxMempool, *types.GenesisDoc) { - txMempool := mempool.NewTxMempool( - mempool.TestConfig(), - kvstore.NewProxy(), - mempool.NopMetrics(), - mempool.NopTxConstraintsFetcher, - ) +func makeTestGigaDeps() (*proxy.Proxy, *types.GenesisDoc) { + app := kvstore.NewProxy() genDoc := &types.GenesisDoc{ ChainID: "test-chain", // Nontrivial InitialHeight so any future code that assumes the @@ -85,7 +78,7 @@ func makeTestGigaDeps() (*mempool.TxMempool, *types.GenesisDoc) { Block: types.BlockParams{MaxGas: testGenesisMaxGas}, }, } - return txMempool, genDoc + return app, genDoc } func TestBuildGigaConfig_EmptyPathErrors(t *testing.T) { @@ -106,9 +99,7 @@ func TestBuildGigaConfig_EnabledWithValidators(t *testing.T) { Validators: []config.AutobahnValidator{v1, v2, v3}, MaxTxsPerBlock: 5_000, MaxTxsPerSecond: utils.Some(uint64(1_000)), - MempoolSize: 20_000, BlockInterval: utils.Duration(200 * time.Millisecond), - AllowEmptyBlocks: true, ViewTimeout: utils.Duration(3 * time.Second), PersistentStateDir: utils.Some("/tmp/autobahn-state"), DialInterval: utils.Duration(5 * time.Second), @@ -144,9 +135,7 @@ func TestBuildGigaConfig_EnabledWithValidators(t *testing.T) { maxTps, ok := result.Producer.MaxTxsPerSecond.Get() require.True(t, ok) assert.Equal(t, uint64(1_000), maxTps) - assert.Equal(t, uint64(20_000), result.Producer.MempoolSize) assert.Equal(t, 200*time.Millisecond, result.Producer.BlockInterval) - assert.True(t, result.Producer.AllowEmptyBlocks) assert.Equal(t, genDoc, result.GenDoc) } diff --git a/sei-tendermint/rpc/client/eventstream/eventstream_test.go b/sei-tendermint/rpc/client/eventstream/eventstream_test.go index 252dd97276..2a4413c985 100644 --- a/sei-tendermint/rpc/client/eventstream/eventstream_test.go +++ b/sei-tendermint/rpc/client/eventstream/eventstream_test.go @@ -13,6 +13,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/eventlog" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/eventlog/cursor" rpccore "github.com/sei-protocol/sei-chain/sei-tendermint/internal/rpc/core" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/rpc/client/eventstream" "github.com/sei-protocol/sei-chain/sei-tendermint/rpc/coretypes" "github.com/sei-protocol/sei-chain/sei-tendermint/types" @@ -192,7 +193,7 @@ func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, s t.Fatalf("Creating event log: %v", err) } s.log = lg - s.env = &rpccore.Environment{EventLog: lg} + s.env = &rpccore.Environment{EventLog: utils.Some(lg)} s.stream = eventstream.New(s, query, streamOpts) return s } diff --git a/sei-tendermint/rpc/client/local/local.go b/sei-tendermint/rpc/client/local/local.go index 2aab8bffa0..73c9978c69 100644 --- a/sei-tendermint/rpc/client/local/local.go +++ b/sei-tendermint/rpc/client/local/local.go @@ -107,7 +107,13 @@ func (c *Local) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultChec } func (c *Local) EvmNextPendingNonce(addr common.Address) uint64 { - return c.Mempool.EvmNextPendingNonce(addr) + if giga, ok := c.Environment.Router.Giga().Get(); ok { + return giga.Mempool().EvmNextPendingNonce(addr) + } + if mp, ok := c.Mempool.Get(); ok { + return mp.EvmNextPendingNonce(addr) + } + return 0 } func (c *Local) EvmProxy(sender common.Address) (*url.URL, bool) { diff --git a/sei-tendermint/rpc/client/rpc_test.go b/sei-tendermint/rpc/client/rpc_test.go index ec39af03e2..8da3411ef3 100644 --- a/sei-tendermint/rpc/client/rpc_test.go +++ b/sei-tendermint/rpc/client/rpc_test.go @@ -569,7 +569,9 @@ func getMempool(t *testing.T, srv service.Service) *mempool.TxMempool { RPCEnvironment() *rpccore.Environment }) require.True(t, ok) - return n.RPCEnvironment().Mempool + mp, ok := n.RPCEnvironment().Mempool.Get() + require.True(t, ok) + return mp } // these cases are roughly the same as the TestClientMethodCalls, but