Skip to content

Commit 4450a45

Browse files
committed
add possibility to poll without saving
1 parent 6e15133 commit 4450a45

File tree

1 file changed

+66
-46
lines changed

1 file changed

+66
-46
lines changed

internal/orchestrator/poller.go

Lines changed: 66 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,26 @@ func (p *Poller) Start(ctx context.Context) {
165165
}
166166

167167
func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
168+
blockData, failedResults := p.PollWithoutSaving(ctx, blockNumbers)
169+
if blockData != nil || failedResults != nil {
170+
p.StageResults(blockData, failedResults)
171+
}
172+
highestBlockNumber := new(big.Int)
173+
if len(blockData) > 0 {
174+
highestBlockNumber = blockData[0].Block.Number
175+
for _, block := range blockData {
176+
if block.Block.Number.Cmp(highestBlockNumber) > 0 {
177+
highestBlockNumber = new(big.Int).Set(block.Block.Number)
178+
}
179+
}
180+
}
181+
return highestBlockNumber
182+
}
183+
184+
func (p *Poller) PollWithoutSaving(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, []rpc.GetFullBlockResult) {
168185
if len(blockNumbers) < 1 {
169186
log.Debug().Msg("No blocks to poll, skipping")
170-
return
187+
return nil, nil
171188
}
172189
endBlock := blockNumbers[len(blockNumbers)-1]
173190
if endBlock != nil {
@@ -180,8 +197,54 @@ func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledB
180197

181198
worker := worker.NewWorker(p.rpc)
182199
results := worker.Run(ctx, blockNumbers)
183-
p.handleWorkerResults(results)
184-
return endBlock
200+
blockData, failedResults := p.convertPollResultsToBlockData(results)
201+
return blockData, failedResults
202+
}
203+
204+
func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult) ([]common.BlockData, []rpc.GetFullBlockResult) {
205+
var successfulResults []rpc.GetFullBlockResult
206+
var failedResults []rpc.GetFullBlockResult
207+
208+
for _, result := range results {
209+
if result.Error != nil {
210+
log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", result.BlockNumber.String())
211+
failedResults = append(failedResults, result)
212+
} else {
213+
successfulResults = append(successfulResults, result)
214+
}
215+
}
216+
217+
blockData := make([]common.BlockData, 0, len(successfulResults))
218+
for _, result := range successfulResults {
219+
blockData = append(blockData, common.BlockData{
220+
Block: result.Data.Block,
221+
Logs: result.Data.Logs,
222+
Transactions: result.Data.Transactions,
223+
Traces: result.Data.Traces,
224+
})
225+
}
226+
return blockData, failedResults
227+
}
228+
229+
func (p *Poller) StageResults(blockData []common.BlockData, failedResults []rpc.GetFullBlockResult) {
230+
startTime := time.Now()
231+
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
232+
e := fmt.Errorf("error inserting block data: %v", err)
233+
log.Error().Err(e)
234+
for _, result := range blockData {
235+
failedResults = append(failedResults, rpc.GetFullBlockResult{
236+
BlockNumber: result.Block.Number,
237+
Error: e,
238+
})
239+
}
240+
metrics.PolledBatchSize.Set(float64(len(blockData)))
241+
}
242+
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
243+
metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
244+
245+
if len(failedResults) > 0 {
246+
p.handleBlockFailures(failedResults)
247+
}
185248
}
186249

187250
func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
@@ -230,49 +293,6 @@ func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.I
230293
return blockNumbers
231294
}
232295

233-
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
234-
var successfulResults []rpc.GetFullBlockResult
235-
var failedResults []rpc.GetFullBlockResult
236-
237-
for _, result := range results {
238-
if result.Error != nil {
239-
log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", result.BlockNumber.String())
240-
failedResults = append(failedResults, result)
241-
} else {
242-
successfulResults = append(successfulResults, result)
243-
}
244-
}
245-
246-
blockData := make([]common.BlockData, 0, len(successfulResults))
247-
for _, result := range successfulResults {
248-
blockData = append(blockData, common.BlockData{
249-
Block: result.Data.Block,
250-
Logs: result.Data.Logs,
251-
Transactions: result.Data.Transactions,
252-
Traces: result.Data.Traces,
253-
})
254-
}
255-
256-
startTime := time.Now()
257-
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
258-
e := fmt.Errorf("error inserting block data: %v", err)
259-
log.Error().Err(e)
260-
for _, result := range successfulResults {
261-
failedResults = append(failedResults, rpc.GetFullBlockResult{
262-
BlockNumber: result.BlockNumber,
263-
Error: e,
264-
})
265-
}
266-
metrics.PolledBatchSize.Set(float64(len(blockData)))
267-
}
268-
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
269-
metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
270-
271-
if len(failedResults) > 0 {
272-
p.handleBlockFailures(failedResults)
273-
}
274-
}
275-
276296
func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
277297
var blockFailures []common.BlockFailure
278298
for _, result := range results {

0 commit comments

Comments
 (0)