From 54a6fcdd6646c7eb586c6b0caa9e1325dc1447e1 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Mon, 19 May 2025 13:52:41 +0300 Subject: [PATCH] add rpc function to fetch transactions by hashes --- internal/common/transaction.go | 2 + internal/common/utils.go | 6 +- internal/orchestrator/reorg_handler.go | 2 +- internal/rpc/batcher.go | 41 ++++++------- internal/rpc/params.go | 4 ++ internal/rpc/rpc.go | 42 ++++++++++---- internal/rpc/serializer.go | 54 ++++++++++------- internal/worker/worker.go | 2 +- test/mocks/MockIRPCClient.go | 80 ++++++++++++++++++++++++++ 9 files changed, 175 insertions(+), 58 deletions(-) diff --git a/internal/common/transaction.go b/internal/common/transaction.go index 6e58629..35dc6d6 100644 --- a/internal/common/transaction.go +++ b/internal/common/transaction.go @@ -11,6 +11,8 @@ import ( "github.com/rs/zerolog/log" ) +type RawTransaction = map[string]interface{} + type Transaction struct { ChainId *big.Int `json:"chain_id" ch:"chain_id" swaggertype:"string"` Hash string `json:"hash" ch:"hash"` diff --git a/internal/common/utils.go b/internal/common/utils.go index a3b35bc..494b03a 100644 --- a/internal/common/utils.go +++ b/internal/common/utils.go @@ -7,11 +7,11 @@ import ( "strings" ) -func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int { +func SliceToChunks[T any](values []T, chunkSize int) [][]T { if chunkSize >= len(values) || chunkSize <= 0 { - return [][]*big.Int{values} + return [][]T{values} } - var chunks [][]*big.Int + var chunks [][]T for i := 0; i < len(values); i += chunkSize { end := i + chunkSize if end > len(values) { diff --git a/internal/orchestrator/reorg_handler.go b/internal/orchestrator/reorg_handler.go index 866f707..81c123f 100644 --- a/internal/orchestrator/reorg_handler.go +++ b/internal/orchestrator/reorg_handler.go @@ -230,7 +230,7 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) blockNumbers = append(blockNumbers, header.Number) } blockCount := len(blockNumbers) - chunks := common.BigIntSliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks) + chunks := common.SliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks) var wg sync.WaitGroup resultsCh := make(chan []rpc.GetBlocksResult, len(chunks)) diff --git a/internal/rpc/batcher.go b/internal/rpc/batcher.go index d121c3f..ea82d7b 100644 --- a/internal/rpc/batcher.go +++ b/internal/rpc/batcher.go @@ -2,7 +2,6 @@ package rpc import ( "context" - "math/big" "sync" "time" @@ -11,28 +10,28 @@ import ( "github.com/thirdweb-dev/indexer/internal/common" ) -type RPCFetchBatchResult[T any] struct { - BlockNumber *big.Int - Error error - Result T +type RPCFetchBatchResult[K any, T any] struct { + Key K + Error error + Result T } -func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize int, batchDelay int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] { - if len(blockNumbers) <= batchSize { - return RPCFetchBatch[T](rpc, blockNumbers, method, argsFunc) +func RPCFetchInBatches[K any, T any](rpc *Client, keys []K, batchSize int, batchDelay int, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] { + if len(keys) <= batchSize { + return RPCFetchSingleBatch[K, T](rpc, keys, method, argsFunc) } - chunks := common.BigIntSliceToChunks(blockNumbers, batchSize) + chunks := common.SliceToChunks[K](keys, batchSize) - log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(blockNumbers), len(chunks), batchSize) + log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(keys), len(chunks), batchSize) var wg sync.WaitGroup - resultsCh := make(chan []RPCFetchBatchResult[T], len(chunks)) + resultsCh := make(chan []RPCFetchBatchResult[K, T], len(chunks)) for _, chunk := range chunks { wg.Add(1) - go func(chunk []*big.Int) { + go func(chunk []K) { defer wg.Done() - resultsCh <- RPCFetchBatch[T](rpc, chunk, method, argsFunc) + resultsCh <- RPCFetchSingleBatch[K, T](rpc, chunk, method, argsFunc) if batchDelay > 0 { time.Sleep(time.Duration(batchDelay) * time.Millisecond) } @@ -43,7 +42,7 @@ func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize in close(resultsCh) }() - results := make([]RPCFetchBatchResult[T], 0, len(blockNumbers)) + results := make([]RPCFetchBatchResult[K, T], 0, len(keys)) for batchResults := range resultsCh { results = append(results, batchResults...) } @@ -51,17 +50,15 @@ func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize in return results } -func RPCFetchBatch[T any](rpc *Client, blockNumbers []*big.Int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] { - batch := make([]gethRpc.BatchElem, len(blockNumbers)) - results := make([]RPCFetchBatchResult[T], len(blockNumbers)) +func RPCFetchSingleBatch[K any, T any](rpc *Client, keys []K, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] { + batch := make([]gethRpc.BatchElem, len(keys)) + results := make([]RPCFetchBatchResult[K, T], len(keys)) - for i, blockNum := range blockNumbers { - results[i] = RPCFetchBatchResult[T]{ - BlockNumber: blockNum, - } + for i, key := range keys { + results[i] = RPCFetchBatchResult[K, T]{Key: key} batch[i] = gethRpc.BatchElem{ Method: method, - Args: argsFunc(blockNum), + Args: argsFunc(key), Result: new(T), } } diff --git a/internal/rpc/params.go b/internal/rpc/params.go index 220f996..0bf5124 100644 --- a/internal/rpc/params.go +++ b/internal/rpc/params.go @@ -10,6 +10,10 @@ func GetBlockWithTransactionsParams(blockNum *big.Int) []interface{} { return []interface{}{hexutil.EncodeBig(blockNum), true} } +func GetTransactionParams(txHash string) []interface{} { + return []interface{}{txHash} +} + func GetBlockWithoutTransactionsParams(blockNum *big.Int) []interface{} { return []interface{}{hexutil.EncodeBig(blockNum), false} } diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index bb94a06..c18bd7b 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -27,6 +27,11 @@ type GetBlocksResult struct { Data common.Block } +type GetTransactionsResult struct { + Error error + Data common.Transaction +} + type BlocksPerRequestConfig struct { Blocks int Logs int @@ -37,6 +42,7 @@ type BlocksPerRequestConfig struct { type IRPCClient interface { GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult GetBlocks(blockNumbers []*big.Int) []GetBlocksResult + GetTransactions(txHashes []string) []GetTransactionsResult GetLatestBlockNumber() (*big.Int, error) GetChainID() *big.Int GetURL() string @@ -44,6 +50,7 @@ type IRPCClient interface { IsWebsocket() bool SupportsTraceBlock() bool HasCode(address string) (bool, error) + Close() } type Client struct { @@ -213,28 +220,28 @@ func (rpc *Client) setChainID() error { func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult { var wg sync.WaitGroup - var blocks []RPCFetchBatchResult[common.RawBlock] - var logs []RPCFetchBatchResult[common.RawLogs] - var traces []RPCFetchBatchResult[common.RawTraces] - var receipts []RPCFetchBatchResult[common.RawReceipts] + var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock] + var logs []RPCFetchBatchResult[*big.Int, common.RawLogs] + var traces []RPCFetchBatchResult[*big.Int, common.RawTraces] + var receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts] wg.Add(2) go func() { defer wg.Done() - result := RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams) + result := RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams) blocks = result }() if rpc.supportsBlockReceipts { go func() { defer wg.Done() - result := RPCFetchInBatches[common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams) + result := RPCFetchInBatches[*big.Int, common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams) receipts = result }() } else { go func() { defer wg.Done() - result := RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams) + result := RPCFetchInBatches[*big.Int, common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams) logs = result }() } @@ -243,7 +250,7 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult { wg.Add(1) go func() { defer wg.Done() - result := RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams) + result := RPCFetchInBatches[*big.Int, common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams) traces = result }() } @@ -255,19 +262,34 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult { func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult { var wg sync.WaitGroup - var blocks []RPCFetchBatchResult[common.RawBlock] + var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock] wg.Add(1) go func() { defer wg.Done() - blocks = RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams) + blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams) }() wg.Wait() return SerializeBlocks(rpc.chainID, blocks) } +func (rpc *Client) GetTransactions(txHashes []string) []GetTransactionsResult { + var wg sync.WaitGroup + var transactions []RPCFetchBatchResult[string, common.RawTransaction] + + wg.Add(1) + + go func() { + defer wg.Done() + transactions = RPCFetchSingleBatch[string, common.RawTransaction](rpc, txHashes, "eth_getTransactionByHash", GetTransactionParams) + }() + wg.Wait() + + return SerializeTransactions(rpc.chainID, transactions) +} + func (rpc *Client) GetLatestBlockNumber() (*big.Int, error) { blockNumber, err := rpc.EthClient.BlockNumber(context.Background()) if err != nil { diff --git a/internal/rpc/serializer.go b/internal/rpc/serializer.go index 3f7e7e7..50cc138 100644 --- a/internal/rpc/serializer.go +++ b/internal/rpc/serializer.go @@ -11,7 +11,7 @@ import ( "github.com/thirdweb-dev/indexer/internal/common" ) -func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock], logs []RPCFetchBatchResult[common.RawLogs], traces []RPCFetchBatchResult[common.RawTraces], receipts []RPCFetchBatchResult[common.RawReceipts]) []GetFullBlockResult { +func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock], logs []RPCFetchBatchResult[*big.Int, common.RawLogs], traces []RPCFetchBatchResult[*big.Int, common.RawTraces], receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]) []GetFullBlockResult { if blocks == nil { return []GetFullBlockResult{} } @@ -21,46 +21,46 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R rawReceiptsMap := mapBatchResultsByBlockNumber[common.RawReceipts](receipts) rawTracesMap := mapBatchResultsByBlockNumber[common.RawTraces](traces) - for _, rawBlock := range blocks { + for _, rawBlockData := range blocks { result := GetFullBlockResult{ - BlockNumber: rawBlock.BlockNumber, + BlockNumber: rawBlockData.Key, } - if rawBlock.Result == nil { - log.Warn().Err(rawBlock.Error).Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String()) - result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlock.Error) + if rawBlockData.Result == nil { + log.Warn().Err(rawBlockData.Error).Msgf("Received a nil block result for block %s.", rawBlockData.Key.String()) + result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlockData.Error) results = append(results, result) continue } - if rawBlock.Error != nil { - result.Error = rawBlock.Error + if rawBlockData.Error != nil { + result.Error = rawBlockData.Error results = append(results, result) continue } - result.Data.Block = serializeBlock(chainId, rawBlock.Result) + result.Data.Block = serializeBlock(chainId, rawBlockData.Result) blockTimestamp := result.Data.Block.Timestamp - if rawReceipts, exists := rawReceiptsMap[rawBlock.BlockNumber.String()]; exists { + if rawReceipts, exists := rawReceiptsMap[rawBlockData.Key.String()]; exists { if rawReceipts.Error != nil { result.Error = rawReceipts.Error } else { result.Data.Logs = serializeLogsFromReceipts(chainId, rawReceipts.Result, result.Data.Block) - result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result) + result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result) } } else { - if rawLogs, exists := rawLogsMap[rawBlock.BlockNumber.String()]; exists { + if rawLogs, exists := rawLogsMap[rawBlockData.Key.String()]; exists { if rawLogs.Error != nil { result.Error = rawLogs.Error } else { result.Data.Logs = serializeLogs(chainId, rawLogs.Result, result.Data.Block) - result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, nil) + result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, nil) } } } if result.Error == nil { - if rawTraces, exists := rawTracesMap[rawBlock.BlockNumber.String()]; exists { + if rawTraces, exists := rawTracesMap[rawBlockData.Key.String()]; exists { if rawTraces.Error != nil { result.Error = rawTraces.Error } else { @@ -75,26 +75,26 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R return results } -func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[T]) map[string]*RPCFetchBatchResult[T] { +func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[*big.Int, T]) map[string]*RPCFetchBatchResult[*big.Int, T] { if results == nil { - return make(map[string]*RPCFetchBatchResult[T], 0) + return make(map[string]*RPCFetchBatchResult[*big.Int, T], 0) } - resultsMap := make(map[string]*RPCFetchBatchResult[T], len(results)) + resultsMap := make(map[string]*RPCFetchBatchResult[*big.Int, T], len(results)) for _, result := range results { - resultsMap[result.BlockNumber.String()] = &result + resultsMap[result.Key.String()] = &result } return resultsMap } -func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock]) []GetBlocksResult { +func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]) []GetBlocksResult { results := make([]GetBlocksResult, 0, len(blocks)) for _, rawBlock := range blocks { result := GetBlocksResult{ - BlockNumber: rawBlock.BlockNumber, + BlockNumber: rawBlock.Key, } if rawBlock.Result == nil { - log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String()) + log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.Key.String()) result.Error = fmt.Errorf("received a nil block result from RPC") results = append(results, result) continue @@ -473,3 +473,15 @@ func interfaceToJsonString(value interface{}) string { } return string(jsonString) } + +func SerializeTransactions(chainId *big.Int, transactions []RPCFetchBatchResult[string, common.RawTransaction]) []GetTransactionsResult { + results := make([]GetTransactionsResult, 0, len(transactions)) + for _, transaction := range transactions { + result := GetTransactionsResult{ + Error: transaction.Error, + Data: serializeTransaction(chainId, transaction.Result, time.Time{}, nil), + } + results = append(results, result) + } + return results +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 7394480..42ed4af 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -24,7 +24,7 @@ func NewWorker(rpc rpc.IRPCClient) *Worker { func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult { blockCount := len(blockNumbers) - chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks) + chunks := common.SliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks) var wg sync.WaitGroup resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks)) diff --git a/test/mocks/MockIRPCClient.go b/test/mocks/MockIRPCClient.go index 816b205..5d64428 100644 --- a/test/mocks/MockIRPCClient.go +++ b/test/mocks/MockIRPCClient.go @@ -24,6 +24,38 @@ func (_m *MockIRPCClient) EXPECT() *MockIRPCClient_Expecter { return &MockIRPCClient_Expecter{mock: &_m.Mock} } +// Close provides a mock function with no fields +func (_m *MockIRPCClient) Close() { + _m.Called() +} + +// MockIRPCClient_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockIRPCClient_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockIRPCClient_Expecter) Close() *MockIRPCClient_Close_Call { + return &MockIRPCClient_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockIRPCClient_Close_Call) Run(run func()) *MockIRPCClient_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockIRPCClient_Close_Call) Return() *MockIRPCClient_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockIRPCClient_Close_Call) RunAndReturn(run func()) *MockIRPCClient_Close_Call { + _c.Run(run) + return _c +} + // GetBlocks provides a mock function with given fields: blockNumbers func (_m *MockIRPCClient) GetBlocks(blockNumbers []*big.Int) []rpc.GetBlocksResult { ret := _m.Called(blockNumbers) @@ -269,6 +301,54 @@ func (_c *MockIRPCClient_GetLatestBlockNumber_Call) RunAndReturn(run func() (*bi return _c } +// GetTransactions provides a mock function with given fields: txHashes +func (_m *MockIRPCClient) GetTransactions(txHashes []string) []rpc.GetTransactionsResult { + ret := _m.Called(txHashes) + + if len(ret) == 0 { + panic("no return value specified for GetTransactions") + } + + var r0 []rpc.GetTransactionsResult + if rf, ok := ret.Get(0).(func([]string) []rpc.GetTransactionsResult); ok { + r0 = rf(txHashes) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]rpc.GetTransactionsResult) + } + } + + return r0 +} + +// MockIRPCClient_GetTransactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTransactions' +type MockIRPCClient_GetTransactions_Call struct { + *mock.Call +} + +// GetTransactions is a helper method to define mock.On call +// - txHashes []string +func (_e *MockIRPCClient_Expecter) GetTransactions(txHashes interface{}) *MockIRPCClient_GetTransactions_Call { + return &MockIRPCClient_GetTransactions_Call{Call: _e.mock.On("GetTransactions", txHashes)} +} + +func (_c *MockIRPCClient_GetTransactions_Call) Run(run func(txHashes []string)) *MockIRPCClient_GetTransactions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string)) + }) + return _c +} + +func (_c *MockIRPCClient_GetTransactions_Call) Return(_a0 []rpc.GetTransactionsResult) *MockIRPCClient_GetTransactions_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIRPCClient_GetTransactions_Call) RunAndReturn(run func([]string) []rpc.GetTransactionsResult) *MockIRPCClient_GetTransactions_Call { + _c.Call.Return(run) + return _c +} + // GetURL provides a mock function with no fields func (_m *MockIRPCClient) GetURL() string { ret := _m.Called()