
Commit c8cf165

retry fetching block data with decreasing batch size (#204)
### TL;DR

Implemented a retry mechanism for failed block fetches with automatic chunk splitting, and added result sorting.

### What changed?

- Added a new `processChunkWithRetry` function that:
  - Processes blocks in chunks and detects failed requests
  - Splits failed blocks into smaller chunks and retries them recursively
  - Continues splitting until the requests succeed or the chunks reach a single block
  - Sends successful results to the results channel as they are processed
- Modified the `Run` method to use the new retry mechanism
- Added sorting of results by block number before returning
- Removed the conditional sleep and made it run consistently after each chunk is processed

(A self-contained sketch of the split-and-retry strategy follows this description.)

### How to test?

1. Run the application with a mix of valid and invalid block numbers
2. Verify that all valid blocks are fetched successfully, even when they are part of a batch containing invalid blocks
3. Confirm that the results are sorted by block number
4. Check the logs for "Splitting failed blocks" messages to verify that the retry mechanism is working

### Why make this change?

This change improves the reliability of block fetching by automatically retrying failed requests with smaller batch sizes. The previous implementation failed an entire batch if any block in it failed, leading to missing data. The new approach maximizes data retrieval by isolating problematic blocks, and it ensures that results are returned in a consistent order regardless of how they were processed.
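The sketch below is an illustrative, self-contained distillation of the split-on-failure idea described above; it is not the project's code. The `BlockResult` type and the `fetch` callback are hypothetical stand-ins for `rpc.GetFullBlockResult` and the RPC client, and the recursion is kept sequential for readability (the actual implementation in the diff retries the two halves in parallel goroutines and works on `*big.Int` block numbers).

```go
package main

import (
	"fmt"
	"sort"
)

// BlockResult is a hypothetical stand-in for rpc.GetFullBlockResult.
type BlockResult struct {
	Block int
	Err   error
}

// fetchWithRetry requests all blocks in one batch; blocks that fail are split
// in half and retried recursively, down to single-block batches.
func fetchWithRetry(blocks []int, fetch func([]int) []BlockResult, out chan<- []BlockResult) {
	results := fetch(blocks)

	// A single-block batch cannot be split further: forward whatever we got.
	if len(blocks) == 1 {
		out <- results
		return
	}

	var ok []BlockResult
	var failed []int
	for i, r := range results {
		if r.Err != nil {
			failed = append(failed, blocks[i])
		} else {
			ok = append(ok, r)
		}
	}

	if len(ok) > 0 {
		out <- ok // forward partial successes immediately
	}
	if len(failed) == 0 {
		return
	}

	// One failed block cannot be split; give it one last attempt on its own.
	if len(failed) == 1 {
		fetchWithRetry(failed, fetch, out)
		return
	}

	// Otherwise split the failed blocks in half and retry both halves.
	mid := len(failed) / 2
	fetchWithRetry(failed[:mid], fetch, out)
	fetchWithRetry(failed[mid:], fetch, out)
}

func main() {
	// Fake fetcher: blocks divisible by 7 fail on their first attempt only.
	attempts := map[int]int{}
	fetch := func(blocks []int) []BlockResult {
		res := make([]BlockResult, len(blocks))
		for i, b := range blocks {
			attempts[b]++
			if b%7 == 0 && attempts[b] == 1 {
				res[i] = BlockResult{Block: b, Err: fmt.Errorf("simulated failure")}
			} else {
				res[i] = BlockResult{Block: b}
			}
		}
		return res
	}

	// Buffer generously: nothing drains the channel until all sends are done here.
	out := make(chan []BlockResult, 16)
	fetchWithRetry([]int{1, 2, 3, 7, 8, 14, 21}, fetch, out)
	close(out)

	var all []BlockResult
	for batch := range out {
		all = append(all, batch...)
	}
	// Batches arrive in recursion order, not block order, so sort before use.
	sort.Slice(all, func(i, j int) bool { return all[i].Block < all[j].Block })
	fmt.Println(all)
}
```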
2 parents ee125fc + 78e2b76 commit c8cf165

File tree

1 file changed (+75 −5 lines)

internal/worker/worker.go

Lines changed: 75 additions & 5 deletions
```diff
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"math/big"
+	"sort"
 	"sync"
 	"time"
 
@@ -22,24 +23,88 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
 	}
 }
 
+func (w *Worker) processChunkWithRetry(chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult) {
+	defer func() {
+		time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
+	}()
+
+	// Try with current chunk size
+	results := w.rpc.GetFullBlocks(chunk)
+
+	if len(chunk) == 1 {
+		// chunk size 1 is the minimum, so we return whatever we get
+		resultsCh <- results
+		return
+	}
+
+	// Check for failed blocks
+	var failedBlocks []*big.Int
+	var successfulResults []rpc.GetFullBlockResult
+
+	for i, result := range results {
+		if result.Error != nil {
+			failedBlocks = append(failedBlocks, chunk[i])
+		} else {
+			successfulResults = append(successfulResults, result)
+		}
+	}
+
+	// If we have successful results, send them
+	if len(successfulResults) > 0 {
+		resultsCh <- successfulResults
+	}
+
+	// If no blocks failed, we're done
+	if len(failedBlocks) == 0 {
+		return
+	}
+
+	// can't split any further, so try one last time
+	if len(failedBlocks) == 1 {
+		w.processChunkWithRetry(failedBlocks, resultsCh)
+		return
+	}
+
+	// Split failed blocks in half and retry
+	mid := len(failedBlocks) / 2
+	leftChunk := failedBlocks[:mid]
+	rightChunk := failedBlocks[mid:]
+
+	log.Debug().Msgf("Splitting %d failed blocks into chunks of %d and %d", len(failedBlocks), len(leftChunk), len(rightChunk))
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go func() {
+		defer wg.Done()
+		w.processChunkWithRetry(leftChunk, resultsCh)
+	}()
+
+	go func() {
+		defer wg.Done()
+		w.processChunkWithRetry(rightChunk, resultsCh)
+	}()
+
+	wg.Wait()
+}
+
 func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
 	blockCount := len(blockNumbers)
 	chunks := common.SliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)
 
 	var wg sync.WaitGroup
-	resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks))
+	resultsCh := make(chan []rpc.GetFullBlockResult, blockCount)
 
 	log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.GetBlocksPerRequest().Blocks)
+
 	for _, chunk := range chunks {
 		wg.Add(1)
 		go func(chunk []*big.Int) {
 			defer wg.Done()
-			resultsCh <- w.rpc.GetFullBlocks(chunk)
-			if config.Cfg.RPC.Blocks.BatchDelay > 0 {
-				time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
-			}
+			w.processChunkWithRetry(chunk, resultsCh)
 		}(chunk)
 	}
+
 	go func() {
 		wg.Wait()
 		close(resultsCh)
@@ -50,6 +115,11 @@ func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
 		results = append(results, batchResults...)
 	}
 
+	// Sort results by block number
+	sort.Slice(results, func(i, j int) bool {
+		return results[i].BlockNumber.Cmp(results[j].BlockNumber) < 0
+	})
+
 	// track the last fetched block number
 	if len(results) > 0 {
 		lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()
```
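The sort added at the end of `Run` orders results with `big.Int.Cmp`, since `*big.Int` values cannot be compared with `<`. A small standalone illustration of that comparator follows; the `result` type here is a hypothetical stand-in for `rpc.GetFullBlockResult`.

```go
package main

import (
	"fmt"
	"math/big"
	"sort"
)

// result mirrors only the field relevant to sorting.
type result struct {
	BlockNumber *big.Int
}

func main() {
	results := []result{
		{big.NewInt(105)},
		{big.NewInt(17)},
		{big.NewInt(42)},
	}

	// Cmp returns -1, 0, or 1; "< 0" puts results[i] before results[j].
	sort.Slice(results, func(i, j int) bool {
		return results[i].BlockNumber.Cmp(results[j].BlockNumber) < 0
	})

	for _, r := range results {
		fmt.Println(r.BlockNumber) // prints 17, 42, 105
	}
}
```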
