diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index abb0171..1f93597 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -20,15 +20,18 @@ const DEFAULT_BLOCKS_PER_POLL = 10 const DEFAULT_TRIGGER_INTERVAL = 1000 type Poller struct { - rpc rpc.IRPCClient - blocksPerPoll int64 - triggerIntervalMs int64 - storage storage.IStorage - lastPolledBlock *big.Int - pollFromBlock *big.Int - pollUntilBlock *big.Int - parallelPollers int - workModeChan chan WorkMode + rpc rpc.IRPCClient + blocksPerPoll int64 + triggerIntervalMs int64 + storage storage.IStorage + lastPolledBlock *big.Int + lastPolledBlockMutex sync.RWMutex + pollFromBlock *big.Int + pollUntilBlock *big.Int + parallelPollers int + workModeChan chan WorkMode + currentWorkMode WorkMode + workModeMutex sync.RWMutex } type BlockNumberWithError struct { @@ -117,6 +120,15 @@ func (p *Poller) Start(ctx context.Context) { if !ok { return } + + // Do not poll if not in backfill mode + p.workModeMutex.RLock() + if p.currentWorkMode != WorkModeBackfill { + p.workModeMutex.RUnlock() + continue + } + p.workModeMutex.RUnlock() + blockRangeMutex.Lock() blockNumbers, err := p.getNextBlockRange(pollCtx) blockRangeMutex.Unlock() @@ -149,10 +161,26 @@ func (p *Poller) Start(ctx context.Context) { p.shutdown(cancel, tasks, &wg) return case workMode := <-p.workModeChan: - if workMode == WorkModeLive { - log.Info().Msg("Switching to live mode, stopping poller") - p.shutdown(cancel, tasks, &wg) - return + p.workModeMutex.RLock() + currentWorkMode := p.currentWorkMode + p.workModeMutex.RUnlock() + if workMode != currentWorkMode && workMode != "" { + log.Info().Msgf("Poller work mode changing from %s to %s", currentWorkMode, workMode) + p.workModeMutex.Lock() + changedToBackfillFromLive := currentWorkMode == WorkModeLive && workMode == WorkModeBackfill + p.currentWorkMode = workMode + p.workModeMutex.Unlock() + if changedToBackfillFromLive { + lastBlockInMainStorage, err := p.storage.MainStorage.GetMaxBlockNumber(p.rpc.GetChainID()) + if err != nil { + log.Error().Err(err).Msg("Error getting last block in main storage") + } else { + p.lastPolledBlockMutex.Lock() + p.lastPolledBlock = lastBlockInMainStorage + p.lastPolledBlockMutex.Unlock() + log.Debug().Msgf("Switching to backfill mode, updating last polled block to %s", p.lastPolledBlock.String()) + } + } } case <-ticker.C: select {