Skip to content

Commit 205a0b8

Browse files
committed
pass context to rpc and worker
1 parent 4f2ca9a commit 205a0b8

File tree

11 files changed

+160
-139
lines changed

11 files changed

+160
-139
lines changed

internal/orchestrator/chain_tracker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (ct *ChainTracker) Start(ctx context.Context) {
3535
log.Info().Msg("Chain tracker shutting down")
3636
return
3737
case <-ticker.C:
38-
latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
38+
latestBlockNumber, err := ct.rpc.GetLatestBlockNumber(ctx)
3939
if err != nil {
4040
log.Error().Err(err).Msg("Error getting latest block number")
4141
continue

internal/orchestrator/committer.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (c *Committer) Start(ctx context.Context) {
6363
return
6464
default:
6565
time.Sleep(interval)
66-
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
66+
blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
6767
if err != nil {
6868
log.Error().Err(err).Msg("Error getting block data to commit")
6969
continue
@@ -72,7 +72,7 @@ func (c *Committer) Start(ctx context.Context) {
7272
log.Debug().Msg("No block data to commit")
7373
continue
7474
}
75-
if err := c.commit(blockDataToCommit); err != nil {
75+
if err := c.commit(ctx, blockDataToCommit); err != nil {
7676
log.Error().Err(err).Msg("Error committing blocks")
7777
}
7878
}
@@ -108,7 +108,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
108108
return blockNumbers, nil
109109
}
110110

111-
func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
111+
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
112112
blocksToCommit, err := c.getBlockNumbersToCommit()
113113
if err != nil {
114114
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
@@ -123,7 +123,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
123123
}
124124
if len(blocksData) == 0 {
125125
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
126-
c.handleMissingStagingData(blocksToCommit)
126+
c.handleMissingStagingData(ctx, blocksToCommit)
127127
return nil, nil
128128
}
129129

@@ -133,7 +133,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
133133
})
134134

135135
if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
136-
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
136+
return nil, c.handleGap(ctx, blocksToCommit[0], blocksData[0].Block)
137137
}
138138

139139
var sequentialBlockData []common.BlockData
@@ -161,7 +161,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
161161
return sequentialBlockData, nil
162162
}
163163

164-
func (c *Committer) commit(blockData []common.BlockData) error {
164+
func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
165165
blockNumbers := make([]*big.Int, len(blockData))
166166
for i, block := range blockData {
167167
blockNumbers[i] = block.Block.Number
@@ -199,7 +199,7 @@ func (c *Committer) commit(blockData []common.BlockData) error {
199199
return nil
200200
}
201201

202-
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
202+
func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
203203
// increment the gap counter in prometheus
204204
metrics.GapCounter.Inc()
205205
// record the first missed block number in prometheus
@@ -220,11 +220,11 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
220220
}
221221

222222
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
223-
poller.Poll(missingBlockNumbers)
223+
poller.Poll(ctx, missingBlockNumbers)
224224
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
225225
}
226226

227-
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
227+
func (c *Committer) handleMissingStagingData(ctx context.Context, blocksToCommit []*big.Int) {
228228
// Checks if there are any blocks in staging after the current range end
229229
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
230230
if err != nil {
@@ -242,6 +242,6 @@ func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
242242
if len(blocksToCommit) > int(poller.blocksPerPoll) {
243243
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
244244
}
245-
poller.Poll(blocksToPoll)
245+
poller.Poll(ctx, blocksToPoll)
246246
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
247247
}

internal/orchestrator/committer_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
254254
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
255255
}).Return(blockData, nil)
256256

257-
result, err := committer.getSequentialBlockDataToCommit()
257+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
258258

259259
assert.NoError(t, err)
260260
assert.NotNil(t, result)
@@ -290,7 +290,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
290290
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
291291
}).Return(blockData, nil)
292292

293-
result, err := committer.getSequentialBlockDataToCommit()
293+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
294294

295295
assert.NoError(t, err)
296296
assert.NotNil(t, result)
@@ -320,7 +320,7 @@ func TestCommit(t *testing.T) {
320320
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
321321
mockStagingStorage.EXPECT().DeleteStagingData(blockData).Return(nil)
322322

323-
err := committer.commit(blockData)
323+
err := committer.commit(context.Background(), blockData)
324324

325325
assert.NoError(t, err)
326326
}
@@ -343,7 +343,7 @@ func TestHandleGap(t *testing.T) {
343343
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
344344
Blocks: 5,
345345
})
346-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
346+
mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
347347
{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
348348
{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
349349
{BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}},
@@ -352,7 +352,7 @@ func TestHandleGap(t *testing.T) {
352352
})
353353
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
354354

355-
err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)
355+
err := committer.handleGap(context.Background(), expectedStartBlockNumber, actualFirstBlock)
356356

357357
assert.Error(t, err)
358358
assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")
@@ -463,7 +463,7 @@ func TestHandleMissingStagingData(t *testing.T) {
463463
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
464464
Blocks: 100,
465465
})
466-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
466+
mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
467467
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
468468
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
469469
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
@@ -482,7 +482,7 @@ func TestHandleMissingStagingData(t *testing.T) {
482482
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
483483
}).Return(blockData, nil)
484484

485-
result, err := committer.getSequentialBlockDataToCommit()
485+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
486486

487487
assert.NoError(t, err)
488488
assert.Nil(t, result)
@@ -509,7 +509,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
509509
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
510510
Blocks: 3,
511511
})
512-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
512+
mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
513513
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
514514
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
515515
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
@@ -526,7 +526,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
526526
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
527527
}).Return(blockData, nil)
528528

529-
result, err := committer.getSequentialBlockDataToCommit()
529+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
530530

531531
assert.NoError(t, err)
532532
assert.Nil(t, result)

internal/orchestrator/failure_recoverer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (fr *FailureRecoverer) Start(ctx context.Context) {
7575
// Trigger worker for recovery
7676
log.Debug().Msgf("Triggering Failure Recoverer for blocks: %v", blocksToTrigger)
7777
worker := worker.NewWorker(fr.rpc)
78-
results := worker.Run(blocksToTrigger)
78+
results := worker.Run(ctx, blocksToTrigger)
7979
fr.handleWorkerResults(blockFailures, results)
8080

8181
// Track recovery activity

internal/orchestrator/poller.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (p *Poller) Start(ctx context.Context) {
103103
return
104104
}
105105
blockRangeMutex.Lock()
106-
blockNumbers, err := p.getNextBlockRange()
106+
blockNumbers, err := p.getNextBlockRange(pollCtx)
107107
blockRangeMutex.Unlock()
108108

109109
if pollCtx.Err() != nil {
@@ -117,7 +117,7 @@ func (p *Poller) Start(ctx context.Context) {
117117
continue
118118
}
119119

120-
lastPolledBlock := p.Poll(blockNumbers)
120+
lastPolledBlock := p.Poll(pollCtx, blockNumbers)
121121
if p.reachedPollLimit(lastPolledBlock) {
122122
log.Debug().Msg("Reached poll limit, exiting poller")
123123
cancel()
@@ -146,7 +146,7 @@ func (p *Poller) Start(ctx context.Context) {
146146
}
147147
}
148148

149-
func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
149+
func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
150150
if len(blockNumbers) < 1 {
151151
log.Debug().Msg("No blocks to poll, skipping")
152152
return
@@ -161,7 +161,7 @@ func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
161161
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)
162162

163163
worker := worker.NewWorker(p.rpc)
164-
results := worker.Run(blockNumbers)
164+
results := worker.Run(ctx, blockNumbers)
165165
p.handleWorkerResults(results)
166166
return endBlock
167167
}
@@ -170,8 +170,8 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
170170
return blockNumber == nil || (p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0)
171171
}
172172

173-
func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
174-
latestBlock, err := p.rpc.GetLatestBlockNumber()
173+
func (p *Poller) getNextBlockRange(ctx context.Context) ([]*big.Int, error) {
174+
latestBlock, err := p.rpc.GetLatestBlockNumber(ctx)
175175
if err != nil {
176176
return nil, err
177177
}

internal/orchestrator/reorg_handler.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (rh *ReorgHandler) Start(ctx context.Context) {
8383
rh.publisher.Close()
8484
return
8585
case <-ticker.C:
86-
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
86+
mostRecentBlockChecked, err := rh.RunFromBlock(ctx, rh.lastCheckedBlock)
8787
if err != nil {
8888
log.Error().Err(err).Msgf("Error during reorg handling: %s", err.Error())
8989
continue
@@ -99,7 +99,7 @@ func (rh *ReorgHandler) Start(ctx context.Context) {
9999
}
100100
}
101101

102-
func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
102+
func (rh *ReorgHandler) RunFromBlock(ctx context.Context, latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
103103
fromBlock, toBlock, err := rh.getReorgCheckRange(latestCheckedBlock)
104104
if err != nil {
105105
return nil, err
@@ -130,7 +130,7 @@ func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBl
130130

131131
metrics.ReorgCounter.Inc()
132132
reorgedBlockNumbers := make([]*big.Int, 0)
133-
err = rh.findReorgedBlockNumbers(blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
133+
err = rh.findReorgedBlockNumbers(ctx, blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
134134
if err != nil {
135135
return nil, fmt.Errorf("error finding reorged block numbers: %w", err)
136136
}
@@ -140,7 +140,7 @@ func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBl
140140
return mostRecentBlockHeader.Number, nil
141141
}
142142

143-
err = rh.handleReorg(reorgedBlockNumbers)
143+
err = rh.handleReorg(ctx, reorgedBlockNumbers)
144144
if err != nil {
145145
return nil, fmt.Errorf("error while handling reorg: %w", err)
146146
}
@@ -190,8 +190,8 @@ func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (
190190
return -1, nil
191191
}
192192

193-
func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
194-
newBlocksByNumber, err := rh.getNewBlocksByNumber(blockHeadersDescending)
193+
func (rh *ReorgHandler) findReorgedBlockNumbers(ctx context.Context, blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
194+
newBlocksByNumber, err := rh.getNewBlocksByNumber(ctx, blockHeadersDescending)
195195
if err != nil {
196196
return err
197197
}
@@ -219,12 +219,12 @@ func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.
219219
sort.Slice(nextHeadersBatch, func(i, j int) bool {
220220
return nextHeadersBatch[i].Number.Cmp(nextHeadersBatch[j].Number) > 0
221221
})
222-
return rh.findReorgedBlockNumbers(nextHeadersBatch, reorgedBlockNumbers)
222+
return rh.findReorgedBlockNumbers(ctx, nextHeadersBatch, reorgedBlockNumbers)
223223
}
224224
return nil
225225
}
226226

227-
func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (map[string]common.Block, error) {
227+
func (rh *ReorgHandler) getNewBlocksByNumber(ctx context.Context, blockHeaders []common.BlockHeader) (map[string]common.Block, error) {
228228
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
229229
for _, header := range blockHeaders {
230230
blockNumbers = append(blockNumbers, header.Number)
@@ -241,7 +241,7 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
241241
wg.Add(1)
242242
go func(chunk []*big.Int) {
243243
defer wg.Done()
244-
resultsCh <- rh.rpc.GetBlocks(chunk)
244+
resultsCh <- rh.rpc.GetBlocks(ctx, chunk)
245245
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
246246
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
247247
}
@@ -264,9 +264,9 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
264264
return fetchedBlocksByNumber, nil
265265
}
266266

267-
func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
267+
func (rh *ReorgHandler) handleReorg(ctx context.Context, reorgedBlockNumbers []*big.Int) error {
268268
log.Debug().Msgf("Handling reorg for blocks %v", reorgedBlockNumbers)
269-
results := rh.worker.Run(reorgedBlockNumbers)
269+
results := rh.worker.Run(ctx, reorgedBlockNumbers)
270270
data := make([]common.BlockData, 0, len(results))
271271
blocksToDelete := make([]*big.Int, 0, len(results))
272272
for _, result := range results {

0 commit comments

Comments
 (0)