Skip to content

Commit 4f2ca9a

Browse files
committed
retry fetching block data with decreasing batch size
1 parent 6417dd6 commit 4f2ca9a

File tree

1 file changed

+75
-5
lines changed

1 file changed

+75
-5
lines changed

internal/worker/worker.go

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package worker
22

33
import (
44
"math/big"
5+
"sort"
56
"sync"
67
"time"
78

@@ -22,24 +23,88 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
2223
}
2324
}
2425

26+
func (w *Worker) processChunkWithRetry(chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult) {
27+
defer func() {
28+
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
29+
}()
30+
31+
// Try with current chunk size
32+
results := w.rpc.GetFullBlocks(chunk)
33+
34+
if len(chunk) == 1 {
35+
// chunk size 1 is the minimum, so we return whatever we get
36+
resultsCh <- results
37+
return
38+
}
39+
40+
// Check for failed blocks
41+
var failedBlocks []*big.Int
42+
var successfulResults []rpc.GetFullBlockResult
43+
44+
for i, result := range results {
45+
if result.Error != nil {
46+
failedBlocks = append(failedBlocks, chunk[i])
47+
} else {
48+
successfulResults = append(successfulResults, result)
49+
}
50+
}
51+
52+
// If we have successful results, send them
53+
if len(successfulResults) > 0 {
54+
resultsCh <- successfulResults
55+
}
56+
57+
// If no blocks failed, we're done
58+
if len(failedBlocks) == 0 {
59+
return
60+
}
61+
62+
// can't split any further, so try one last time
63+
if len(failedBlocks) == 1 {
64+
w.processChunkWithRetry(failedBlocks, resultsCh)
65+
return
66+
}
67+
68+
// Split failed blocks in half and retry
69+
mid := len(failedBlocks) / 2
70+
leftChunk := failedBlocks[:mid]
71+
rightChunk := failedBlocks[mid:]
72+
73+
log.Debug().Msgf("Splitting %d failed blocks into chunks of %d and %d", len(failedBlocks), len(leftChunk), len(rightChunk))
74+
75+
var wg sync.WaitGroup
76+
wg.Add(2)
77+
78+
go func() {
79+
defer wg.Done()
80+
w.processChunkWithRetry(leftChunk, resultsCh)
81+
}()
82+
83+
go func() {
84+
defer wg.Done()
85+
w.processChunkWithRetry(rightChunk, resultsCh)
86+
}()
87+
88+
wg.Wait()
89+
}
90+
2591
func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
2692
blockCount := len(blockNumbers)
2793
chunks := common.SliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)
2894

2995
var wg sync.WaitGroup
30-
resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks))
96+
resultsCh := make(chan []rpc.GetFullBlockResult, blockCount)
3197

3298
log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.GetBlocksPerRequest().Blocks)
99+
33100
for _, chunk := range chunks {
34101
wg.Add(1)
35102
go func(chunk []*big.Int) {
36103
defer wg.Done()
37-
resultsCh <- w.rpc.GetFullBlocks(chunk)
38-
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
39-
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
40-
}
104+
w.processChunkWithRetry(chunk, resultsCh)
41105
}(chunk)
42106
}
107+
43108
go func() {
44109
wg.Wait()
45110
close(resultsCh)
@@ -50,6 +115,11 @@ func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
50115
results = append(results, batchResults...)
51116
}
52117

118+
// Sort results by block number
119+
sort.Slice(results, func(i, j int) bool {
120+
return results[i].BlockNumber.Cmp(results[j].BlockNumber) < 0
121+
})
122+
53123
// track the last fetched block number
54124
if len(results) > 0 {
55125
lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()

0 commit comments

Comments
 (0)