Skip to content

Commit 6023d43

Browse files
committed
config options for work mode monitor
1 parent adad370 commit 6023d43

File tree

3 files changed

+39
-15
lines changed

3 files changed

+39
-15
lines changed

cmd/root.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ func init() {
125125
rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
126126
rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
127127
rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
128+
rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes")
129+
rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode")
128130
viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url"))
129131
viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest"))
130132
viper.BindPFlag("rpc.blocks.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-blocks-batchDelay"))
@@ -214,6 +216,8 @@ func init() {
214216
viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName"))
215217
viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter"))
216218
viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter"))
219+
viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes"))
220+
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
217221
rootCmd.AddCommand(orchestratorCmd)
218222
rootCmd.AddCommand(apiCmd)
219223
}

configs/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ type PublisherConfig struct {
166166
Events EventPublisherConfig `mapstructure:"events"`
167167
}
168168

169+
type WorkModeConfig struct {
170+
CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"`
171+
LiveModeThreshold int64 `mapstructure:"liveModeThreshold"`
172+
}
173+
169174
type Config struct {
170175
RPC RPCConfig `mapstructure:"rpc"`
171176
Log LogConfig `mapstructure:"log"`
@@ -176,6 +181,7 @@ type Config struct {
176181
Storage StorageConfig `mapstructure:"storage"`
177182
API APIConfig `mapstructure:"api"`
178183
Publisher PublisherConfig `mapstructure:"publisher"`
184+
WorkMode WorkModeConfig `mapstructure:"workMode"`
179185
}
180186

181187
var Cfg Config

internal/orchestrator/work_mode_monitor.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/rs/zerolog/log"
10+
config "github.com/thirdweb-dev/indexer/configs"
1011
"github.com/thirdweb-dev/indexer/internal/metrics"
1112
"github.com/thirdweb-dev/indexer/internal/rpc"
1213
"github.com/thirdweb-dev/indexer/internal/storage"
@@ -15,26 +16,39 @@ import (
1516
type WorkMode string
1617

1718
const (
18-
WORK_MODE_CHECK_INTERVAL = 10 * time.Minute
19-
WORK_MODE_BACKFILL_THRESHOLD = 500
20-
WorkModeLive WorkMode = "live"
21-
WorkModeBackfill WorkMode = "backfill"
19+
DEFAULT_WORK_MODE_CHECK_INTERVAL = 10
20+
DEFAULT_LIVE_MODE_THRESHOLD = 500
21+
WorkModeLive WorkMode = "live"
22+
WorkModeBackfill WorkMode = "backfill"
2223
)
2324

2425
type WorkModeMonitor struct {
25-
rpc rpc.IRPCClient
26-
storage storage.IStorage
27-
workModeChannels map[chan WorkMode]struct{}
28-
channelsMutex sync.RWMutex
29-
currentMode WorkMode
26+
rpc rpc.IRPCClient
27+
storage storage.IStorage
28+
workModeChannels map[chan WorkMode]struct{}
29+
channelsMutex sync.RWMutex
30+
currentMode WorkMode
31+
checkInterval time.Duration
32+
liveModeThreshold *big.Int
3033
}
3134

3235
func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor {
36+
checkInterval := config.Cfg.WorkMode.CheckIntervalMinutes
37+
if checkInterval < 1 {
38+
checkInterval = DEFAULT_WORK_MODE_CHECK_INTERVAL
39+
}
40+
liveModeThreshold := config.Cfg.WorkMode.LiveModeThreshold
41+
if liveModeThreshold < 1 {
42+
liveModeThreshold = DEFAULT_LIVE_MODE_THRESHOLD
43+
}
44+
log.Info().Msgf("Work mode monitor initialized with check interval %d and live mode threshold %d", checkInterval, liveModeThreshold)
3345
return &WorkModeMonitor{
34-
rpc: rpc,
35-
storage: storage,
36-
workModeChannels: make(map[chan WorkMode]struct{}),
37-
currentMode: "",
46+
rpc: rpc,
47+
storage: storage,
48+
workModeChannels: make(map[chan WorkMode]struct{}),
49+
currentMode: "",
50+
checkInterval: time.Duration(checkInterval) * time.Minute,
51+
liveModeThreshold: big.NewInt(liveModeThreshold),
3852
}
3953
}
4054

@@ -83,7 +97,7 @@ func (m *WorkModeMonitor) Start(ctx context.Context) {
8397
m.broadcastWorkMode(newMode)
8498
}
8599

86-
ticker := time.NewTicker(WORK_MODE_CHECK_INTERVAL)
100+
ticker := time.NewTicker(m.checkInterval)
87101
defer ticker.Stop()
88102

89103
log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode)
@@ -145,7 +159,7 @@ func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, erro
145159

146160
blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock)
147161
log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64())
148-
if blockDiff.Cmp(big.NewInt(WORK_MODE_BACKFILL_THRESHOLD)) < 0 {
162+
if blockDiff.Cmp(m.liveModeThreshold) < 0 {
149163
return WorkModeLive, nil
150164
}
151165

0 commit comments

Comments
 (0)