diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 315108e..7cbcf88 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -253,13 +253,10 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - publishStart := time.Now() go func() { if err := c.publisher.PublishBlockData(blockData); err != nil { log.Error().Err(err).Msg("Failed to publish block data to kafka") } - log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds()) - metrics.PublishDuration.Observe(time.Since(publishStart).Seconds()) }() stagingDeleteStart := time.Now() diff --git a/internal/publisher/publisher.go b/internal/publisher/publisher.go index 4ee4e26..984115a 100644 --- a/internal/publisher/publisher.go +++ b/internal/publisher/publisher.go @@ -97,13 +97,7 @@ func (p *Publisher) initialize() error { } func (p *Publisher) PublishBlockData(blockData []common.BlockData) error { - err := p.publishBlockData(blockData, false) - if err != nil { - return err - } - metrics.PublisherBlockCounter.Add(float64(len(blockData))) - metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64())) - return nil + return p.publishBlockData(blockData, false) } func (p *Publisher) PublishReorg(oldData []common.BlockData, newData []common.BlockData) error { @@ -114,8 +108,6 @@ func (p *Publisher) PublishReorg(oldData []common.BlockData, newData []common.Bl if err := p.publishBlockData(newData, false); err != nil { return fmt.Errorf("failed to publish new block data: %v", err) } - - metrics.PublisherReorgedBlockCounter.Add(float64(len(oldData))) return nil } @@ -168,6 +160,8 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool) return nil } + publishStart := time.Now() + // Prepare messages for blocks, events, transactions and traces blockMessages := make([]*kgo.Record, len(blockData)) var eventMessages []*kgo.Record @@ -251,6 +245,13 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool) } } + log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds()) + metrics.PublishDuration.Observe(time.Since(publishStart).Seconds()) + metrics.PublisherBlockCounter.Add(float64(len(blockData))) + metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64())) + if isReorg { + metrics.PublisherReorgedBlockCounter.Add(float64(len(blockData))) + } return nil }