Skip to content

Commit 918459c

Browse files
authored
add possibility to poll without saving (#225)
### TL;DR Refactored the Poller to separate block polling from data storage operations. ### What changed? - Split the `Poll` method into two separate methods: - `PollWithoutSaving`: Handles only the polling of blocks without storing data - `StageResults`: Handles the storage of polled block data - Moved the worker results handling logic into these new methods - Added functionality to track and return the highest block number polled - Created a helper method `convertPollResultsToBlockData` to transform poll results into block data ### How to test? 1. Run the existing test suite to ensure all functionality works as expected 2. Test the new `PollWithoutSaving` method by calling it directly and verifying it returns the correct block data without saving 3. Test the `StageResults` method by passing block data and verifying it's properly stored 4. Verify that the `Poll` method still returns the highest block number correctly ### Why make this change? This refactoring improves the separation of concerns in the Poller, making it more flexible by allowing clients to poll blocks without immediately saving the data. This enables more advanced use cases where data processing might be needed before storage, and makes the code more maintainable by breaking down the monolithic polling function into smaller, more focused components. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Enhanced the polling process to improve data handling and error reporting, resulting in more reliable background operations without affecting user-facing features. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 84624f5 + 4139a04 commit 918459c

File tree

1 file changed

+73
-46
lines changed

1 file changed

+73
-46
lines changed

internal/orchestrator/poller.go

Lines changed: 73 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,27 @@ func (p *Poller) Start(ctx context.Context) {
165165
}
166166

167167
func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
168+
blockData, failedResults := p.PollWithoutSaving(ctx, blockNumbers)
169+
if len(blockData) > 0 || len(failedResults) > 0 {
170+
p.StageResults(blockData, failedResults)
171+
}
172+
173+
var highestBlockNumber *big.Int
174+
if len(blockData) > 0 {
175+
highestBlockNumber = blockData[0].Block.Number
176+
for _, block := range blockData {
177+
if block.Block.Number.Cmp(highestBlockNumber) > 0 {
178+
highestBlockNumber = new(big.Int).Set(block.Block.Number)
179+
}
180+
}
181+
}
182+
return highestBlockNumber
183+
}
184+
185+
func (p *Poller) PollWithoutSaving(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, []rpc.GetFullBlockResult) {
168186
if len(blockNumbers) < 1 {
169187
log.Debug().Msg("No blocks to poll, skipping")
170-
return
188+
return nil, nil
171189
}
172190
endBlock := blockNumbers[len(blockNumbers)-1]
173191
if endBlock != nil {
@@ -180,8 +198,60 @@ func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledB
180198

181199
worker := worker.NewWorker(p.rpc)
182200
results := worker.Run(ctx, blockNumbers)
183-
p.handleWorkerResults(results)
184-
return endBlock
201+
blockData, failedResults := p.convertPollResultsToBlockData(results)
202+
return blockData, failedResults
203+
}
204+
205+
func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult) ([]common.BlockData, []rpc.GetFullBlockResult) {
206+
var successfulResults []rpc.GetFullBlockResult
207+
var failedResults []rpc.GetFullBlockResult
208+
209+
for _, result := range results {
210+
if result.Error != nil {
211+
bn := "<unknown>"
212+
if result.BlockNumber != nil {
213+
bn = result.BlockNumber.String()
214+
}
215+
log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", bn)
216+
failedResults = append(failedResults, result)
217+
} else {
218+
successfulResults = append(successfulResults, result)
219+
}
220+
}
221+
222+
blockData := make([]common.BlockData, 0, len(successfulResults))
223+
for _, result := range successfulResults {
224+
blockData = append(blockData, common.BlockData{
225+
Block: result.Data.Block,
226+
Logs: result.Data.Logs,
227+
Transactions: result.Data.Transactions,
228+
Traces: result.Data.Traces,
229+
})
230+
}
231+
return blockData, failedResults
232+
}
233+
234+
func (p *Poller) StageResults(blockData []common.BlockData, failedResults []rpc.GetFullBlockResult) {
235+
startTime := time.Now()
236+
metrics.PolledBatchSize.Set(float64(len(blockData)))
237+
if len(blockData) > 0 {
238+
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
239+
e := fmt.Errorf("error inserting block data: %v", err)
240+
log.Error().Err(e)
241+
for _, result := range blockData {
242+
failedResults = append(failedResults, rpc.GetFullBlockResult{
243+
BlockNumber: result.Block.Number,
244+
Error: e,
245+
})
246+
}
247+
}
248+
}
249+
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
250+
metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
251+
252+
if len(failedResults) > 0 {
253+
p.handleBlockFailures(failedResults)
254+
}
185255
}
186256

187257
func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
@@ -230,49 +300,6 @@ func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.I
230300
return blockNumbers
231301
}
232302

233-
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
234-
var successfulResults []rpc.GetFullBlockResult
235-
var failedResults []rpc.GetFullBlockResult
236-
237-
for _, result := range results {
238-
if result.Error != nil {
239-
log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", result.BlockNumber.String())
240-
failedResults = append(failedResults, result)
241-
} else {
242-
successfulResults = append(successfulResults, result)
243-
}
244-
}
245-
246-
blockData := make([]common.BlockData, 0, len(successfulResults))
247-
for _, result := range successfulResults {
248-
blockData = append(blockData, common.BlockData{
249-
Block: result.Data.Block,
250-
Logs: result.Data.Logs,
251-
Transactions: result.Data.Transactions,
252-
Traces: result.Data.Traces,
253-
})
254-
}
255-
256-
startTime := time.Now()
257-
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
258-
e := fmt.Errorf("error inserting block data: %v", err)
259-
log.Error().Err(e)
260-
for _, result := range successfulResults {
261-
failedResults = append(failedResults, rpc.GetFullBlockResult{
262-
BlockNumber: result.BlockNumber,
263-
Error: e,
264-
})
265-
}
266-
metrics.PolledBatchSize.Set(float64(len(blockData)))
267-
}
268-
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
269-
metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
270-
271-
if len(failedResults) > 0 {
272-
p.handleBlockFailures(failedResults)
273-
}
274-
}
275-
276303
func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
277304
var blockFailures []common.BlockFailure
278305
for _, result := range results {

0 commit comments

Comments
 (0)