Skip to content

Commit 4139a04

Browse files
committed
add possibility to poll without saving
1 parent 84624f5 commit 4139a04

File tree

1 file changed

+73
-46
lines changed

1 file changed

+73
-46
lines changed

internal/orchestrator/poller.go

Lines changed: 73 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,27 @@ func (p *Poller) Start(ctx context.Context) {
165165
}
166166

167167
func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
168+
blockData, failedResults := p.PollWithoutSaving(ctx, blockNumbers)
169+
if len(blockData) > 0 || len(failedResults) > 0 {
170+
p.StageResults(blockData, failedResults)
171+
}
172+
173+
var highestBlockNumber *big.Int
174+
if len(blockData) > 0 {
175+
highestBlockNumber = blockData[0].Block.Number
176+
for _, block := range blockData {
177+
if block.Block.Number.Cmp(highestBlockNumber) > 0 {
178+
highestBlockNumber = new(big.Int).Set(block.Block.Number)
179+
}
180+
}
181+
}
182+
return highestBlockNumber
183+
}
184+
185+
func (p *Poller) PollWithoutSaving(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, []rpc.GetFullBlockResult) {
168186
if len(blockNumbers) < 1 {
169187
log.Debug().Msg("No blocks to poll, skipping")
170-
return
188+
return nil, nil
171189
}
172190
endBlock := blockNumbers[len(blockNumbers)-1]
173191
if endBlock != nil {
@@ -180,8 +198,60 @@ func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledB
180198

181199
worker := worker.NewWorker(p.rpc)
182200
results := worker.Run(ctx, blockNumbers)
183-
p.handleWorkerResults(results)
184-
return endBlock
201+
blockData, failedResults := p.convertPollResultsToBlockData(results)
202+
return blockData, failedResults
203+
}
204+
205+
func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult) ([]common.BlockData, []rpc.GetFullBlockResult) {
206+
var successfulResults []rpc.GetFullBlockResult
207+
var failedResults []rpc.GetFullBlockResult
208+
209+
for _, result := range results {
210+
if result.Error != nil {
211+
bn := "<unknown>"
212+
if result.BlockNumber != nil {
213+
bn = result.BlockNumber.String()
214+
}
215+
log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", bn)
216+
failedResults = append(failedResults, result)
217+
} else {
218+
successfulResults = append(successfulResults, result)
219+
}
220+
}
221+
222+
blockData := make([]common.BlockData, 0, len(successfulResults))
223+
for _, result := range successfulResults {
224+
blockData = append(blockData, common.BlockData{
225+
Block: result.Data.Block,
226+
Logs: result.Data.Logs,
227+
Transactions: result.Data.Transactions,
228+
Traces: result.Data.Traces,
229+
})
230+
}
231+
return blockData, failedResults
232+
}
233+
234+
func (p *Poller) StageResults(blockData []common.BlockData, failedResults []rpc.GetFullBlockResult) {
235+
startTime := time.Now()
236+
metrics.PolledBatchSize.Set(float64(len(blockData)))
237+
if len(blockData) > 0 {
238+
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
239+
e := fmt.Errorf("error inserting block data: %v", err)
240+
log.Error().Err(e)
241+
for _, result := range blockData {
242+
failedResults = append(failedResults, rpc.GetFullBlockResult{
243+
BlockNumber: result.Block.Number,
244+
Error: e,
245+
})
246+
}
247+
}
248+
}
249+
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
250+
metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
251+
252+
if len(failedResults) > 0 {
253+
p.handleBlockFailures(failedResults)
254+
}
185255
}
186256

187257
func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
@@ -230,49 +300,6 @@ func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.I
230300
return blockNumbers
231301
}
232302

233-
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
234-
var successfulResults []rpc.GetFullBlockResult
235-
var failedResults []rpc.GetFullBlockResult
236-
237-
for _, result := range results {
238-
if result.Error != nil {
239-
log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", result.BlockNumber.String())
240-
failedResults = append(failedResults, result)
241-
} else {
242-
successfulResults = append(successfulResults, result)
243-
}
244-
}
245-
246-
blockData := make([]common.BlockData, 0, len(successfulResults))
247-
for _, result := range successfulResults {
248-
blockData = append(blockData, common.BlockData{
249-
Block: result.Data.Block,
250-
Logs: result.Data.Logs,
251-
Transactions: result.Data.Transactions,
252-
Traces: result.Data.Traces,
253-
})
254-
}
255-
256-
startTime := time.Now()
257-
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
258-
e := fmt.Errorf("error inserting block data: %v", err)
259-
log.Error().Err(e)
260-
for _, result := range successfulResults {
261-
failedResults = append(failedResults, rpc.GetFullBlockResult{
262-
BlockNumber: result.BlockNumber,
263-
Error: e,
264-
})
265-
}
266-
metrics.PolledBatchSize.Set(float64(len(blockData)))
267-
}
268-
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
269-
metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
270-
271-
if len(failedResults) > 0 {
272-
p.handleBlockFailures(failedResults)
273-
}
274-
}
275-
276303
func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
277304
var blockFailures []common.BlockFailure
278305
for _, result := range results {

0 commit comments

Comments
 (0)