Skip to content

Commit f334927

Browse files
committed
publish metrics only when actually publishing
1 parent cc624f5 commit f334927

File tree

2 files changed

+10
-12
lines changed

2 files changed

+10
-12
lines changed

internal/orchestrator/committer.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,10 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
250250
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
251251
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
252252

253-
publishStart := time.Now()
254253
go func() {
255254
if err := c.publisher.PublishBlockData(blockData); err != nil {
256255
log.Error().Err(err).Msg("Failed to publish block data to kafka")
257256
}
258-
log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
259-
metrics.PublishDuration.Observe(time.Since(publishStart).Seconds())
260257
}()
261258

262259
stagingDeleteStart := time.Now()

internal/publisher/publisher.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,7 @@ func (p *Publisher) initialize() error {
9797
}
9898

9999
func (p *Publisher) PublishBlockData(blockData []common.BlockData) error {
100-
err := p.publishBlockData(blockData, false)
101-
if err != nil {
102-
return err
103-
}
104-
metrics.PublisherBlockCounter.Add(float64(len(blockData)))
105-
metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
106-
return nil
100+
return p.publishBlockData(blockData, false)
107101
}
108102

109103
func (p *Publisher) PublishReorg(oldData []common.BlockData, newData []common.BlockData) error {
@@ -114,8 +108,6 @@ func (p *Publisher) PublishReorg(oldData []common.BlockData, newData []common.Bl
114108
if err := p.publishBlockData(newData, false); err != nil {
115109
return fmt.Errorf("failed to publish new block data: %v", err)
116110
}
117-
118-
metrics.PublisherReorgedBlockCounter.Add(float64(len(oldData)))
119111
return nil
120112
}
121113

@@ -168,6 +160,8 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool)
168160
return nil
169161
}
170162

163+
publishStart := time.Now()
164+
171165
// Prepare messages for blocks, events, transactions and traces
172166
blockMessages := make([]*kgo.Record, len(blockData))
173167
var eventMessages []*kgo.Record
@@ -251,6 +245,13 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool)
251245
}
252246
}
253247

248+
log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
249+
metrics.PublishDuration.Observe(time.Since(publishStart).Seconds())
250+
metrics.PublisherBlockCounter.Add(float64(len(blockData)))
251+
metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
252+
if isReorg {
253+
metrics.PublisherReorgedBlockCounter.Add(float64(len(blockData)))
254+
}
254255
return nil
255256
}
256257

0 commit comments

Comments
 (0)