diff --git a/sei-cosmos/server/start.go b/sei-cosmos/server/start.go
index b55ca79008..45621a433c 100644
--- a/sei-cosmos/server/start.go
+++ b/sei-cosmos/server/start.go
@@ -4,8 +4,10 @@ package server
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/http"
+	"sync"
 
 	//nolint:gosec,G108
 	_ "net/http/pprof"
 	"os"
@@ -175,9 +177,6 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
 				tracerProviderOptions = []trace.TracerProviderOption{}
 			}
 
-			// amino is needed here for backwards compatibility of REST routes
-			exitCode := RestartErrorCode
-
 			serverCtx.Logger.Info("Creating node metrics provider")
 			nodeMetricsProvider := node.DefaultMetricsProvider(serverCtx.Config.Instrumentation)(clientCtx.ChainID)
@@ -193,33 +192,21 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
 				}
 			}
 
-			restartCoolDownDuration := time.Second * time.Duration(serverCtx.Config.SelfRemediation.RestartCooldownSeconds)
-			// Set the first restart time to be now - restartCoolDownDuration so that the first restart can trigger whenever
-			canRestartAfter := time.Now().Add(-restartCoolDownDuration)
-
 			serverCtx.Logger.Info("Starting Process")
 
 			for {
-				err = startInProcess(
+				err := startInProcess(
 					serverCtx,
 					clientCtx,
 					appCreator,
 					tracerProviderOptions,
 					nodeMetricsProvider,
 					apiMetrics,
-					canRestartAfter,
 				)
-				errCode, ok := err.(ErrorCode)
-				exitCode = errCode.Code
-				if !ok {
+				if !errors.Is(err, ErrShouldRestart) {
 					return err
 				}
-				if exitCode != RestartErrorCode {
-					break
-				}
 				serverCtx.Logger.Info("restarting node...")
-				canRestartAfter = time.Now().Add(restartCoolDownDuration)
 			}
-			return nil
 		},
 	}
@@ -283,7 +270,6 @@ func startInProcess(
 	tracerProviderOptions []trace.TracerProviderOption,
 	nodeMetricsProvider *node.NodeMetrics,
 	apiMetrics *telemetry.Metrics,
-	canRestartAfter time.Time,
 ) error {
 	cfg := ctx.Config
 	home := cfg.RootDir
@@ -330,13 +316,20 @@ func startInProcess(
 	}
 
 	app := appCreator(ctx.Logger, db, traceWriter, ctx.Config, ctx.Viper)
-	var (
-		tmNode    service.Service
-		restartCh chan struct{}
-		gRPCOnly  = ctx.Viper.GetBool(flagGRPCOnly)
-	)
+	gRPCOnly := ctx.Viper.GetBool(flagGRPCOnly)
+	var tmNode service.Service
 
-	restartCh = make(chan struct{})
+	var restartMtx sync.Mutex
+	restartCh := make(chan struct{})
+	restartEvent := func() {
+		restartMtx.Lock()
+		defer restartMtx.Unlock()
+		select {
+		case <-restartCh:
+		default:
+			close(restartCh)
+		}
+	}
 
 	if gRPCOnly {
 		ctx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled")
@@ -361,7 +354,7 @@ func startInProcess(
 			goCtx,
 			ctx.Config,
 			ctx.Logger,
-			restartCh,
+			restartEvent,
 			abciclient.NewLocalClient(ctx.Logger, app),
 			gen,
 			tracerProviderOptions,
@@ -434,7 +427,7 @@
 	// we do not need to start Rosetta or handle any Tendermint related processes.
 	if gRPCOnly {
 		// wait for signal capture and gracefully return
-		return WaitForQuitSignals(ctx, restartCh, canRestartAfter)
+		return WaitForQuitSignals(goCtx, restartCh)
 	}
 
 	var rosettaSrv crgserver.Server
@@ -507,5 +500,5 @@ func startInProcess(
 	}()
 
 	// wait for signal capture and gracefully return
-	return WaitForQuitSignals(ctx, restartCh, canRestartAfter)
+	return WaitForQuitSignals(goCtx, restartCh)
 }
diff --git a/sei-cosmos/server/util.go b/sei-cosmos/server/util.go
index 0c295bebfd..4f763f522f 100644
--- a/sei-cosmos/server/util.go
+++ b/sei-cosmos/server/util.go
@@ -1,6 +1,7 @@
 package server
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -39,8 +40,7 @@ import (
 // a command's Context.
 const ServerContextKey = sdk.ContextKey("server.context")
 
-// Error code reserved for signalled
-const RestartErrorCode = 100
+var ErrShouldRestart = errors.New("node should be restarted")
 
 // server context
 type Context struct {
@@ -135,7 +135,7 @@ func InterceptConfigs(cmd *cobra.Command) (*tmcfg.Config, error) {
 // is used to read and parse the application configuration. Command handlers can
 // fetch the server Context to get the Tendermint configuration or to get access
 // to Viper.
-func InterceptConfigsPreRunHandler(cmd *cobra.Command, customAppConfigTemplate string, customAppConfig interface{}) error {
+func InterceptConfigsPreRunHandler(cmd *cobra.Command, customAppConfigTemplate string, customAppConfig any) error {
 	serverCtx := NewDefaultContext()
 
 	// Get the executable name and configure the viper instance so that environmental
@@ -221,7 +221,7 @@ func SetCmdServerContext(cmd *cobra.Command, serverCtx *Context) error {
 // configuration file. The Tendermint configuration file is parsed given a root
 // Viper object, whereas the application is parsed with the private package-aware
 // viperCfg object.
-func interceptConfigs(rootViper *viper.Viper, customAppTemplate string, customConfig interface{}) (*tmcfg.Config, error) {
+func interceptConfigs(rootViper *viper.Viper, customAppTemplate string, customConfig any) (*tmcfg.Config, error) {
 	rootDir := rootViper.GetString(flags.FlagHome)
 	configPath := filepath.Join(rootDir, "config")
 	tmCfgFile := filepath.Join(configPath, "config.toml")
@@ -408,26 +408,14 @@ func TrapSignal(cleanupFunc func()) {
 }
 
 // WaitForQuitSignals waits for SIGINT and SIGTERM and returns.
-func WaitForQuitSignals(ctx *Context, restartCh chan struct{}, canRestartAfter time.Time) ErrorCode {
-	sigs := make(chan os.Signal, 1)
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
-	if restartCh != nil {
-		for {
-			select {
-			case sig := <-sigs:
-				return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
-			case <-restartCh:
-				// If it's in the restart cooldown period
-				if time.Now().Before(canRestartAfter) {
-					ctx.Logger.Info("Restarting too frequently, can only restart after %s", canRestartAfter)
-					continue
-				}
-				return ErrorCode{Code: RestartErrorCode}
-			}
-		}
-	} else {
-		sig := <-sigs
-		return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
+func WaitForQuitSignals(ctx context.Context, restartCh chan struct{}) error {
+	ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
+	defer cancel()
+	select {
+	case <-ctx.Done():
+		return nil
+	case <-restartCh: // blocks forever on a nil channel
+		return ErrShouldRestart
 	}
 }
diff --git a/sei-cosmos/server/util_test.go b/sei-cosmos/server/util_test.go
index af2c92b48e..86eb076048 100644
--- a/sei-cosmos/server/util_test.go
+++ b/sei-cosmos/server/util_test.go
@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"os"
-	"os/signal"
 	"path"
 	"path/filepath"
 	"strings"
@@ -13,7 +12,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/sei-protocol/sei-chain/sei-tendermint/libs/log"
 	"github.com/spf13/cobra"
 	"go.opentelemetry.io/otel/sdk/trace"
@@ -103,8 +101,7 @@ func TestInterceptConfigsPreRunHandlerReadsConfigToml(t *testing.T) {
 		t.Fatalf("creating config.toml file failed: %v", err)
 	}
 
-	_, err = writer.WriteString(fmt.Sprintf("db-backend = '%s'\n", testDbBackend))
-	if err != nil {
+	if _, err := fmt.Fprintf(writer, "db-backend = '%s'\n", testDbBackend); err != nil {
 		t.Fatalf("Failed writing string to config.toml: %v", err)
 	}
@@ -144,8 +141,7 @@ func TestInterceptConfigsPreRunHandlerReadsAppToml(t *testing.T) {
 		t.Fatalf("creating app.toml file failed: %v", err)
 	}
 
-	_, err = writer.WriteString(fmt.Sprintf("halt-time = %d\n", testHaltTime))
-	if err != nil {
+	if _, err := fmt.Fprintf(writer, "halt-time = %d\n", testHaltTime); err != nil {
 		t.Fatalf("Failed writing string to app.toml: %v", err)
 	}
@@ -308,8 +304,7 @@ func (v precedenceCommon) setAll(t *testing.T, setFlag *string, setEnvVar *strin
 		t.Fatalf("creating config.toml file failed: %v", err)
 	}
 
-	_, err = writer.WriteString(fmt.Sprintf("[rpc]\nladdr = \"%s\"\n", *setConfigFile))
-	if err != nil {
+	if _, err := fmt.Fprintf(writer, "[rpc]\nladdr = \"%s\"\n", *setConfigFile); err != nil {
 		t.Fatalf("Failed writing string to config.toml: %v", err)
 	}
@@ -407,63 +402,28 @@ func TestInterceptConfigsWithBadPermissions(t *testing.T) {
 }
 
 func TestWaitForQuitSignals(t *testing.T) {
-	t.Run("WithRestartChannelAndCanRestartAfterNotReached", func(t *testing.T) {
+	t.Run("WithRestartChannelAndCanRestart", func(t *testing.T) {
 		restartCh := make(chan struct{})
 		go func() {
 			time.Sleep(100 * time.Millisecond)
 			restartCh <- struct{}{}
 		}()
 
-		go func() {
-			time.Sleep(200 * time.Millisecond)
-			syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
-		}()
-
-		errCode := server.WaitForQuitSignals(
-			&server.Context{Logger: log.NewNopLogger()},
-			restartCh,
-			time.Now().Add(500*time.Millisecond),
-		)
-		expectedCode := int(syscall.SIGTERM) + 128
-		if errCode.Code != expectedCode {
-			t.Errorf("Expected error code %d, got %d", expectedCode, errCode.Code)
-		}
-	})
-
-	t.Run("WithRestartChannelAndCanRestartAfterReached", func(t *testing.T) {
-		restartCh := make(chan struct{})
-		go func() {
-			time.Sleep(100 * time.Millisecond)
-			restartCh <- struct{}{}
-		}()
-
-		errCode := server.WaitForQuitSignals(
-			&server.Context{Logger: log.NewNopLogger()},
-			restartCh,
-			time.Now().Add(-100*time.Millisecond),
-		)
-		if errCode.Code != server.RestartErrorCode {
-			t.Errorf("Expected error code %d, got %d", server.RestartErrorCode, errCode.Code)
+		err := server.WaitForQuitSignals(t.Context(), restartCh)
+		if !errors.Is(err, server.ErrShouldRestart) {
+			t.Errorf("Expected ErrShouldRestart, got %v", err)
 		}
 	})
 
 	t.Run("WithSIGINT", func(t *testing.T) {
-		sigs := make(chan os.Signal, 1)
-		signal.Notify(sigs, syscall.SIGINT)
-
 		go func() {
 			time.Sleep(100 * time.Millisecond)
 			syscall.Kill(syscall.Getpid(), syscall.SIGINT)
 		}()
 
-		errCode := server.WaitForQuitSignals(
-			&server.Context{Logger: log.NewNopLogger()},
-			make(chan struct{}),
-			time.Now(),
-		)
-		expectedCode := int(syscall.SIGINT) + 128
-		if errCode.Code != expectedCode {
-			t.Errorf("Expected error code %d, got %d", expectedCode, errCode.Code)
+		err := server.WaitForQuitSignals(t.Context(), make(chan struct{}))
+		if err != nil {
+			t.Fatal(err)
 		}
 	})
 
@@ -473,14 +433,9 @@ func TestWaitForQuitSignals(t *testing.T) {
 			syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
 		}()
 
-		errCode := server.WaitForQuitSignals(
-			&server.Context{Logger: log.NewNopLogger()},
-			make(chan struct{}),
-			time.Now(),
-		)
-		expectedCode := int(syscall.SIGTERM) + 128
-		if errCode.Code != expectedCode {
-			t.Errorf("Expected error code %d, got %d", expectedCode, errCode.Code)
+		err := server.WaitForQuitSignals(t.Context(), make(chan struct{}))
+		if err != nil {
+			t.Fatal(err)
 		}
 	})
 }
diff --git a/sei-cosmos/testutil/network/util.go b/sei-cosmos/testutil/network/util.go
index bd66dc2b9c..5f5a689663 100644
--- a/sei-cosmos/testutil/network/util.go
+++ b/sei-cosmos/testutil/network/util.go
@@ -2,7 +2,7 @@ package network
 
 import (
 	"encoding/json"
-	"io/ioutil"
+	"os"
 	"path/filepath"
 	"time"
 
@@ -56,7 +56,7 @@ func startInProcess(cfg Config, val *Validator) error {
 		val.GoCtx,
 		tmCfg,
 		logger,
-		make(chan struct{}),
+		func() {},
 		abciclient.NewLocalClient(logger, app),
 		defaultGensis,
 		[]trace.TracerProviderOption{},
@@ -211,15 +211,9 @@ func writeFile(name string, dir string, contents []byte) error {
 	writePath := filepath.Join(dir)
 	file := filepath.Join(writePath, name)
 
-	err := tmos.EnsureDir(writePath, 0755)
-	if err != nil {
-		return err
-	}
-
-	err = ioutil.WriteFile(file, contents, 0644)
-	if err != nil {
+	if err := tmos.EnsureDir(writePath, 0755); err != nil {
 		return err
 	}
 
-	return nil
+	return os.WriteFile(file, contents, 0644)
 }
diff --git a/sei-tendermint/cmd/tendermint/commands/run_node.go b/sei-tendermint/cmd/tendermint/commands/run_node.go
index 27306bff10..485e3ac345 100644
--- a/sei-tendermint/cmd/tendermint/commands/run_node.go
+++ b/sei-tendermint/cmd/tendermint/commands/run_node.go
@@ -13,6 +13,7 @@ import (
 
 	cfg "github.com/sei-protocol/sei-chain/sei-tendermint/config"
 	"github.com/sei-protocol/sei-chain/sei-tendermint/libs/log"
+	"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
 )
 
 var (
@@ -100,7 +101,7 @@ func addDBFlags(cmd *cobra.Command, conf *cfg.Config) {
 
 // NewRunNodeCmd returns the command that allows the CLI to start a node.
 // It can be used with a custom PrivValidator and in-process ABCI application.
-func NewRunNodeCmd(nodeProvider cfg.ServiceProvider, conf *cfg.Config, logger log.Logger, restartCh chan struct{}) *cobra.Command {
+func NewRunNodeCmd(nodeProvider cfg.ServiceProvider, conf *cfg.Config, logger log.Logger) *cobra.Command {
 	cmd := &cobra.Command{
 		Use:     "start",
 		Aliases: []string{"node", "run"},
@@ -113,7 +114,9 @@ func NewRunNodeCmd(nodeProvider cfg.ServiceProvider, conf *cfg.Config, logger lo
 			ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
 			defer cancel()
 
-			n, err := nodeProvider(ctx, conf, logger, restartCh)
+			restart := utils.NewAtomicSend(false)
+
+			n, err := nodeProvider(ctx, conf, logger, func() { restart.Store(true) })
 			if err != nil {
 				return fmt.Errorf("failed to create node: %w", err)
 			}
@@ -124,16 +127,15 @@ func NewRunNodeCmd(nodeProvider cfg.ServiceProvider, conf *cfg.Config, logger lo
 
 			logger.Info("started node", "chain", conf.ChainID())
 
-			for {
-				select {
-				case <-ctx.Done():
-					return nil
-				case <-restartCh:
-					logger.Info("Received signal to restart node.")
-					n.Stop()
-					os.Exit(1)
-				}
+			if _, err := restart.Wait(ctx, func(x bool) bool { return x }); err != nil {
+				// Context canceled.
+				// TODO(gprusak): shouldn't we stop the node either way though?
+				return nil
 			}
+			logger.Info("Received signal to restart node.")
+			n.Stop()
+			os.Exit(1)
+			panic("unreachable")
 		},
 	}
diff --git a/sei-tendermint/cmd/tendermint/main.go b/sei-tendermint/cmd/tendermint/main.go
index 354131e11d..17364e2067 100644
--- a/sei-tendermint/cmd/tendermint/main.go
+++ b/sei-tendermint/cmd/tendermint/main.go
@@ -25,9 +25,6 @@ func main() {
 		panic(err)
 	}
 
-	// A stop gap solution to restart the node for certain failures (such as network partition)
-	restartCh := make(chan struct{})
-
 	rcmd := commands.RootCommand(conf, logger)
 	rcmd.AddCommand(
 		commands.MakeGenValidatorCommand(),
@@ -60,7 +57,7 @@ func main() {
 	nodeFunc := node.NewDefault
 
 	// Create & start node
-	rcmd.AddCommand(commands.NewRunNodeCmd(nodeFunc, conf, logger, restartCh))
+	rcmd.AddCommand(commands.NewRunNodeCmd(nodeFunc, conf, logger))
 
 	if err := cli.RunWithTrace(ctx, rcmd); err != nil {
 		panic(err)
diff --git a/sei-tendermint/config/db.go b/sei-tendermint/config/db.go
index e22ac54738..f76710579c 100644
--- a/sei-tendermint/config/db.go
+++ b/sei-tendermint/config/db.go
@@ -10,7 +10,7 @@ import (
 )
 
 // ServiceProvider takes a config and a logger and returns a ready to go Node.
-type ServiceProvider func(context.Context, *Config, log.Logger, chan struct{}) (service.Service, error)
+type ServiceProvider func(ctx context.Context, cfg *Config, logger log.Logger, restartEvent func()) (service.Service, error)
 
 // DBContext specifies config information for loading a new DB.
 type DBContext struct {
diff --git a/sei-tendermint/internal/blocksync/reactor.go b/sei-tendermint/internal/blocksync/reactor.go
index e3c28d973b..7ddf39a514 100644
--- a/sei-tendermint/internal/blocksync/reactor.go
+++ b/sei-tendermint/internal/blocksync/reactor.go
@@ -111,7 +111,7 @@ type Reactor struct {
 
 	syncStartTime time.Time
 
-	restartCh                  chan struct{}
+	restartEvent               func()
 	lastRestartTime            time.Time
 	blocksBehindThreshold      uint64
 	blocksBehindCheckInterval  time.Duration
@@ -129,7 +129,7 @@ func NewReactor(
 	blockSync bool,
 	metrics *consensus.Metrics,
 	eventBus *eventbus.EventBus,
-	restartCh chan struct{},
+	restartEvent func(), // should be idempotent and non-blocking
 	selfRemediationConfig *config.SelfRemediationConfig,
 ) (*Reactor, error) {
 	channel, err := p2p.OpenChannel(router, GetChannelDescriptor())
@@ -147,7 +147,7 @@ func NewReactor(
 		channel:                    channel,
 		metrics:                    metrics,
 		eventBus:                   eventBus,
-		restartCh:                  restartCh,
+		restartEvent:               restartEvent,
 		lastRestartTime:            time.Now(),
 		blocksBehindThreshold:      selfRemediationConfig.BlocksBehindThreshold,
 		blocksBehindCheckInterval:  time.Duration(selfRemediationConfig.BlocksBehindCheckIntervalSeconds) * time.Second, //nolint:gosec // validated in config.ValidateBasic against MaxInt64
@@ -298,6 +298,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
 
 // autoRestartIfBehind will check if the node is behind the max peer height by
 // a certain threshold. If it is, the node will attempt to restart itself
+// TODO(gprusak): this should be a sub task of the consensus reactor instead.
 func (r *Reactor) autoRestartIfBehind(ctx context.Context) {
 	if r.blocksBehindThreshold == 0 || r.blocksBehindCheckInterval <= 0 {
 		r.logger.Info("Auto remediation is disabled")
@@ -322,18 +323,17 @@ func (r *Reactor) autoRestartIfBehind(ctx context.Context) {
 				r.logger.Debug("does not exceed threshold or is already in block sync mode", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight, "blockSyncIsSet", blockSyncIsSet)
 				continue
 			}
-			// Check if we have met cooldown time
 			if time.Since(r.lastRestartTime).Seconds() < float64(r.restartCooldownSeconds) {
 				r.logger.Debug("we are lagging behind, going to trigger a restart after cooldown time passes")
 				continue
 			}
-
 			r.logger.Info("Blocks behind threshold, restarting node", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight)
 
 			// Send signal to restart the node
 			r.blockSync.Set()
-			r.restartCh <- struct{}{}
+			r.restartEvent()
+			return
 		case <-ctx.Done():
 			return
 		}
diff --git a/sei-tendermint/internal/blocksync/reactor_test.go b/sei-tendermint/internal/blocksync/reactor_test.go
index 222b61aa71..9fca9cd5a7 100644
--- a/sei-tendermint/internal/blocksync/reactor_test.go
+++ b/sei-tendermint/internal/blocksync/reactor_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/sei-protocol/sei-chain/sei-tendermint/internal/store"
 	"github.com/sei-protocol/sei-chain/sei-tendermint/internal/test/factory"
 	"github.com/sei-protocol/sei-chain/sei-tendermint/libs/log"
+	"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
 	"github.com/sei-protocol/sei-chain/sei-tendermint/types"
 )
@@ -89,7 +90,7 @@ func makeReactor(
 	t *testing.T,
 	genDoc *types.GenesisDoc,
 	router *p2p.Router,
-	restartChan chan struct{},
+	restartEvent func(),
 	selfRemediationConfig *config.SelfRemediationConfig,
 ) *Reactor {
@@ -144,7 +145,7 @@ func makeReactor(
 		true,
 		consensus.NopMetrics(),
 		nil, // eventbus, can be nil
-		restartChan,
+		restartEvent,
 		selfRemediationConfig,
 	)
 	if err != nil {
@@ -169,7 +170,6 @@ func (rts *reactorTestSuite) addNode(
 	rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics())
 	require.NoError(t, rts.app[nodeID].Start(ctx))
 
-	restartChan := make(chan struct{})
 	remediationConfig := config.DefaultSelfRemediationConfig()
 	remediationConfig.BlocksBehindThreshold = 1000
@@ -178,7 +178,7 @@ func (rts *reactorTestSuite) addNode(
 		t,
 		genDoc,
 		rts.network.Node(nodeID).Router,
-		restartChan,
+		func() {},
 		config.DefaultSelfRemediationConfig(),
 	)
 	lastCommit := &types.Commit{}
@@ -371,27 +371,26 @@ func TestAutoRestartIfBehind(t *testing.T) {
 				maxPeerHeight: tt.maxPeerHeight,
 			}
 
-			restartChan := make(chan struct{}, 1)
+			restart := utils.NewAtomicSend(false)
 			r := &Reactor{
 				logger:                    log.TestingLogger(),
 				store:                     mockBlockStore,
 				pool:                      blockPool,
 				blocksBehindThreshold:     tt.blocksBehindThreshold,
 				blocksBehindCheckInterval: tt.blocksBehindCheckInterval,
-				restartCh:                 restartChan,
+				restartEvent:              func() { restart.Store(true) },
 				blockSync:                 newAtomicBool(tt.isBlockSync),
 			}
 
-			ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond)
-			defer cancel()
-
-			go r.autoRestartIfBehind(ctx)
-
-			select {
-			case <-restartChan:
-				assert.True(t, tt.restartExpected, "Unexpected restart")
-			case <-time.After(50 * time.Millisecond):
-				assert.False(t, tt.restartExpected, "Expected restart but did not occur")
+			ctx := t.Context()
+			if tt.restartExpected {
+				r.autoRestartIfBehind(ctx)
+				assert.True(t, restart.Load(), "Expected restart but did not occur")
+			} else {
+				ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
+				defer cancel()
+				r.autoRestartIfBehind(ctx)
+				assert.False(t, restart.Load(), "Unexpected restart")
 			}
 		})
 	}
diff --git a/sei-tendermint/internal/statesync/reactor.go b/sei-tendermint/internal/statesync/reactor.go
index c7f660d30c..130ab057df 100644
--- a/sei-tendermint/internal/statesync/reactor.go
+++ b/sei-tendermint/internal/statesync/reactor.go
@@ -214,7 +214,7 @@ type Reactor struct {
 	lastNoAvailablePeers time.Time
 
 	// Used to signal a restart the node on the application level
-	restartCh                     chan struct{}
+	restartEvent                  func()
 	restartNoAvailablePeersWindow time.Duration
 }
@@ -237,7 +237,7 @@ func NewReactor(
 	eventBus *eventbus.EventBus,
 	postSyncHook func(context.Context, sm.State) error,
 	needsStateSync bool,
-	restartCh chan struct{},
+	restartEvent func(),
 	selfRemediationConfig *config.SelfRemediationConfig,
 ) (*Reactor, error) {
 	snapshotChannel, err := p2p.OpenChannel(router, GetSnapshotChannelDescriptor())
@@ -277,7 +277,7 @@ func NewReactor(
 		lightBlockChannel:             lightBlockChannel,
 		paramsChannel:                 paramsChannel,
 		lastNoAvailablePeers:          time.Time{},
-		restartCh:                     restartCh,
+		restartEvent:                  restartEvent,
 		restartNoAvailablePeersWindow: time.Duration(selfRemediationConfig.StatesyncNoPeersRestartWindowSeconds) * time.Second, //nolint:gosec // validated in config.ValidateBasic against MaxInt64
 	}
@@ -1020,7 +1020,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 			r.lastNoAvailablePeers = time.Now()
 		} else if time.Since(r.lastNoAvailablePeers) > r.restartNoAvailablePeersWindow {
 			r.logger.Error("no available peers left for statesync (restarting router)")
-			r.restartCh <- struct{}{}
+			r.restartEvent()
 		}
 	} else {
 		// Reset
diff --git a/sei-tendermint/internal/statesync/reactor_test.go b/sei-tendermint/internal/statesync/reactor_test.go
index fbc6dc01c1..72a3b5af58 100644
--- a/sei-tendermint/internal/statesync/reactor_test.go
+++ b/sei-tendermint/internal/statesync/reactor_test.go
@@ -86,7 +86,7 @@ func setup(
 		nil,   // eventbus can be nil
 		nil,   // post-sync-hook
 		false, // run Sync during Start()
-		make(chan struct{}),
+		func() {},
 		config.DefaultSelfRemediationConfig(),
 	)
 	require.NoError(t, err)
diff --git a/sei-tendermint/node/node.go b/sei-tendermint/node/node.go
index 7c5ce46bca..9456fe0339 100644
--- a/sei-tendermint/node/node.go
+++ b/sei-tendermint/node/node.go
@@ -85,7 +85,7 @@ func newDefaultNode(
 	ctx context.Context,
 	cfg *config.Config,
 	logger log.Logger,
-	restartCh chan struct{},
+	restartEvent func(),
 ) (service.Service, error) {
 	nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
 	if err != nil {
@@ -99,10 +99,8 @@ func newDefaultNode(
 
 	if cfg.Mode == config.ModeSeed {
 		return makeSeedNode(
-			ctx,
 			logger,
 			cfg,
-			restartCh,
 			config.DefaultDBProvider,
 			nodeKey,
 			defaultGenesisDocProviderFunc(cfg),
@@ -118,7 +116,7 @@ func newDefaultNode(
 	return makeNode(
 		ctx,
 		cfg,
-		restartCh,
+		restartEvent,
 		pval,
 		nodeKey,
 		appClient,
@@ -134,7 +132,7 @@ func newDefaultNode(
 func makeNode(
 	ctx context.Context,
 	cfg *config.Config,
-	restartCh chan struct{},
+	restartEvent func(),
 	filePrivval *privval.FilePV,
 	nodeKey types.NodeKey,
 	client abciclient.Client,
@@ -342,7 +340,7 @@ func makeNode(
 		blockSync && !stateSync,
 		nodeMetrics.consensus,
 		eventBus,
-		restartCh,
+		restartEvent,
 		cfg.SelfRemediation,
 	)
 	if err != nil {
@@ -405,7 +403,7 @@ func makeNode(
 		// the post-sync operation
 		postSyncHook,
 		stateSync,
-		restartCh,
+		restartEvent,
 		cfg.SelfRemediation,
 	)
 	if err != nil {
diff --git a/sei-tendermint/node/node_test.go b/sei-tendermint/node/node_test.go
index e279a88624..4a03851a25 100644
--- a/sei-tendermint/node/node_test.go
+++ b/sei-tendermint/node/node_test.go
@@ -50,7 +50,7 @@ func TestNodeStartStop(t *testing.T) {
 	logger := log.NewNopLogger()
 
 	// create & start node
-	ns, err := newDefaultNode(ctx, cfg, logger, make(chan struct{}))
+	ns, err := newDefaultNode(ctx, cfg, logger, func() {})
 	require.NoError(t, err)
 
 	n, ok := ns.(*nodeImpl)
@@ -83,7 +83,7 @@ func TestNodeStartStop(t *testing.T) {
 func getTestNode(ctx context.Context, t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl {
 	t.Helper()
 
-	ns, err := newDefaultNode(ctx, conf, logger, make(chan struct{}))
+	ns, err := newDefaultNode(ctx, conf, logger, func() {})
 	require.NoError(t, err)
 
 	n, ok := ns.(*nodeImpl)
@@ -200,7 +200,7 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
 
 	logger := log.NewNopLogger()
 
-	n, err := newDefaultNode(ctx, cfg, logger, make(chan struct{}))
+	n, err := newDefaultNode(ctx, cfg, logger, func() {})
 
 	assert.Error(t, err)
@@ -585,10 +585,8 @@ func TestNodeNewSeedNode(t *testing.T) {
 	logger := log.NewNopLogger()
 
 	ns, err := makeSeedNode(
-		ctx,
 		logger,
 		cfg,
-		make(chan struct{}),
 		config.DefaultDBProvider,
 		nodeKey,
 		defaultGenesisDocProviderFunc(cfg),
@@ -623,7 +621,7 @@ func TestNodeSetEventSink(t *testing.T) {
 
 	logger := log.NewNopLogger()
 
-	setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
+	setupTest := func(t *testing.T) []indexer.EventSink {
 		eventBus := eventbus.NewDefault(logger.With("module", "events"))
 		require.NoError(t, eventBus.Start(ctx))
@@ -652,36 +650,36 @@ func TestNodeSetEventSink(t *testing.T) {
 		}
 	}
 
-	eventSinks := setupTest(t, cfg)
+	eventSinks := setupTest(t)
 	assert.Equal(t, 1, len(eventSinks))
 	assert.Equal(t, indexer.KV, eventSinks[0].Type())
 
 	cfg.TxIndex.Indexer = []string{"null"}
-	eventSinks = setupTest(t, cfg)
+	eventSinks = setupTest(t)
 	assert.Equal(t, 1, len(eventSinks))
 	assert.Equal(t, indexer.NULL, eventSinks[0].Type())
 
 	cfg.TxIndex.Indexer = []string{"null", "kv"}
-	eventSinks = setupTest(t, cfg)
+	eventSinks = setupTest(t)
 	assert.Equal(t, 1, len(eventSinks))
 	assert.Equal(t, indexer.NULL, eventSinks[0].Type())
 
 	cfg.TxIndex.Indexer = []string{"kvv"}
-	ns, err := newDefaultNode(ctx, cfg, logger, make(chan struct{}))
+	ns, err := newDefaultNode(ctx, cfg, logger, func() {})
 	assert.Nil(t, ns)
 	assert.Contains(t, err.Error(), "unsupported event sink type")
 	t.Cleanup(cleanup(ns))
 
 	cfg.TxIndex.Indexer = []string{}
-	eventSinks = setupTest(t, cfg)
+	eventSinks = setupTest(t)
 	assert.Equal(t, 1, len(eventSinks))
 	assert.Equal(t, indexer.NULL, eventSinks[0].Type())
 
 	cfg.TxIndex.Indexer = []string{"psql"}
-	ns, err = newDefaultNode(ctx, cfg, logger, make(chan struct{}))
+	ns, err = newDefaultNode(ctx, cfg, logger, func() {})
 	assert.Nil(t, ns)
 	assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
 	t.Cleanup(cleanup(ns))
@@ -691,13 +689,13 @@ func TestNodeSetEventSink(t *testing.T) {
 	var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
 
 	cfg.TxIndex.Indexer = []string{"null", "kv", "Kv"}
-	ns, err = newDefaultNode(ctx, cfg, logger, make(chan struct{}))
+	ns, err = newDefaultNode(ctx, cfg, logger, func() {})
 	require.Error(t, err)
 	assert.Contains(t, err.Error(), e.Error())
 	t.Cleanup(cleanup(ns))
 
 	cfg.TxIndex.Indexer = []string{"Null", "kV", "kv", "nUlL"}
-	ns, err = newDefaultNode(ctx, cfg, logger, make(chan struct{}))
+	ns, err = newDefaultNode(ctx, cfg, logger, func() {})
 	require.Error(t, err)
 	assert.Contains(t, err.Error(), e.Error())
 	t.Cleanup(cleanup(ns))
diff --git a/sei-tendermint/node/public.go b/sei-tendermint/node/public.go
index 59c0368dac..5d378620bc 100644
--- a/sei-tendermint/node/public.go
+++ b/sei-tendermint/node/public.go
@@ -22,9 +22,9 @@ func NewDefault(
 	ctx context.Context,
 	conf *config.Config,
 	logger log.Logger,
-	restartCh chan struct{},
+	restartEvent func(),
 ) (service.Service, error) {
-	return newDefaultNode(ctx, conf, logger, restartCh)
+	return newDefaultNode(ctx, conf, logger, restartEvent)
 }
 
 // New constructs a tendermint node. The ClientCreator makes it
@@ -37,7 +37,7 @@ func New(
 	ctx context.Context,
 	conf *config.Config,
 	logger log.Logger,
-	restartCh chan struct{},
+	restartEvent func(),
 	cf abciclient.Client,
 	gen *types.GenesisDoc,
 	tracerProviderOptions []trace.TracerProviderOption,
@@ -66,7 +66,7 @@ func New(
 		return makeNode(
 			ctx,
 			conf,
-			restartCh,
+			restartEvent,
 			pval,
 			nodeKey,
 			cf,
@@ -78,10 +78,8 @@ func New(
 		)
 	case config.ModeSeed:
 		return makeSeedNode(
-			ctx,
 			logger,
 			conf,
-			restartCh,
 			config.DefaultDBProvider,
 			nodeKey,
 			genProvider,
diff --git a/sei-tendermint/node/seed.go b/sei-tendermint/node/seed.go
index 7ca79a096e..d027fa5630 100644
--- a/sei-tendermint/node/seed.go
+++ b/sei-tendermint/node/seed.go
@@ -46,19 +46,14 @@ type seedNodeImpl struct {
 
 // makeSeedNode returns a new seed node, containing only p2p, pex reactor
 func makeSeedNode(
-	ctx context.Context,
 	logger log.Logger,
 	cfg *config.Config,
-	restartCh chan struct{},
 	dbProvider config.DBProvider,
 	nodeKey types.NodeKey,
 	genesisDocProvider genesisDocProvider,
 	client abciclient.Client,
 	nodeMetrics *NodeMetrics,
 ) (service.Service, error) {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
 	if !cfg.P2P.PexReactor {
 		return nil, errors.New("cannot run seed nodes with PEX disabled")
 	}
@@ -84,19 +79,6 @@ func makeSeedNode(
 			fmt.Errorf("failed to create router: %w", err), peerCloser)
 	}
 
-	// Register a listener to restart router if signalled to do so
-	go func() {
-		for range restartCh {
-			logger.Info("Received signal to restart router, restarting...")
-			router.OnStop()
-			router.Wait()
-			logger.Info("Router successfully stopped. Restarting...")
-			// Start the transport.
-			if err := router.Start(ctx); err != nil {
-				logger.Error("Unable to start router, retrying...", err)
-			}
-		}
-	}()
 
 	pexReactor, err := pex.NewReactor(logger, router, pex.DefaultSendInterval)
 	if err != nil {
@@ -106,7 +88,6 @@ func makeSeedNode(
 	proxyApp := proxy.New(client, logger.With("module", "proxy"), nodeMetrics.proxy)
 
 	closers := make([]closer, 0, 2)
-	closers = append(closers, convertCancelCloser(cancel))
 	blockStore, stateDB, dbCloser, err := initDBs(cfg, dbProvider)
 	if err != nil {
 		return nil, combineCloseError(err, dbCloser)
diff --git a/sei-tendermint/rpc/test/helpers.go b/sei-tendermint/rpc/test/helpers.go
index ec60e816cf..0669b5a036 100644
--- a/sei-tendermint/rpc/test/helpers.go
+++ b/sei-tendermint/rpc/test/helpers.go
@@ -104,7 +104,7 @@ func StartTendermint(
 		ctx,
 		conf,
 		logger,
-		make(chan struct{}),
+		func() {},
 		papp,
 		nil,
 		[]trace.TracerProviderOption{},
diff --git a/sei-tendermint/test/e2e/node/main.go b/sei-tendermint/test/e2e/node/main.go
index 48ea4b4e77..d68c888bba 100644
--- a/sei-tendermint/test/e2e/node/main.go
+++ b/sei-tendermint/test/e2e/node/main.go
@@ -132,7 +132,7 @@ func startNode(ctx context.Context, cfg *Config) error {
 		ctx,
 		tmcfg,
 		nodeLogger,
-		make(chan struct{}),
+		func() {},
 		abciclient.NewLocalClient(nodeLogger, app),
 		nil,
 		[]trace.TracerProviderOption{},
@@ -152,7 +152,7 @@ func startSeedNode(ctx context.Context) error {
 
 	tmcfg.Mode = config.ModeSeed
 
-	n, err := node.New(ctx, tmcfg, nodeLogger, make(chan struct{}), nil, nil, []trace.TracerProviderOption{}, nil)
+	n, err := node.New(ctx, tmcfg, nodeLogger, func() {}, nil, nil, []trace.TracerProviderOption{}, nil)
 	if err != nil {
 		return err
 	}
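
// ---------------------------------------------------------------------------
// Appendix (editor's sketch, not part of the patch): two standalone Go
// illustrations of the restart signalling introduced above.
//
//  1. The close-once pattern from startInProcess: restartEvent may be invoked
//     any number of times from any goroutine, but restartCh is closed exactly
//     once, so a WaitForQuitSignals-style waiter observes a single restart
//     request.
//  2. A hypothetical stand-in for utils.AtomicSend with the Store/Load/Wait
//     surface that run_node.go relies on; the real sei-tendermint type is not
//     shown in this patch and may differ.
// ---------------------------------------------------------------------------
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errShouldRestart = errors.New("node should be restarted") // mirrors server.ErrShouldRestart

// AtomicSend is a hypothetical watch-style cell: Store publishes a new value
// and wakes waiters; Wait blocks until a predicate holds or the context ends.
type AtomicSend[T any] struct {
	mu   sync.Mutex
	cond chan struct{} // closed and replaced on every Store
	val  T
}

func NewAtomicSend[T any](v T) *AtomicSend[T] {
	return &AtomicSend[T]{cond: make(chan struct{}), val: v}
}

func (a *AtomicSend[T]) Store(v T) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.val = v
	close(a.cond) // wake all current waiters
	a.cond = make(chan struct{})
}

func (a *AtomicSend[T]) Load() T {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.val
}

// Wait returns the first value satisfying pred, or ctx.Err() on cancellation.
func (a *AtomicSend[T]) Wait(ctx context.Context, pred func(T) bool) (T, error) {
	for {
		a.mu.Lock()
		v, ch := a.val, a.cond
		a.mu.Unlock()
		if pred(v) {
			return v, nil
		}
		select {
		case <-ctx.Done():
			var zero T
			return zero, ctx.Err()
		case <-ch: // value changed; re-check the predicate
		}
	}
}

func main() {
	// Sketch 1: idempotent restart event backed by closing a channel.
	var restartMtx sync.Mutex
	restartCh := make(chan struct{})
	restartEvent := func() {
		restartMtx.Lock()
		defer restartMtx.Unlock()
		select {
		case <-restartCh: // already closed; receive on a closed channel succeeds
		default:
			close(restartCh)
		}
	}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // concurrent callers are safe: exactly one close happens
		wg.Add(1)
		go func() { defer wg.Done(); restartEvent() }()
	}
	wg.Wait()
	<-restartCh // the waiter sees the close and maps it to a restart error
	fmt.Println(errShouldRestart)

	// Sketch 2: the run_node.go usage, with AtomicSend standing in for the
	// value returned by utils.NewAtomicSend.
	restart := NewAtomicSend(false)
	go restart.Store(true) // what the restartEvent callback does in NewRunNodeCmd
	if _, err := restart.Wait(context.Background(), func(x bool) bool { return x }); err == nil {
		fmt.Println("received signal to restart node")
	}
}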