Skip to content

config options for work mode monitor #231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func init() {
rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes")
rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode")
viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url"))
viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest"))
viper.BindPFlag("rpc.blocks.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-blocks-batchDelay"))
Expand Down Expand Up @@ -214,6 +216,8 @@ func init() {
viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName"))
viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter"))
viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter"))
viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes"))
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
rootCmd.AddCommand(orchestratorCmd)
rootCmd.AddCommand(apiCmd)
}
Expand Down
6 changes: 6 additions & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ type PublisherConfig struct {
Events EventPublisherConfig `mapstructure:"events"`
}

type WorkModeConfig struct {
CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"`
LiveModeThreshold int64 `mapstructure:"liveModeThreshold"`
}

type Config struct {
RPC RPCConfig `mapstructure:"rpc"`
Log LogConfig `mapstructure:"log"`
Expand All @@ -176,6 +181,7 @@ type Config struct {
Storage StorageConfig `mapstructure:"storage"`
API APIConfig `mapstructure:"api"`
Publisher PublisherConfig `mapstructure:"publisher"`
WorkMode WorkModeConfig `mapstructure:"workMode"`
}

var Cfg Config
Expand Down
44 changes: 29 additions & 15 deletions internal/orchestrator/work_mode_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
Expand All @@ -15,26 +16,39 @@ import (
type WorkMode string

const (
WORK_MODE_CHECK_INTERVAL = 10 * time.Minute
WORK_MODE_BACKFILL_THRESHOLD = 500
WorkModeLive WorkMode = "live"
WorkModeBackfill WorkMode = "backfill"
DEFAULT_WORK_MODE_CHECK_INTERVAL = 10
DEFAULT_LIVE_MODE_THRESHOLD = 500
WorkModeLive WorkMode = "live"
WorkModeBackfill WorkMode = "backfill"
)

type WorkModeMonitor struct {
rpc rpc.IRPCClient
storage storage.IStorage
workModeChannels map[chan WorkMode]struct{}
channelsMutex sync.RWMutex
currentMode WorkMode
rpc rpc.IRPCClient
storage storage.IStorage
workModeChannels map[chan WorkMode]struct{}
channelsMutex sync.RWMutex
currentMode WorkMode
checkInterval time.Duration
liveModeThreshold *big.Int
}

func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor {
checkInterval := config.Cfg.WorkMode.CheckIntervalMinutes
if checkInterval < 1 {
checkInterval = DEFAULT_WORK_MODE_CHECK_INTERVAL
}
liveModeThreshold := config.Cfg.WorkMode.LiveModeThreshold
if liveModeThreshold < 1 {
liveModeThreshold = DEFAULT_LIVE_MODE_THRESHOLD
}
log.Info().Msgf("Work mode monitor initialized with check interval %d and live mode threshold %d", checkInterval, liveModeThreshold)
return &WorkModeMonitor{
rpc: rpc,
storage: storage,
workModeChannels: make(map[chan WorkMode]struct{}),
currentMode: "",
rpc: rpc,
storage: storage,
workModeChannels: make(map[chan WorkMode]struct{}),
currentMode: "",
checkInterval: time.Duration(checkInterval) * time.Minute,
liveModeThreshold: big.NewInt(liveModeThreshold),
}
}

Expand Down Expand Up @@ -83,7 +97,7 @@ func (m *WorkModeMonitor) Start(ctx context.Context) {
m.broadcastWorkMode(newMode)
}

ticker := time.NewTicker(WORK_MODE_CHECK_INTERVAL)
ticker := time.NewTicker(m.checkInterval)
defer ticker.Stop()

log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode)
Expand Down Expand Up @@ -145,7 +159,7 @@ func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, erro

blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock)
log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64())
if blockDiff.Cmp(big.NewInt(WORK_MODE_BACKFILL_THRESHOLD)) < 0 {
if blockDiff.Cmp(m.liveModeThreshold) < 0 {
return WorkModeLive, nil
}

Expand Down