
Commit a4ab66f

log metrics around processing durations
1 parent 23884ad commit a4ab66f

3 files changed: 62 additions, 0 deletions

internal/metrics/metrics.go (39 additions, 0 deletions)

@@ -105,3 +105,42 @@ var (
 		Help: "The last block number that was published",
 	})
 )
+
+// Operation Duration Metrics
+var (
+	StagingInsertDuration = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "staging_insert_duration_seconds",
+		Help:    "Time taken to insert data into staging storage",
+		Buckets: prometheus.DefBuckets,
+	})
+
+	MainStorageInsertDuration = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "main_storage_insert_duration_seconds",
+		Help:    "Time taken to insert data into main storage",
+		Buckets: prometheus.DefBuckets,
+	})
+
+	PublishDuration = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "publish_duration_seconds",
+		Help:    "Time taken to publish block data to Kafka",
+		Buckets: prometheus.DefBuckets,
+	})
+
+	StagingDeleteDuration = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "staging_delete_duration_seconds",
+		Help:    "Time taken to delete data from staging storage",
+		Buckets: prometheus.DefBuckets,
+	})
+
+	GetBlockNumbersToCommitDuration = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "get_block_numbers_to_commit_duration_seconds",
+		Help:    "Time taken to get block numbers to commit from storage",
+		Buckets: prometheus.DefBuckets,
+	})
+
+	GetStagingDataDuration = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "get_staging_data_duration_seconds",
+		Help:    "Time taken to get data from staging storage",
+		Buckets: prometheus.DefBuckets,
+	})
+)
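
The committer and poller changes below feed these histograms by hand with time.Since. For reference only, client_golang also ships prometheus.NewTimer, which wraps the same start-then-observe pattern; the snippet is a minimal sketch against a stand-in histogram (exampleDuration and timedOperation are hypothetical names, not part of this commit):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// exampleDuration is a hypothetical stand-in for any of the histograms above.
var exampleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
	Name:    "example_operation_duration_seconds",
	Help:    "Time taken by an example operation",
	Buckets: prometheus.DefBuckets,
})

func timedOperation() {
	// ObserveDuration records the seconds elapsed since NewTimer was called.
	timer := prometheus.NewTimer(exampleDuration)
	defer timer.ObserveDuration()

	time.Sleep(25 * time.Millisecond) // placeholder for real work
}

func main() {
	timedOperation()
}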

internal/orchestrator/committer.go (19 additions, 0 deletions)

@@ -80,6 +80,12 @@ func (c *Committer) Start(ctx context.Context) {
 }
 
 func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
+	startTime := time.Now()
+	defer func() {
+		log.Debug().Str("metric", "get_block_numbers_to_commit_duration").Msgf("getBlockNumbersToCommit duration: %f", time.Since(startTime).Seconds())
+		metrics.GetBlockNumbersToCommitDuration.Observe(time.Since(startTime).Seconds())
+	}()
+
 	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
 	log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
 	if err != nil {
@@ -117,7 +123,11 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
 		return nil, nil
 	}
 
+	startTime := time.Now()
 	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
+	log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
+	metrics.GetStagingDataDuration.Observe(time.Since(startTime).Seconds())
+
 	if err != nil {
 		return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
 	}
@@ -168,20 +178,29 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
 	}
 	log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
 
+	mainStorageStart := time.Now()
 	if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
 		log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
 		return fmt.Errorf("error saving data to main storage: %v", err)
 	}
+	log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
+	metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
 
+	publishStart := time.Now()
 	go func() {
 		if err := c.publisher.PublishBlockData(blockData); err != nil {
 			log.Error().Err(err).Msg("Failed to publish block data to kafka")
 		}
+		log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
+		metrics.PublishDuration.Observe(time.Since(publishStart).Seconds())
 	}()
 
+	stagingDeleteStart := time.Now()
 	if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
 		return fmt.Errorf("error deleting data from staging storage: %v", err)
 	}
+	log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
+	metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
 
 	// Find highest block number from committed blocks
 	highestBlock := blockData[0].Block
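
Every timed section in committer.go repeats the same pair of lines: a debug log followed by an Observe on time.Since. A hypothetical helper, not part of this commit, could fold that pair into one defer-friendly call; the sketch assumes it lives in the project's metrics package and that the logger is zerolog, which the Debug().Str().Msgf chain suggests:

package metrics // hypothetical placement; the commit keeps the pattern inline

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/rs/zerolog/log"
)

// ObserveSince logs and records the seconds elapsed since start into h.
// Safe to call via defer: arguments to a deferred call, including time.Now(),
// are evaluated when the defer statement runs, not when the function returns.
func ObserveSince(h prometheus.Observer, metric string, start time.Time) {
	elapsed := time.Since(start).Seconds()
	log.Debug().Str("metric", metric).Msgf("%s: %f", metric, elapsed)
	h.Observe(elapsed)
}

With such a helper, the deferred closure at the top of getBlockNumbersToCommit could shrink to a single line: defer metrics.ObserveSince(metrics.GetBlockNumbersToCommitDuration, "get_block_numbers_to_commit_duration", time.Now()).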

internal/orchestrator/poller.go (4 additions, 0 deletions)

@@ -234,6 +234,8 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
 			Traces: result.Data.Traces,
 		})
 	}
+
+	startTime := time.Now()
 	if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
 		e := fmt.Errorf("error inserting block data: %v", err)
 		log.Error().Err(e)
@@ -245,6 +247,8 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
 		}
 		metrics.PolledBatchSize.Set(float64(len(blockData)))
 	}
+	log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
+	metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
 
 	if len(failedResults) > 0 {
 		p.handleBlockFailures(failedResults)
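
These histograms only surface once the process exposes a Prometheus scrape endpoint, which this commit does not touch. A minimal sketch of that wiring, assuming the default registry used by promauto and an arbitrary listen address:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// promhttp.Handler serves every metric registered on the default
	// registry, including the new *_duration_seconds histograms.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}

On the Prometheus side, a query such as histogram_quantile(0.95, rate(staging_insert_duration_seconds_bucket[5m])) would then give the p95 staging insert latency.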

0 commit comments