diff --git a/cmd/root.go b/cmd/root.go index 4fa4c18..c398746 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -125,6 +125,8 @@ func init() { rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events") rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address") rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0") + rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes") + rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode") viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url")) viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest")) viper.BindPFlag("rpc.blocks.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-blocks-batchDelay")) @@ -214,6 +216,8 @@ func init() { viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName")) viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter")) viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter")) + viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes")) + viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold")) rootCmd.AddCommand(orchestratorCmd) rootCmd.AddCommand(apiCmd) } diff --git a/configs/config.go b/configs/config.go index ecc5610..afb90c7 100644 --- a/configs/config.go +++ b/configs/config.go @@ -166,6 +166,11 @@ type PublisherConfig struct { Events EventPublisherConfig `mapstructure:"events"` } +type WorkModeConfig struct { + CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"` + LiveModeThreshold int64 `mapstructure:"liveModeThreshold"` +} + type Config struct { RPC RPCConfig `mapstructure:"rpc"` Log LogConfig `mapstructure:"log"` @@ -176,6 +181,7 @@ type Config struct { Storage StorageConfig `mapstructure:"storage"` API APIConfig `mapstructure:"api"` Publisher PublisherConfig `mapstructure:"publisher"` + WorkMode WorkModeConfig `mapstructure:"workMode"` } var Cfg Config diff --git a/internal/orchestrator/work_mode_monitor.go b/internal/orchestrator/work_mode_monitor.go index 298eb89..355dd4a 100644 --- a/internal/orchestrator/work_mode_monitor.go +++ b/internal/orchestrator/work_mode_monitor.go @@ -7,6 +7,7 @@ import ( "time" "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/metrics" "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" @@ -15,26 +16,39 @@ import ( type WorkMode string const ( - WORK_MODE_CHECK_INTERVAL = 10 * time.Minute - WORK_MODE_BACKFILL_THRESHOLD = 500 - WorkModeLive WorkMode = "live" - WorkModeBackfill WorkMode = "backfill" + DEFAULT_WORK_MODE_CHECK_INTERVAL = 10 + DEFAULT_LIVE_MODE_THRESHOLD = 500 + WorkModeLive WorkMode = "live" + WorkModeBackfill WorkMode = "backfill" ) type WorkModeMonitor struct { - rpc rpc.IRPCClient - storage storage.IStorage - workModeChannels map[chan WorkMode]struct{} - channelsMutex sync.RWMutex - currentMode WorkMode + rpc rpc.IRPCClient + storage storage.IStorage + workModeChannels map[chan WorkMode]struct{} + channelsMutex sync.RWMutex + currentMode WorkMode + checkInterval time.Duration + liveModeThreshold *big.Int } func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor { + checkInterval := config.Cfg.WorkMode.CheckIntervalMinutes + if checkInterval < 1 { + checkInterval = DEFAULT_WORK_MODE_CHECK_INTERVAL + } + liveModeThreshold := config.Cfg.WorkMode.LiveModeThreshold + if liveModeThreshold < 1 { + liveModeThreshold = DEFAULT_LIVE_MODE_THRESHOLD + } + log.Info().Msgf("Work mode monitor initialized with check interval %d and live mode threshold %d", checkInterval, liveModeThreshold) return &WorkModeMonitor{ - rpc: rpc, - storage: storage, - workModeChannels: make(map[chan WorkMode]struct{}), - currentMode: "", + rpc: rpc, + storage: storage, + workModeChannels: make(map[chan WorkMode]struct{}), + currentMode: "", + checkInterval: time.Duration(checkInterval) * time.Minute, + liveModeThreshold: big.NewInt(liveModeThreshold), } } @@ -83,7 +97,7 @@ func (m *WorkModeMonitor) Start(ctx context.Context) { m.broadcastWorkMode(newMode) } - ticker := time.NewTicker(WORK_MODE_CHECK_INTERVAL) + ticker := time.NewTicker(m.checkInterval) defer ticker.Stop() log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode) @@ -145,7 +159,7 @@ func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, erro blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock) log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64()) - if blockDiff.Cmp(big.NewInt(WORK_MODE_BACKFILL_THRESHOLD)) < 0 { + if blockDiff.Cmp(m.liveModeThreshold) < 0 { return WorkModeLive, nil }