retry fetching block data with decreasing batch size #204

Merged

80 changes: 75 additions & 5 deletions internal/worker/worker.go
@@ -2,6 +2,7 @@ package worker

import (
	"math/big"
	"sort"
	"sync"
	"time"

@@ -22,24 +23,88 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
}
}

func (w *Worker) processChunkWithRetry(chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult) {
	defer func() {
		time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
	}()

	// Try with current chunk size
	results := w.rpc.GetFullBlocks(chunk)

	if len(chunk) == 1 {
		// chunk size 1 is the minimum, so we return whatever we get
		resultsCh <- results
		return
	}

	// Check for failed blocks
	var failedBlocks []*big.Int
	var successfulResults []rpc.GetFullBlockResult

	for i, result := range results {
		if result.Error != nil {
			failedBlocks = append(failedBlocks, chunk[i])
		} else {
			successfulResults = append(successfulResults, result)
		}
	}

	// If we have successful results, send them
	if len(successfulResults) > 0 {
		resultsCh <- successfulResults
	}

	// If no blocks failed, we're done
	if len(failedBlocks) == 0 {
		return
	}

	// can't split any further, so try one last time
	if len(failedBlocks) == 1 {
		w.processChunkWithRetry(failedBlocks, resultsCh)
		return
	}

	// Split failed blocks in half and retry
	mid := len(failedBlocks) / 2
	leftChunk := failedBlocks[:mid]
	rightChunk := failedBlocks[mid:]

	log.Debug().Msgf("Splitting %d failed blocks into chunks of %d and %d", len(failedBlocks), len(leftChunk), len(rightChunk))

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		w.processChunkWithRetry(leftChunk, resultsCh)
	}()

	go func() {
		defer wg.Done()
		w.processChunkWithRetry(rightChunk, resultsCh)
	}()

	wg.Wait()
}
Comment on lines +68 to +89

🛠️ Refactor suggestion

Potential goroutine explosion when many blocks fail

Every failure split spawns two new goroutines and waits (wg.Wait()).
For k consecutively failing blocks this creates roughly 2k−1 goroutines (the nodes of the binary split tree; e.g. 1,024 failing blocks yields ~2,047 goroutines), which can reach thousands under a bad RPC outage and may:

  • Exceed the Go scheduler’s default thread limits.
  • Create unnecessary network pressure once the RPC endpoint comes back.

Recommend introducing a simple concurrency limiter (token/semaphore) or using a worker-pool instead of spawning unbounded goroutines, e.g.:

 type Worker struct {
     rpc rpc.IRPCClient
+    sem  chan struct{} // capacity == max concurrent RPCs
 }
 
 func (w *Worker) acquire() { w.sem <- struct{}{} }
 func (w *Worker) release() { <-w.sem }

 // before launching a goroutine
-wg.Add(2)
-go func() { … }()
+wg.Add(2)
+go func() {
+    w.acquire()
+    defer w.release()
+
+}()

This keeps memory & CPU predictable even under sustained failure scenarios.
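
For concreteness, one deadlock-safe way to apply the semaphore idea is to hold the token only for the duration of each batch RPC, rather than for a whole recursive branch, so a parent call never holds a slot while waiting on its children. This is only a sketch: the sem field, its capacity, and the getFullBlocksLimited helper are illustrative names, not part of this PR.

type Worker struct {
	rpc rpc.IRPCClient
	sem chan struct{} // buffered channel used as a semaphore; capacity == max concurrent batch RPCs (illustrative)
}

// getFullBlocksLimited wraps the batch RPC with the semaphore so that at most
// cap(w.sem) requests are in flight, no matter how deep the retry splitting goes.
func (w *Worker) getFullBlocksLimited(chunk []*big.Int) []rpc.GetFullBlockResult {
	w.sem <- struct{}{}        // acquire a slot (blocks while the limit is reached)
	defer func() { <-w.sem }() // release the slot once the RPC returns
	return w.rpc.GetFullBlocks(chunk)
}

processChunkWithRetry would then call w.getFullBlocksLimited(chunk) instead of w.rpc.GetFullBlocks(chunk). Goroutines from deep splits still exist, but they park cheaply on the channel, and the RPC endpoint never sees more than the configured number of concurrent batch requests.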

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In internal/worker/worker.go around lines 68 to 89, the current approach spawns
two new goroutines for each failure split without limit, causing exponential
growth in goroutines under repeated failures. To fix this, implement a
concurrency limiter such as a semaphore or token bucket to restrict the number
of concurrent goroutines processing chunks. Alternatively, refactor to use a
fixed-size worker pool that processes failed blocks from a queue, ensuring
controlled concurrency and preventing resource exhaustion during sustained
failure scenarios.
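
As a sketch of the worker-pool alternative described above (runPool and poolSize are assumed names, and the BatchDelay sleep is omitted for brevity): each worker drains chunks from a shared queue and performs the same halving retry iteratively with a local stack, so total concurrency is capped at poolSize and no recursive goroutines are spawned.

// Illustrative only: a fixed-size pool running the same split-on-failure retry.
// resultsCh must be buffered (or drained concurrently), exactly as in Run.
func (w *Worker) runPool(chunks [][]*big.Int, poolSize int, resultsCh chan<- []rpc.GetFullBlockResult) {
	queue := make(chan []*big.Int, len(chunks))
	for _, c := range chunks {
		queue <- c
	}
	close(queue)

	var wg sync.WaitGroup
	for i := 0; i < poolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for chunk := range queue {
				stack := [][]*big.Int{chunk} // local LIFO of chunks still to fetch
				for len(stack) > 0 {
					cur := stack[len(stack)-1]
					stack = stack[:len(stack)-1]

					results := w.rpc.GetFullBlocks(cur)

					var failed []*big.Int
					var ok []rpc.GetFullBlockResult
					for j, r := range results {
						if r.Error != nil && len(cur) > 1 {
							failed = append(failed, cur[j])
						} else {
							ok = append(ok, r) // single-block errors are returned as-is
						}
					}
					if len(ok) > 0 {
						resultsCh <- ok
					}
					switch {
					case len(failed) > 1:
						mid := len(failed) / 2
						stack = append(stack, failed[:mid], failed[mid:]) // halve and retry
					case len(failed) == 1:
						stack = append(stack, failed) // one last single-block attempt
					}
				}
			}
		}()
	}
	wg.Wait()
}

Compared with the recursive version, the worst case here is poolSize concurrent RPCs and poolSize goroutines, at the cost of a failed chunk's halves being retried sequentially by the same worker.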


func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
	blockCount := len(blockNumbers)
	chunks := common.SliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)

	var wg sync.WaitGroup
-	resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks))
+	resultsCh := make(chan []rpc.GetFullBlockResult, blockCount)

	log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.GetBlocksPerRequest().Blocks)

	for _, chunk := range chunks {
		wg.Add(1)
		go func(chunk []*big.Int) {
			defer wg.Done()
-			resultsCh <- w.rpc.GetFullBlocks(chunk)
-			if config.Cfg.RPC.Blocks.BatchDelay > 0 {
-				time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
-			}
+			w.processChunkWithRetry(chunk, resultsCh)
		}(chunk)
	}

	go func() {
		wg.Wait()
		close(resultsCh)
@@ -50,6 +115,11 @@ func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
		results = append(results, batchResults...)
	}

	// Sort results by block number
	sort.Slice(results, func(i, j int) bool {
		return results[i].BlockNumber.Cmp(results[j].BlockNumber) < 0
	})

	// track the last fetched block number
	if len(results) > 0 {
		lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()