Skip to content

Commit 275e955

Browse files
committed
add rpc function to fetch transactions by hashes
1 parent 210ebd3 commit 275e955

File tree

7 files changed

+185
-53
lines changed

7 files changed

+185
-53
lines changed

internal/common/transaction.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/rs/zerolog/log"
1212
)
1313

14+
type RawTransaction = map[string]interface{}
15+
1416
type Transaction struct {
1517
ChainId *big.Int `json:"chain_id" ch:"chain_id" swaggertype:"string"`
1618
Hash string `json:"hash" ch:"hash"`

internal/common/utils.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,21 @@ import (
77
"strings"
88
)
99

10+
func SliceToChunks[T any](values []T, chunkSize int) [][]T {
11+
if chunkSize >= len(values) || chunkSize <= 0 {
12+
return [][]T{values}
13+
}
14+
var chunks [][]T
15+
for i := 0; i < len(values); i += chunkSize {
16+
end := i + chunkSize
17+
if end > len(values) {
18+
end = len(values)
19+
}
20+
chunks = append(chunks, values[i:end])
21+
}
22+
return chunks
23+
}
24+
1025
func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
1126
if chunkSize >= len(values) || chunkSize <= 0 {
1227
return [][]*big.Int{values}

internal/rpc/batcher.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package rpc
22

33
import (
44
"context"
5-
"math/big"
65
"sync"
76
"time"
87

@@ -11,28 +10,28 @@ import (
1110
"github.com/thirdweb-dev/indexer/internal/common"
1211
)
1312

14-
type RPCFetchBatchResult[T any] struct {
15-
BlockNumber *big.Int
16-
Error error
17-
Result T
13+
type RPCFetchBatchResult[K any, T any] struct {
14+
Key K
15+
Error error
16+
Result T
1817
}
1918

20-
func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize int, batchDelay int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] {
21-
if len(blockNumbers) <= batchSize {
22-
return RPCFetchBatch[T](rpc, blockNumbers, method, argsFunc)
19+
func RPCFetchInBatches[K any, T any](rpc *Client, keys []K, batchSize int, batchDelay int, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
20+
if len(keys) <= batchSize {
21+
return RPCFetchSingleBatch[K, T](rpc, keys, method, argsFunc)
2322
}
24-
chunks := common.BigIntSliceToChunks(blockNumbers, batchSize)
23+
chunks := common.SliceToChunks[K](keys, batchSize)
2524

26-
log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(blockNumbers), len(chunks), batchSize)
25+
log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(keys), len(chunks), batchSize)
2726

2827
var wg sync.WaitGroup
29-
resultsCh := make(chan []RPCFetchBatchResult[T], len(chunks))
28+
resultsCh := make(chan []RPCFetchBatchResult[K, T], len(chunks))
3029

3130
for _, chunk := range chunks {
3231
wg.Add(1)
33-
go func(chunk []*big.Int) {
32+
go func(chunk []K) {
3433
defer wg.Done()
35-
resultsCh <- RPCFetchBatch[T](rpc, chunk, method, argsFunc)
34+
resultsCh <- RPCFetchSingleBatch[K, T](rpc, chunk, method, argsFunc)
3635
if batchDelay > 0 {
3736
time.Sleep(time.Duration(batchDelay) * time.Millisecond)
3837
}
@@ -43,25 +42,23 @@ func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize in
4342
close(resultsCh)
4443
}()
4544

46-
results := make([]RPCFetchBatchResult[T], 0, len(blockNumbers))
45+
results := make([]RPCFetchBatchResult[K, T], 0, len(keys))
4746
for batchResults := range resultsCh {
4847
results = append(results, batchResults...)
4948
}
5049

5150
return results
5251
}
5352

54-
func RPCFetchBatch[T any](rpc *Client, blockNumbers []*big.Int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] {
55-
batch := make([]gethRpc.BatchElem, len(blockNumbers))
56-
results := make([]RPCFetchBatchResult[T], len(blockNumbers))
53+
func RPCFetchSingleBatch[K any, T any](rpc *Client, keys []K, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
54+
batch := make([]gethRpc.BatchElem, len(keys))
55+
results := make([]RPCFetchBatchResult[K, T], len(keys))
5756

58-
for i, blockNum := range blockNumbers {
59-
results[i] = RPCFetchBatchResult[T]{
60-
BlockNumber: blockNum,
61-
}
57+
for i, key := range keys {
58+
results[i] = RPCFetchBatchResult[K, T]{Key: key}
6259
batch[i] = gethRpc.BatchElem{
6360
Method: method,
64-
Args: argsFunc(blockNum),
61+
Args: argsFunc(key),
6562
Result: new(T),
6663
}
6764
}

internal/rpc/params.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ func GetBlockWithTransactionsParams(blockNum *big.Int) []interface{} {
1010
return []interface{}{hexutil.EncodeBig(blockNum), true}
1111
}
1212

13+
func GetTransactionParams(txHash string) []interface{} {
14+
return []interface{}{txHash}
15+
}
16+
1317
func GetBlockWithoutTransactionsParams(blockNum *big.Int) []interface{} {
1418
return []interface{}{hexutil.EncodeBig(blockNum), false}
1519
}

internal/rpc/rpc.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ type GetBlocksResult struct {
2727
Data common.Block
2828
}
2929

30+
type GetTransactionsResult struct {
31+
Error error
32+
Data common.Transaction
33+
}
34+
3035
type BlocksPerRequestConfig struct {
3136
Blocks int
3237
Logs int
@@ -37,13 +42,15 @@ type BlocksPerRequestConfig struct {
3742
type IRPCClient interface {
3843
GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult
3944
GetBlocks(blockNumbers []*big.Int) []GetBlocksResult
45+
GetTransactions(txHashes []string) []GetTransactionsResult
4046
GetLatestBlockNumber() (*big.Int, error)
4147
GetChainID() *big.Int
4248
GetURL() string
4349
GetBlocksPerRequest() BlocksPerRequestConfig
4450
IsWebsocket() bool
4551
SupportsTraceBlock() bool
4652
HasCode(address string) (bool, error)
53+
Close()
4754
}
4855

4956
type Client struct {
@@ -213,28 +220,28 @@ func (rpc *Client) setChainID() error {
213220

214221
func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
215222
var wg sync.WaitGroup
216-
var blocks []RPCFetchBatchResult[common.RawBlock]
217-
var logs []RPCFetchBatchResult[common.RawLogs]
218-
var traces []RPCFetchBatchResult[common.RawTraces]
219-
var receipts []RPCFetchBatchResult[common.RawReceipts]
223+
var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]
224+
var logs []RPCFetchBatchResult[*big.Int, common.RawLogs]
225+
var traces []RPCFetchBatchResult[*big.Int, common.RawTraces]
226+
var receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]
220227
wg.Add(2)
221228

222229
go func() {
223230
defer wg.Done()
224-
result := RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
231+
result := RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
225232
blocks = result
226233
}()
227234

228235
if rpc.supportsBlockReceipts {
229236
go func() {
230237
defer wg.Done()
231-
result := RPCFetchInBatches[common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
238+
result := RPCFetchInBatches[*big.Int, common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
232239
receipts = result
233240
}()
234241
} else {
235242
go func() {
236243
defer wg.Done()
237-
result := RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
244+
result := RPCFetchInBatches[*big.Int, common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
238245
logs = result
239246
}()
240247
}
@@ -243,7 +250,7 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
243250
wg.Add(1)
244251
go func() {
245252
defer wg.Done()
246-
result := RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
253+
result := RPCFetchInBatches[*big.Int, common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
247254
traces = result
248255
}()
249256
}
@@ -255,19 +262,34 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
255262

256263
func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult {
257264
var wg sync.WaitGroup
258-
var blocks []RPCFetchBatchResult[common.RawBlock]
265+
var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]
259266

260267
wg.Add(1)
261268

262269
go func() {
263270
defer wg.Done()
264-
blocks = RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
271+
blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
265272
}()
266273
wg.Wait()
267274

268275
return SerializeBlocks(rpc.chainID, blocks)
269276
}
270277

278+
func (rpc *Client) GetTransactions(txHashes []string) []GetTransactionsResult {
279+
var wg sync.WaitGroup
280+
var transactions []RPCFetchBatchResult[string, common.RawTransaction]
281+
282+
wg.Add(1)
283+
284+
go func() {
285+
defer wg.Done()
286+
transactions = RPCFetchSingleBatch[string, common.RawTransaction](rpc, txHashes, "eth_getTransactionByHash", GetTransactionParams)
287+
}()
288+
wg.Wait()
289+
290+
return SerializeTransactions(rpc.chainID, transactions)
291+
}
292+
271293
func (rpc *Client) GetLatestBlockNumber() (*big.Int, error) {
272294
blockNumber, err := rpc.EthClient.BlockNumber(context.Background())
273295
if err != nil {

internal/rpc/serializer.go

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/thirdweb-dev/indexer/internal/common"
1212
)
1313

14-
func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock], logs []RPCFetchBatchResult[common.RawLogs], traces []RPCFetchBatchResult[common.RawTraces], receipts []RPCFetchBatchResult[common.RawReceipts]) []GetFullBlockResult {
14+
func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock], logs []RPCFetchBatchResult[*big.Int, common.RawLogs], traces []RPCFetchBatchResult[*big.Int, common.RawTraces], receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]) []GetFullBlockResult {
1515
if blocks == nil {
1616
return []GetFullBlockResult{}
1717
}
@@ -21,46 +21,46 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R
2121
rawReceiptsMap := mapBatchResultsByBlockNumber[common.RawReceipts](receipts)
2222
rawTracesMap := mapBatchResultsByBlockNumber[common.RawTraces](traces)
2323

24-
for _, rawBlock := range blocks {
24+
for _, rawBlockData := range blocks {
2525
result := GetFullBlockResult{
26-
BlockNumber: rawBlock.BlockNumber,
26+
BlockNumber: rawBlockData.Key,
2727
}
28-
if rawBlock.Result == nil {
29-
log.Warn().Err(rawBlock.Error).Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String())
30-
result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlock.Error)
28+
if rawBlockData.Result == nil {
29+
log.Warn().Err(rawBlockData.Error).Msgf("Received a nil block result for block %s.", rawBlockData.Key.String())
30+
result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlockData.Error)
3131
results = append(results, result)
3232
continue
3333
}
3434

35-
if rawBlock.Error != nil {
36-
result.Error = rawBlock.Error
35+
if rawBlockData.Error != nil {
36+
result.Error = rawBlockData.Error
3737
results = append(results, result)
3838
continue
3939
}
4040

41-
result.Data.Block = serializeBlock(chainId, rawBlock.Result)
41+
result.Data.Block = serializeBlock(chainId, rawBlockData.Result)
4242
blockTimestamp := result.Data.Block.Timestamp
4343

44-
if rawReceipts, exists := rawReceiptsMap[rawBlock.BlockNumber.String()]; exists {
44+
if rawReceipts, exists := rawReceiptsMap[rawBlockData.Key.String()]; exists {
4545
if rawReceipts.Error != nil {
4646
result.Error = rawReceipts.Error
4747
} else {
4848
result.Data.Logs = serializeLogsFromReceipts(chainId, rawReceipts.Result, result.Data.Block)
49-
result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result)
49+
result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result)
5050
}
5151
} else {
52-
if rawLogs, exists := rawLogsMap[rawBlock.BlockNumber.String()]; exists {
52+
if rawLogs, exists := rawLogsMap[rawBlockData.Key.String()]; exists {
5353
if rawLogs.Error != nil {
5454
result.Error = rawLogs.Error
5555
} else {
5656
result.Data.Logs = serializeLogs(chainId, rawLogs.Result, result.Data.Block)
57-
result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, nil)
57+
result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, nil)
5858
}
5959
}
6060
}
6161

6262
if result.Error == nil {
63-
if rawTraces, exists := rawTracesMap[rawBlock.BlockNumber.String()]; exists {
63+
if rawTraces, exists := rawTracesMap[rawBlockData.Key.String()]; exists {
6464
if rawTraces.Error != nil {
6565
result.Error = rawTraces.Error
6666
} else {
@@ -75,26 +75,26 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R
7575
return results
7676
}
7777

78-
func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[T]) map[string]*RPCFetchBatchResult[T] {
78+
func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[*big.Int, T]) map[string]*RPCFetchBatchResult[*big.Int, T] {
7979
if results == nil {
80-
return make(map[string]*RPCFetchBatchResult[T], 0)
80+
return make(map[string]*RPCFetchBatchResult[*big.Int, T], 0)
8181
}
82-
resultsMap := make(map[string]*RPCFetchBatchResult[T], len(results))
82+
resultsMap := make(map[string]*RPCFetchBatchResult[*big.Int, T], len(results))
8383
for _, result := range results {
84-
resultsMap[result.BlockNumber.String()] = &result
84+
resultsMap[result.Key.String()] = &result
8585
}
8686
return resultsMap
8787
}
8888

89-
func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock]) []GetBlocksResult {
89+
func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]) []GetBlocksResult {
9090
results := make([]GetBlocksResult, 0, len(blocks))
9191

9292
for _, rawBlock := range blocks {
9393
result := GetBlocksResult{
94-
BlockNumber: rawBlock.BlockNumber,
94+
BlockNumber: rawBlock.Key,
9595
}
9696
if rawBlock.Result == nil {
97-
log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String())
97+
log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.Key.String())
9898
result.Error = fmt.Errorf("received a nil block result from RPC")
9999
results = append(results, result)
100100
continue
@@ -473,3 +473,15 @@ func interfaceToJsonString(value interface{}) string {
473473
}
474474
return string(jsonString)
475475
}
476+
477+
func SerializeTransactions(chainId *big.Int, transactions []RPCFetchBatchResult[string, common.RawTransaction]) []GetTransactionsResult {
478+
results := make([]GetTransactionsResult, 0, len(transactions))
479+
for _, transaction := range transactions {
480+
result := GetTransactionsResult{
481+
Error: transaction.Error,
482+
Data: serializeTransaction(chainId, transaction.Result, time.Time{}, nil),
483+
}
484+
results = append(results, result)
485+
}
486+
return results
487+
}

0 commit comments

Comments
 (0)