Skip to content

Commit 120b7e9

Browse files
authored
publish metrics only when actually publishing (#227)
### TL;DR Moved metrics collection from the Committer to the Publisher for more accurate timing and consolidated metrics reporting. ### What changed? - Removed metrics collection for publishing duration from the Committer's goroutine - Moved the timing measurement for publishing to the Publisher itself - Consolidated all metrics reporting (block counter, last published block, reorg counter) to happen in one place within the Publisher - Fixed a bug where reorg metrics were only being recorded for old blocks, not new blocks ### How to test? 1. Run the application and verify that metrics are still being reported correctly 2. Check that the publish_duration metric now accurately reflects the actual time spent in the Publisher.PublishBlockData method 3. Verify that reorg metrics are properly recorded when a reorg occurs ### Why make this change? The previous implementation had inaccurate timing measurements since the metrics were being recorded in the Committer's goroutine rather than in the Publisher itself. This change ensures that the publish_duration metric accurately reflects the actual time spent publishing data. Additionally, consolidating all metrics reporting in one place makes the code more maintainable and ensures consistent metrics collection.
2 parents 9f4504d + 34cef4e commit 120b7e9

File tree

2 files changed

+10
-12
lines changed

2 files changed

+10
-12
lines changed

internal/orchestrator/committer.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,10 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
253253
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
254254
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
255255

256-
publishStart := time.Now()
257256
go func() {
258257
if err := c.publisher.PublishBlockData(blockData); err != nil {
259258
log.Error().Err(err).Msg("Failed to publish block data to kafka")
260259
}
261-
log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
262-
metrics.PublishDuration.Observe(time.Since(publishStart).Seconds())
263260
}()
264261

265262
stagingDeleteStart := time.Now()

internal/publisher/publisher.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,7 @@ func (p *Publisher) initialize() error {
9797
}
9898

9999
func (p *Publisher) PublishBlockData(blockData []common.BlockData) error {
100-
err := p.publishBlockData(blockData, false)
101-
if err != nil {
102-
return err
103-
}
104-
metrics.PublisherBlockCounter.Add(float64(len(blockData)))
105-
metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
106-
return nil
100+
return p.publishBlockData(blockData, false)
107101
}
108102

109103
func (p *Publisher) PublishReorg(oldData []common.BlockData, newData []common.BlockData) error {
@@ -114,8 +108,6 @@ func (p *Publisher) PublishReorg(oldData []common.BlockData, newData []common.Bl
114108
if err := p.publishBlockData(newData, false); err != nil {
115109
return fmt.Errorf("failed to publish new block data: %v", err)
116110
}
117-
118-
metrics.PublisherReorgedBlockCounter.Add(float64(len(oldData)))
119111
return nil
120112
}
121113

@@ -168,6 +160,8 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool)
168160
return nil
169161
}
170162

163+
publishStart := time.Now()
164+
171165
// Prepare messages for blocks, events, transactions and traces
172166
blockMessages := make([]*kgo.Record, len(blockData))
173167
var eventMessages []*kgo.Record
@@ -251,6 +245,13 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool)
251245
}
252246
}
253247

248+
log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
249+
metrics.PublishDuration.Observe(time.Since(publishStart).Seconds())
250+
metrics.PublisherBlockCounter.Add(float64(len(blockData)))
251+
metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
252+
if isReorg {
253+
metrics.PublisherReorgedBlockCounter.Add(float64(len(blockData)))
254+
}
254255
return nil
255256
}
256257

0 commit comments

Comments
 (0)