Skip to content

Commit 5628b1f

Browse files
committed
setup workmode determination and update
1 parent 15d07b7 commit 5628b1f

File tree

5 files changed

+236
-12
lines changed

5 files changed

+236
-12
lines changed

internal/metrics/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,11 @@ var (
144144
Buckets: prometheus.DefBuckets,
145145
})
146146
)
147+
148+
// Work Mode Metrics
149+
var (
150+
CurrentWorkMode = promauto.NewGauge(prometheus.GaugeOpts{
151+
Name: "current_work_mode",
152+
Help: "The current work mode (0 = backfill, 1 = live)",
153+
})
154+
)

internal/orchestrator/committer.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,19 @@ type Committer struct {
2727
rpc rpc.IRPCClient
2828
lastCommittedBlock *big.Int
2929
publisher *publisher.Publisher
30+
workMode WorkMode
31+
workModeChan chan WorkMode
3032
}
3133

32-
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
34+
type CommitterOption func(*Committer)
35+
36+
func WithCommitterWorkModeChan(ch chan WorkMode) CommitterOption {
37+
return func(c *Committer) {
38+
c.workModeChan = ch
39+
}
40+
}
41+
42+
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...CommitterOption) *Committer {
3343
triggerInterval := config.Cfg.Committer.Interval
3444
if triggerInterval == 0 {
3545
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
@@ -40,15 +50,22 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
4050
}
4151

4252
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
43-
return &Committer{
53+
committer := &Committer{
4454
triggerIntervalMs: triggerInterval,
4555
blocksPerCommit: blocksPerCommit,
4656
storage: storage,
4757
commitFromBlock: commitFromBlock,
4858
rpc: rpc,
4959
lastCommittedBlock: commitFromBlock,
5060
publisher: publisher.GetInstance(),
61+
workMode: "",
62+
}
63+
64+
for _, opt := range opts {
65+
opt(committer)
5166
}
67+
68+
return committer
5269
}
5370

5471
func (c *Committer) Start(ctx context.Context) {
@@ -61,8 +78,17 @@ func (c *Committer) Start(ctx context.Context) {
6178
log.Info().Msg("Committer shutting down")
6279
c.publisher.Close()
6380
return
81+
case workMode := <-c.workModeChan:
82+
if workMode != c.workMode && workMode != "" {
83+
log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode)
84+
c.workMode = workMode
85+
}
6486
default:
6587
time.Sleep(interval)
88+
if c.workMode == "" {
89+
log.Debug().Msg("Committer work mode not set, skipping commit")
90+
continue
91+
}
6692
blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
6793
if err != nil {
6894
log.Error().Err(err).Msg("Error getting block data to commit")

internal/orchestrator/orchestrator.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,17 @@ func (o *Orchestrator) Start() {
5454
o.cancel()
5555
}()
5656

57+
// Create the work mode monitor first
58+
workModeMonitor := NewWorkModeMonitor(o.rpc, o.storage)
59+
5760
if o.pollerEnabled {
5861
wg.Add(1)
5962
go func() {
6063
defer wg.Done()
61-
poller := NewPoller(o.rpc, o.storage)
64+
pollerWorkModeChan := make(chan WorkMode, 1)
65+
workModeMonitor.RegisterChannel(pollerWorkModeChan)
66+
defer workModeMonitor.UnregisterChannel(pollerWorkModeChan)
67+
poller := NewPoller(o.rpc, o.storage, WithPollerWorkModeChan(pollerWorkModeChan))
6268
poller.Start(ctx)
6369
}()
6470
}
@@ -76,7 +82,10 @@ func (o *Orchestrator) Start() {
7682
wg.Add(1)
7783
go func() {
7884
defer wg.Done()
79-
committer := NewCommitter(o.rpc, o.storage)
85+
committerWorkModeChan := make(chan WorkMode, 1)
86+
workModeMonitor.RegisterChannel(committerWorkModeChan)
87+
defer workModeMonitor.UnregisterChannel(committerWorkModeChan)
88+
committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan))
8089
committer.Start(ctx)
8190
}()
8291
}
@@ -90,6 +99,12 @@ func (o *Orchestrator) Start() {
9099
}()
91100
}
92101

102+
wg.Add(1)
103+
go func() {
104+
defer wg.Done()
105+
workModeMonitor.Start(ctx)
106+
}()
107+
93108
// The chain tracker is always running
94109
wg.Add(1)
95110
go func() {

internal/orchestrator/poller.go

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,23 @@ type Poller struct {
2828
pollFromBlock *big.Int
2929
pollUntilBlock *big.Int
3030
parallelPollers int
31+
workModeChan chan WorkMode
3132
}
3233

3334
type BlockNumberWithError struct {
3435
BlockNumber *big.Int
3536
Error error
3637
}
3738

38-
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
39+
type PollerOption func(*Poller)
40+
41+
func WithPollerWorkModeChan(ch chan WorkMode) PollerOption {
42+
return func(p *Poller) {
43+
p.workModeChan = ch
44+
}
45+
}
46+
47+
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller {
3948
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
4049
if blocksPerPoll == 0 {
4150
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -44,19 +53,25 @@ func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
4453
if triggerInterval == 0 {
4554
triggerInterval = DEFAULT_TRIGGER_INTERVAL
4655
}
47-
return &Poller{
56+
poller := &Poller{
4857
rpc: rpc,
4958
triggerIntervalMs: int64(triggerInterval),
5059
blocksPerPoll: int64(blocksPerPoll),
5160
storage: storage,
5261
parallelPollers: config.Cfg.Poller.ParallelPollers,
5362
}
63+
64+
for _, opt := range opts {
65+
opt(poller)
66+
}
67+
68+
return poller
5469
}
5570

5671
var ErrNoNewBlocks = fmt.Errorf("no new blocks to poll")
5772

58-
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
59-
poller := NewBoundlessPoller(rpc, storage)
73+
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller {
74+
poller := NewBoundlessPoller(rpc, storage, opts...)
6075
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
6176
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
6277
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
@@ -131,11 +146,14 @@ func (p *Poller) Start(ctx context.Context) {
131146
for {
132147
select {
133148
case <-ctx.Done():
134-
cancel()
135-
close(tasks)
136-
wg.Wait()
137-
log.Info().Msg("Poller shutting down")
149+
p.shutdown(cancel, tasks, &wg)
138150
return
151+
case workMode := <-p.workModeChan:
152+
if workMode == WorkModeLive {
153+
log.Info().Msg("Switching to live mode, stopping poller")
154+
p.shutdown(cancel, tasks, &wg)
155+
return
156+
}
139157
case <-ticker.C:
140158
select {
141159
case tasks <- struct{}{}:
@@ -274,3 +292,10 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
274292
log.Error().Err(err).Msg("Error saving block failures")
275293
}
276294
}
295+
296+
func (p *Poller) shutdown(cancel context.CancelFunc, tasks chan struct{}, wg *sync.WaitGroup) {
297+
cancel()
298+
close(tasks)
299+
wg.Wait()
300+
log.Info().Msg("Poller shutting down")
301+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package orchestrator
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"sync"
7+
"time"
8+
9+
"github.com/rs/zerolog/log"
10+
"github.com/thirdweb-dev/indexer/internal/metrics"
11+
"github.com/thirdweb-dev/indexer/internal/rpc"
12+
"github.com/thirdweb-dev/indexer/internal/storage"
13+
)
14+
15+
// WorkMode names the mode the indexer is currently operating in. The empty
// string is used throughout this file to mean "not determined yet".
type WorkMode string

// Known work modes.
const (
	WorkModeLive     WorkMode = "live"
	WorkModeBackfill WorkMode = "backfill"
)

// Tuning constants for the work mode monitor.
const (
	// WORK_MODE_CHECK_INTERVAL is the period of the ticker that triggers a
	// work mode re-check.
	WORK_MODE_CHECK_INTERVAL = 10 * time.Minute
	// WORK_MODE_BACKFILL_THRESHOLD is the block lag at or above which the mode
	// is backfill; a strictly smaller lag yields live mode.
	WORK_MODE_BACKFILL_THRESHOLD = 500
)
23+
24+
type WorkModeMonitor struct {
25+
rpc rpc.IRPCClient
26+
storage storage.IStorage
27+
workModeChannels map[chan WorkMode]struct{}
28+
channelsMutex sync.RWMutex
29+
currentMode WorkMode
30+
}
31+
32+
func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor {
33+
return &WorkModeMonitor{
34+
rpc: rpc,
35+
storage: storage,
36+
workModeChannels: make(map[chan WorkMode]struct{}),
37+
currentMode: "",
38+
}
39+
}
40+
41+
// RegisterChannel adds a new channel to receive work mode updates
42+
func (m *WorkModeMonitor) RegisterChannel(ch chan WorkMode) {
43+
m.channelsMutex.Lock()
44+
defer m.channelsMutex.Unlock()
45+
46+
m.workModeChannels[ch] = struct{}{}
47+
// Send current mode to the new channel only if it's not empty
48+
if m.currentMode != "" {
49+
select {
50+
case ch <- m.currentMode:
51+
log.Debug().Msg("Initial work mode sent to new channel")
52+
default:
53+
log.Warn().Msg("Failed to send initial work mode to new channel - channel full")
54+
}
55+
}
56+
}
57+
58+
// UnregisterChannel removes a channel from receiving work mode updates
59+
func (m *WorkModeMonitor) UnregisterChannel(ch chan WorkMode) {
60+
m.channelsMutex.Lock()
61+
defer m.channelsMutex.Unlock()
62+
63+
delete(m.workModeChannels, ch)
64+
}
65+
66+
func (m *WorkModeMonitor) updateWorkModeMetric(mode WorkMode) {
67+
var value float64
68+
if mode == WorkModeLive {
69+
value = 1
70+
}
71+
metrics.CurrentWorkMode.Set(value)
72+
}
73+
74+
func (m *WorkModeMonitor) Start(ctx context.Context) {
75+
// Perform immediate check
76+
newMode, err := m.determineWorkMode(ctx)
77+
if err != nil {
78+
log.Error().Err(err).Msg("Error checking work mode during startup")
79+
} else if newMode != m.currentMode {
80+
log.Info().Msgf("Work mode changing from %s to %s during startup", m.currentMode, newMode)
81+
m.currentMode = newMode
82+
m.updateWorkModeMetric(newMode)
83+
m.broadcastWorkMode(newMode)
84+
}
85+
86+
ticker := time.NewTicker(WORK_MODE_CHECK_INTERVAL)
87+
defer ticker.Stop()
88+
89+
log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode)
90+
91+
for {
92+
select {
93+
case <-ctx.Done():
94+
log.Info().Msg("Work mode monitor shutting down")
95+
return
96+
case <-ticker.C:
97+
newMode, err := m.determineWorkMode(ctx)
98+
if err != nil {
99+
log.Error().Err(err).Msg("Error checking work mode")
100+
continue
101+
}
102+
103+
if newMode != m.currentMode {
104+
log.Info().Msgf("Work mode changing from %s to %s", m.currentMode, newMode)
105+
m.currentMode = newMode
106+
m.updateWorkModeMetric(newMode)
107+
m.broadcastWorkMode(newMode)
108+
}
109+
}
110+
}
111+
}
112+
113+
func (m *WorkModeMonitor) broadcastWorkMode(mode WorkMode) {
114+
m.channelsMutex.RLock()
115+
defer m.channelsMutex.RUnlock()
116+
117+
for ch := range m.workModeChannels {
118+
select {
119+
case ch <- mode:
120+
log.Debug().Msg("Work mode change notification sent")
121+
default:
122+
log.Warn().Msg("Work mode change notification dropped - channel full")
123+
}
124+
}
125+
}
126+
127+
func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, error) {
128+
lastCommittedBlock, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpc.GetChainID())
129+
if err != nil {
130+
return "", err
131+
}
132+
133+
if lastCommittedBlock.Sign() == 0 {
134+
log.Debug().Msg("No blocks committed yet, using backfill mode")
135+
return WorkModeBackfill, nil
136+
}
137+
138+
latestBlock, err := m.rpc.GetLatestBlockNumber(ctx)
139+
if err != nil {
140+
return "", err
141+
}
142+
143+
blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock)
144+
log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64())
145+
if blockDiff.Cmp(big.NewInt(WORK_MODE_BACKFILL_THRESHOLD)) < 0 {
146+
return WorkModeLive, nil
147+
}
148+
149+
return WorkModeBackfill, nil
150+
}

0 commit comments

Comments
 (0)