pass context to rpc and worker #205

Merged 1 commit on May 21, 2025
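Every change in the diff follows one pattern: functions that eventually issue an RPC call now take a context.Context as their first argument and forward it downstream, so cancelling the caller (an aborted HTTP request, or orchestrator shutdown) also cancels in-flight RPC work. A minimal sketch of the pattern, assuming a simplified stand-in for the project's real rpc interface:

```go
package main

import (
	"context"
	"fmt"
	"math/big"
	"time"
)

// BlockFetcher is a simplified stand-in for the real rpc interface;
// only the context-threading pattern matters here.
type BlockFetcher interface {
	GetLatestBlockNumber(ctx context.Context) (*big.Int, error)
}

// latestWithTimeout derives a bounded child context; cancellation of the
// parent ctx (shutdown, aborted request) still propagates to the RPC call.
func latestWithTimeout(ctx context.Context, f BlockFetcher) (*big.Int, error) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	return f.GetLatestBlockNumber(ctx)
}

type fakeRPC struct{}

func (fakeRPC) GetLatestBlockNumber(ctx context.Context) (*big.Int, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // caller already gave up
	}
	return big.NewInt(12345), nil
}

func main() {
	n, err := latestWithTimeout(context.Background(), fakeRPC{})
	fmt.Println(n, err)
}
```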
15 changes: 8 additions & 7 deletions internal/handlers/search_handlers.go
@@ -1,6 +1,7 @@
package handlers

import (
"context"
"encoding/hex"
"fmt"
"math/big"
@@ -76,7 +77,7 @@ func Search(c *gin.Context) {
return
}

- result, err := executeSearch(mainStorage, chainId, searchInput)
+ result, err := executeSearch(c.Request.Context(), mainStorage, chainId, searchInput)
if err != nil {
log.Error().Err(err).Msg("Error executing search")
api.InternalErrorHandler(c)
@@ -124,7 +125,7 @@ func isValidHashWithLength(input string, length int) bool {
return false
}

- func executeSearch(storage storage.IMainStorage, chainId *big.Int, input SearchInput) (SearchResultModel, error) {
+ func executeSearch(ctx context.Context, storage storage.IMainStorage, chainId *big.Int, input SearchInput) (SearchResultModel, error) {
switch {
case input.BlockNumber != nil:
block, err := searchByBlockNumber(storage, chainId, input.BlockNumber)
@@ -134,7 +135,7 @@ func executeSearch(storage storage.IMainStorage, chainId *big.Int, input SearchI
return searchByHash(storage, chainId, input.Hash)

case input.Address != "":
- return searchByAddress(storage, chainId, input.Address)
+ return searchByAddress(ctx, storage, chainId, input.Address)

case input.FunctionSignature != "":
transactions, err := searchByFunctionSelectorOptimistically(storage, chainId, input.FunctionSignature)
@@ -329,9 +330,9 @@ func searchByHash(mainStorage storage.IMainStorage, chainId *big.Int, hash strin
}
}

- func searchByAddress(mainStorage storage.IMainStorage, chainId *big.Int, address string) (SearchResultModel, error) {
+ func searchByAddress(ctx context.Context, mainStorage storage.IMainStorage, chainId *big.Int, address string) (SearchResultModel, error) {
searchResult := SearchResultModel{Type: SearchResultTypeAddress}
- contractCode, err := checkIfContractHasCode(chainId, address)
+ contractCode, err := checkIfContractHasCode(ctx, chainId, address)
if err != nil {
return searchResult, err
}
@@ -437,14 +438,14 @@ const (
ContractCodeDoesNotExist
)

- func checkIfContractHasCode(chainId *big.Int, address string) (ContractCodeState, error) {
+ func checkIfContractHasCode(ctx context.Context, chainId *big.Int, address string) (ContractCodeState, error) {
if config.Cfg.API.Thirdweb.ClientId != "" {
rpcUrl := fmt.Sprintf("https://%s.rpc.thirdweb.com/%s", chainId.String(), config.Cfg.API.Thirdweb.ClientId)
r, err := rpc.InitializeSimpleRPCWithUrl(rpcUrl)
if err != nil {
return ContractCodeUnknown, err
}
- hasCode, err := r.HasCode(address)
+ hasCode, err := r.HasCode(ctx, address)
if err != nil {
return ContractCodeUnknown, err
}
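The handler change above hands the HTTP request's own context (c.Request.Context()) down through executeSearch and searchByAddress to the HasCode lookup against the Thirdweb RPC endpoint, so a client that disconnects no longer leaves an RPC call running to completion. A hedged sketch of that propagation in a gin handler; fetchCode and the route are illustrative stand-ins, not the repository's real names:

```go
package main

import (
	"context"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
)

// fetchCode is an illustrative stand-in for checkIfContractHasCode; the real
// function builds an rpc client and calls r.HasCode(ctx, address).
func fetchCode(ctx context.Context, address string) (bool, error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err() // request aborted or timed out
	case <-time.After(100 * time.Millisecond): // placeholder for the RPC round trip
		return true, nil
	}
}

func searchHandler(c *gin.Context) {
	ctx := c.Request.Context() // cancelled when the HTTP client disconnects
	hasCode, err := fetchCode(ctx, c.Param("address"))
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"hasCode": hasCode})
}

func main() {
	r := gin.Default()
	r.GET("/search/:address", searchHandler)
	r.Run() // listens on :8080 by default
}
```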
2 changes: 1 addition & 1 deletion internal/orchestrator/chain_tracker.go
@@ -35,7 +35,7 @@ func (ct *ChainTracker) Start(ctx context.Context) {
log.Info().Msg("Chain tracker shutting down")
return
case <-ticker.C:
- latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
+ latestBlockNumber, err := ct.rpc.GetLatestBlockNumber(ctx)
if err != nil {
log.Error().Err(err).Msg("Error getting latest block number")
continue
20 changes: 10 additions & 10 deletions internal/orchestrator/committer.go
@@ -63,7 +63,7 @@ func (c *Committer) Start(ctx context.Context) {
return
default:
time.Sleep(interval)
- blockDataToCommit, err := c.getSequentialBlockDataToCommit()
+ blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
if err != nil {
log.Error().Err(err).Msg("Error getting block data to commit")
continue
@@ -72,7 +72,7 @@ func (c *Committer) Start(ctx context.Context) {
log.Debug().Msg("No block data to commit")
continue
}
- if err := c.commit(blockDataToCommit); err != nil {
+ if err := c.commit(ctx, blockDataToCommit); err != nil {
log.Error().Err(err).Msg("Error committing blocks")
}
}
@@ -108,7 +108,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
return blockNumbers, nil
}

- func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
+ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit()
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
@@ -123,7 +123,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
}
if len(blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
- c.handleMissingStagingData(blocksToCommit)
+ c.handleMissingStagingData(ctx, blocksToCommit)
return nil, nil
}

@@ -133,7 +133,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
})

if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
- return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
+ return nil, c.handleGap(ctx, blocksToCommit[0], blocksData[0].Block)
}

var sequentialBlockData []common.BlockData
@@ -161,7 +161,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
return sequentialBlockData, nil
}

- func (c *Committer) commit(blockData []common.BlockData) error {
+ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
for i, block := range blockData {
blockNumbers[i] = block.Block.Number
@@ -199,7 +199,7 @@ func (c *Committer) commit(blockData []common.BlockData) error {
return nil
}

- func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
+ func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
// increment the gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
@@ -220,11 +220,11 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
}

log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
- poller.Poll(missingBlockNumbers)
+ poller.Poll(ctx, missingBlockNumbers)
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
}

- func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
+ func (c *Committer) handleMissingStagingData(ctx context.Context, blocksToCommit []*big.Int) {
// Checks if there are any blocks in staging after the current range end
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
if err != nil {
@@ -242,6 +242,6 @@ func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
if len(blocksToCommit) > int(poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
}
- poller.Poll(blocksToPoll)
+ poller.Poll(ctx, blocksToPoll)
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
}
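Passing the committer's loop context into commit, handleGap, and handleMissingStagingData means the catch-up polls those paths trigger can be abandoned once the context is cancelled, instead of fetching the whole missing range during shutdown (the exact point of interruption depends on how Poll and the worker consume the context). A rough sketch of that behaviour; Poll here is a simplified version of the real method and fetchAndStage is a hypothetical helper:

```go
package main

import (
	"context"
	"fmt"
	"math/big"
)

// fetchAndStage is a hypothetical helper standing in for worker.Run plus the
// staging insert the real poller performs for each block.
func fetchAndStage(ctx context.Context, n *big.Int) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Println("fetched block", n)
	return nil
}

// Poll sketches Poller.Poll: the committer's ctx lets a gap-filling poll be
// abandoned part-way through on shutdown.
func Poll(ctx context.Context, blockNumbers []*big.Int) {
	for _, n := range blockNumbers {
		select {
		case <-ctx.Done():
			return // orchestrator is shutting down; stop fetching
		default:
		}
		if err := fetchAndStage(ctx, n); err != nil {
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	Poll(ctx, []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102)})
}
```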
18 changes: 9 additions & 9 deletions internal/orchestrator/committer_test.go
@@ -254,7 +254,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
}).Return(blockData, nil)

- result, err := committer.getSequentialBlockDataToCommit()
+ result, err := committer.getSequentialBlockDataToCommit(context.Background())

assert.NoError(t, err)
assert.NotNil(t, result)
@@ -290,7 +290,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
}).Return(blockData, nil)

- result, err := committer.getSequentialBlockDataToCommit()
+ result, err := committer.getSequentialBlockDataToCommit(context.Background())

assert.NoError(t, err)
assert.NotNil(t, result)
@@ -320,7 +320,7 @@ func TestCommit(t *testing.T) {
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
mockStagingStorage.EXPECT().DeleteStagingData(blockData).Return(nil)

- err := committer.commit(blockData)
+ err := committer.commit(context.Background(), blockData)

assert.NoError(t, err)
}
@@ -343,7 +343,7 @@ func TestHandleGap(t *testing.T) {
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 5,
})
- mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
+ mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
{BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}},
@@ -352,7 +352,7 @@ func TestHandleGap(t *testing.T) {
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

- err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)
+ err := committer.handleGap(context.Background(), expectedStartBlockNumber, actualFirstBlock)

assert.Error(t, err)
assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")
@@ -463,7 +463,7 @@ func TestHandleMissingStagingData(t *testing.T) {
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 100,
})
- mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
+ mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
@@ -482,7 +482,7 @@ func TestHandleMissingStagingData(t *testing.T) {
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(blockData, nil)

- result, err := committer.getSequentialBlockDataToCommit()
+ result, err := committer.getSequentialBlockDataToCommit(context.Background())

assert.NoError(t, err)
assert.Nil(t, result)
@@ -509,7 +509,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 3,
})
- mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
+ mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
@@ -526,7 +526,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(blockData, nil)

- result, err := committer.getSequentialBlockDataToCommit()
+ result, err := committer.getSequentialBlockDataToCommit(context.Background())

assert.NoError(t, err)
assert.Nil(t, result)
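Because the mocks now receive the context as a regular argument, the test expectations above have to include it; pinning the expectation to context.Background() works here because that value appears to reach the mocked GetFullBlocks unchanged. A small self-contained sketch of the two matching styles with a hand-written testify mock (MockRPC below is illustrative, not the generated mock in this repository):

```go
package example

import (
	"context"
	"math/big"
	"testing"

	"github.com/stretchr/testify/mock"
)

// MockRPC is a hand-written stand-in for the generated mock in this repo.
type MockRPC struct{ mock.Mock }

func (m *MockRPC) GetLatestBlockNumber(ctx context.Context) (*big.Int, error) {
	args := m.Called(ctx)
	return args.Get(0).(*big.Int), args.Error(1)
}

func TestContextIsPartOfTheExpectation(t *testing.T) {
	m := new(MockRPC)
	// The context is matched like any other argument. Pinning it to
	// context.Background() works when the test passes exactly that value;
	// mock.Anything is the looser alternative if production code derives
	// its own context (WithTimeout, WithCancel, ...).
	m.On("GetLatestBlockNumber", mock.Anything).Return(big.NewInt(100), nil)

	got, err := m.GetLatestBlockNumber(context.Background())
	if err != nil || got.Cmp(big.NewInt(100)) != 0 {
		t.Fatalf("unexpected result: %v, %v", got, err)
	}
	m.AssertExpectations(t)
}
```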
2 changes: 1 addition & 1 deletion internal/orchestrator/failure_recoverer.go
@@ -75,7 +75,7 @@ func (fr *FailureRecoverer) Start(ctx context.Context) {
// Trigger worker for recovery
log.Debug().Msgf("Triggering Failure Recoverer for blocks: %v", blocksToTrigger)
worker := worker.NewWorker(fr.rpc)
- results := worker.Run(blocksToTrigger)
+ results := worker.Run(ctx, blocksToTrigger)
fr.handleWorkerResults(blockFailures, results)

// Track recovery activity
12 changes: 6 additions & 6 deletions internal/orchestrator/poller.go
@@ -103,7 +103,7 @@ func (p *Poller) Start(ctx context.Context) {
return
}
blockRangeMutex.Lock()
- blockNumbers, err := p.getNextBlockRange()
+ blockNumbers, err := p.getNextBlockRange(pollCtx)
blockRangeMutex.Unlock()

if pollCtx.Err() != nil {
@@ -117,7 +117,7 @@ func (p *Poller) Start(ctx context.Context) {
continue
}

- lastPolledBlock := p.Poll(blockNumbers)
+ lastPolledBlock := p.Poll(pollCtx, blockNumbers)
if p.reachedPollLimit(lastPolledBlock) {
log.Debug().Msg("Reached poll limit, exiting poller")
cancel()
@@ -146,7 +146,7 @@ func (p *Poller) Start(ctx context.Context) {
}
}

- func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
+ func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
if len(blockNumbers) < 1 {
log.Debug().Msg("No blocks to poll, skipping")
return
@@ -161,7 +161,7 @@ func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)

worker := worker.NewWorker(p.rpc)
- results := worker.Run(blockNumbers)
+ results := worker.Run(ctx, blockNumbers)
p.handleWorkerResults(results)
return endBlock
}
@@ -170,8 +170,8 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
return blockNumber == nil || (p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0)
}

- func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
- latestBlock, err := p.rpc.GetLatestBlockNumber()
+ func (p *Poller) getNextBlockRange(ctx context.Context) ([]*big.Int, error) {
+ latestBlock, err := p.rpc.GetLatestBlockNumber(ctx)
if err != nil {
return nil, err
}
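The poller already derives a child pollCtx (with its own cancel) from the orchestrator context; the change above passes that same pollCtx into getNextBlockRange and Poll, so both the latest-block lookup and the block fetches stop when either the parent shuts down or the poll-until-block limit cancels the child. A simplified sketch of that derived-context pattern; nextBlockRange and poll are hypothetical stand-ins:

```go
package main

import (
	"context"
	"math/big"
	"time"
)

// nextBlockRange stands in for Poller.getNextBlockRange, which now calls
// rpc.GetLatestBlockNumber(ctx).
func nextBlockRange(ctx context.Context) ([]*big.Int, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return []*big.Int{big.NewInt(1), big.NewInt(2)}, nil
}

// poll stands in for Poller.Poll; it reports whether the poll limit was hit.
func poll(ctx context.Context, blocks []*big.Int) bool {
	return len(blocks) > 0 && blocks[len(blocks)-1].Cmp(big.NewInt(2)) >= 0
}

func run(parent context.Context) {
	// Child context: cancelling it stops this poller without touching the
	// parent; cancelling the parent still stops the child.
	pollCtx, cancel := context.WithCancel(parent)
	defer cancel()

	for {
		select {
		case <-pollCtx.Done():
			return
		default:
		}
		blocks, err := nextBlockRange(pollCtx)
		if err != nil {
			time.Sleep(time.Second)
			continue
		}
		if poll(pollCtx, blocks) {
			cancel() // reached the poll-until-block limit
			return
		}
	}
}

func main() {
	run(context.Background())
}
```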
22 changes: 11 additions & 11 deletions internal/orchestrator/reorg_handler.go
@@ -83,7 +83,7 @@ func (rh *ReorgHandler) Start(ctx context.Context) {
rh.publisher.Close()
return
case <-ticker.C:
- mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
+ mostRecentBlockChecked, err := rh.RunFromBlock(ctx, rh.lastCheckedBlock)
if err != nil {
log.Error().Err(err).Msgf("Error during reorg handling: %s", err.Error())
continue
@@ -99,7 +99,7 @@ func (rh *ReorgHandler) Start(ctx context.Context) {
}
}

- func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
+ func (rh *ReorgHandler) RunFromBlock(ctx context.Context, latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
fromBlock, toBlock, err := rh.getReorgCheckRange(latestCheckedBlock)
if err != nil {
return nil, err
@@ -130,7 +130,7 @@ func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBl

metrics.ReorgCounter.Inc()
reorgedBlockNumbers := make([]*big.Int, 0)
- err = rh.findReorgedBlockNumbers(blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
+ err = rh.findReorgedBlockNumbers(ctx, blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
if err != nil {
return nil, fmt.Errorf("error finding reorged block numbers: %w", err)
}
@@ -140,7 +140,7 @@ func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBl
return mostRecentBlockHeader.Number, nil
}

- err = rh.handleReorg(reorgedBlockNumbers)
+ err = rh.handleReorg(ctx, reorgedBlockNumbers)
if err != nil {
return nil, fmt.Errorf("error while handling reorg: %w", err)
}
@@ -190,8 +190,8 @@ func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (
return -1, nil
}

- func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
- newBlocksByNumber, err := rh.getNewBlocksByNumber(blockHeadersDescending)
+ func (rh *ReorgHandler) findReorgedBlockNumbers(ctx context.Context, blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
+ newBlocksByNumber, err := rh.getNewBlocksByNumber(ctx, blockHeadersDescending)
if err != nil {
return err
}
@@ -219,12 +219,12 @@ func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.
sort.Slice(nextHeadersBatch, func(i, j int) bool {
return nextHeadersBatch[i].Number.Cmp(nextHeadersBatch[j].Number) > 0
})
- return rh.findReorgedBlockNumbers(nextHeadersBatch, reorgedBlockNumbers)
+ return rh.findReorgedBlockNumbers(ctx, nextHeadersBatch, reorgedBlockNumbers)
}
return nil
}

- func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (map[string]common.Block, error) {
+ func (rh *ReorgHandler) getNewBlocksByNumber(ctx context.Context, blockHeaders []common.BlockHeader) (map[string]common.Block, error) {
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
for _, header := range blockHeaders {
blockNumbers = append(blockNumbers, header.Number)
@@ -241,7 +241,7 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
wg.Add(1)
go func(chunk []*big.Int) {
defer wg.Done()
- resultsCh <- rh.rpc.GetBlocks(chunk)
+ resultsCh <- rh.rpc.GetBlocks(ctx, chunk)
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
}
@@ -264,9 +264,9 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
return fetchedBlocksByNumber, nil
}

- func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
+ func (rh *ReorgHandler) handleReorg(ctx context.Context, reorgedBlockNumbers []*big.Int) error {
log.Debug().Msgf("Handling reorg for blocks %v", reorgedBlockNumbers)
- results := rh.worker.Run(reorgedBlockNumbers)
+ results := rh.worker.Run(ctx, reorgedBlockNumbers)
data := make([]common.BlockData, 0, len(results))
blocksToDelete := make([]*big.Int, 0, len(results))
for _, result := range results {
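getNewBlocksByNumber fans out one goroutine per chunk of block numbers; handing every goroutine the same ctx means a single cancellation reaches all parallel GetBlocks calls, not just the caller. A sketch of that fan-out shape; fetchChunk and blockResult are illustrative stand-ins for rh.rpc.GetBlocks and its result type:

```go
package main

import (
	"context"
	"fmt"
	"math/big"
	"sync"
)

type blockResult struct {
	Number *big.Int
}

// fetchChunk stands in for rh.rpc.GetBlocks(ctx, chunk).
func fetchChunk(ctx context.Context, chunk []*big.Int) []blockResult {
	results := make([]blockResult, 0, len(chunk))
	for _, n := range chunk {
		if ctx.Err() != nil {
			return results // cancelled: return what we have
		}
		results = append(results, blockResult{Number: n})
	}
	return results
}

func fetchAll(ctx context.Context, chunks [][]*big.Int) []blockResult {
	var wg sync.WaitGroup
	resultsCh := make(chan []blockResult, len(chunks))
	for _, chunk := range chunks {
		wg.Add(1)
		go func(chunk []*big.Int) {
			defer wg.Done()
			// Every goroutine shares the caller's ctx, mirroring the
			// change to getNewBlocksByNumber above.
			resultsCh <- fetchChunk(ctx, chunk)
		}(chunk)
	}
	go func() { wg.Wait(); close(resultsCh) }()

	var all []blockResult
	for rs := range resultsCh {
		all = append(all, rs...)
	}
	return all
}

func main() {
	chunks := [][]*big.Int{{big.NewInt(1), big.NewInt(2)}, {big.NewInt(3)}}
	fmt.Println(len(fetchAll(context.Background(), chunks)))
}
```

If the first failing chunk should also cancel its siblings, golang.org/x/sync/errgroup's WithContext would be the more idiomatic shape; the existing code keeps the plain WaitGroup and buffered channel and only propagates the caller's cancellation.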