Skip to content

Commit 2e4cbfa

Browse files
committed
log metrics around processing durations
1 parent 18cd55e commit 2e4cbfa

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

internal/orchestrator/committer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ func (c *Committer) Start(ctx context.Context) {
8080
}
8181

8282
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
83+
startTime := time.Now()
84+
defer func() {
85+
log.Debug().Str("metric", "get_block_numbers_to_commit_duration").Msgf("getBlockNumbersToCommit duration: %f", time.Since(startTime).Seconds())
86+
}()
87+
8388
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
8489
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
8590
if err != nil {
@@ -117,7 +122,10 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
117122
return nil, nil
118123
}
119124

125+
startTime := time.Now()
120126
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
127+
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
128+
121129
if err != nil {
122130
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
123131
}
@@ -168,20 +176,26 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
168176
}
169177
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
170178

179+
mainStorageStart := time.Now()
171180
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
172181
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
173182
return fmt.Errorf("error saving data to main storage: %v", err)
174183
}
184+
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
175185

186+
publishStart := time.Now()
176187
go func() {
177188
if err := c.publisher.PublishBlockData(blockData); err != nil {
178189
log.Error().Err(err).Msg("Failed to publish block data to kafka")
179190
}
191+
log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
180192
}()
181193

194+
stagingDeleteStart := time.Now()
182195
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
183196
return fmt.Errorf("error deleting data from staging storage: %v", err)
184197
}
198+
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
185199

186200
// Find highest block number from committed blocks
187201
highestBlock := blockData[0].Block

internal/orchestrator/poller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
234234
Traces: result.Data.Traces,
235235
})
236236
}
237+
238+
startTime := time.Now()
237239
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
238240
e := fmt.Errorf("error inserting block data: %v", err)
239241
log.Error().Err(e)
@@ -245,6 +247,7 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
245247
}
246248
metrics.PolledBatchSize.Set(float64(len(blockData)))
247249
}
250+
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
248251

249252
if len(failedResults) > 0 {
250253
p.handleBlockFailures(failedResults)

0 commit comments

Comments
 (0)