Skip to content

add rpc function to fetch transactions by hashes #202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/common/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/rs/zerolog/log"
)

type RawTransaction = map[string]interface{}

type Transaction struct {
ChainId *big.Int `json:"chain_id" ch:"chain_id" swaggertype:"string"`
Hash string `json:"hash" ch:"hash"`
Expand Down
6 changes: 3 additions & 3 deletions internal/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"strings"
)

func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
func SliceToChunks[T any](values []T, chunkSize int) [][]T {
if chunkSize >= len(values) || chunkSize <= 0 {
return [][]*big.Int{values}
return [][]T{values}
}
var chunks [][]*big.Int
var chunks [][]T
for i := 0; i < len(values); i += chunkSize {
end := i + chunkSize
if end > len(values) {
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
blockNumbers = append(blockNumbers, header.Number)
}
blockCount := len(blockNumbers)
chunks := common.BigIntSliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)
chunks := common.SliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)

var wg sync.WaitGroup
resultsCh := make(chan []rpc.GetBlocksResult, len(chunks))
Expand Down
41 changes: 19 additions & 22 deletions internal/rpc/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rpc

import (
"context"
"math/big"
"sync"
"time"

Expand All @@ -11,28 +10,28 @@ import (
"github.com/thirdweb-dev/indexer/internal/common"
)

type RPCFetchBatchResult[T any] struct {
BlockNumber *big.Int
Error error
Result T
type RPCFetchBatchResult[K any, T any] struct {
Key K
Error error
Result T
}

func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize int, batchDelay int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] {
if len(blockNumbers) <= batchSize {
return RPCFetchBatch[T](rpc, blockNumbers, method, argsFunc)
func RPCFetchInBatches[K any, T any](rpc *Client, keys []K, batchSize int, batchDelay int, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
if len(keys) <= batchSize {
return RPCFetchSingleBatch[K, T](rpc, keys, method, argsFunc)
}
chunks := common.BigIntSliceToChunks(blockNumbers, batchSize)
chunks := common.SliceToChunks[K](keys, batchSize)

log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(blockNumbers), len(chunks), batchSize)
log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(keys), len(chunks), batchSize)

var wg sync.WaitGroup
resultsCh := make(chan []RPCFetchBatchResult[T], len(chunks))
resultsCh := make(chan []RPCFetchBatchResult[K, T], len(chunks))

for _, chunk := range chunks {
wg.Add(1)
go func(chunk []*big.Int) {
go func(chunk []K) {
defer wg.Done()
resultsCh <- RPCFetchBatch[T](rpc, chunk, method, argsFunc)
resultsCh <- RPCFetchSingleBatch[K, T](rpc, chunk, method, argsFunc)
if batchDelay > 0 {
time.Sleep(time.Duration(batchDelay) * time.Millisecond)
}
Expand All @@ -43,25 +42,23 @@ func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize in
close(resultsCh)
}()

results := make([]RPCFetchBatchResult[T], 0, len(blockNumbers))
results := make([]RPCFetchBatchResult[K, T], 0, len(keys))
for batchResults := range resultsCh {
results = append(results, batchResults...)
}

return results
}

func RPCFetchBatch[T any](rpc *Client, blockNumbers []*big.Int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] {
batch := make([]gethRpc.BatchElem, len(blockNumbers))
results := make([]RPCFetchBatchResult[T], len(blockNumbers))
func RPCFetchSingleBatch[K any, T any](rpc *Client, keys []K, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
batch := make([]gethRpc.BatchElem, len(keys))
results := make([]RPCFetchBatchResult[K, T], len(keys))

for i, blockNum := range blockNumbers {
results[i] = RPCFetchBatchResult[T]{
BlockNumber: blockNum,
}
for i, key := range keys {
results[i] = RPCFetchBatchResult[K, T]{Key: key}
batch[i] = gethRpc.BatchElem{
Method: method,
Args: argsFunc(blockNum),
Args: argsFunc(key),
Result: new(T),
}
}
Expand Down
4 changes: 4 additions & 0 deletions internal/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ func GetBlockWithTransactionsParams(blockNum *big.Int) []interface{} {
return []interface{}{hexutil.EncodeBig(blockNum), true}
}

func GetTransactionParams(txHash string) []interface{} {
return []interface{}{txHash}
}

func GetBlockWithoutTransactionsParams(blockNum *big.Int) []interface{} {
return []interface{}{hexutil.EncodeBig(blockNum), false}
}
Expand Down
42 changes: 32 additions & 10 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type GetBlocksResult struct {
Data common.Block
}

type GetTransactionsResult struct {
Error error
Data common.Transaction
}

type BlocksPerRequestConfig struct {
Blocks int
Logs int
Expand All @@ -37,13 +42,15 @@ type BlocksPerRequestConfig struct {
type IRPCClient interface {
GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult
GetBlocks(blockNumbers []*big.Int) []GetBlocksResult
GetTransactions(txHashes []string) []GetTransactionsResult
GetLatestBlockNumber() (*big.Int, error)
GetChainID() *big.Int
GetURL() string
GetBlocksPerRequest() BlocksPerRequestConfig
IsWebsocket() bool
SupportsTraceBlock() bool
HasCode(address string) (bool, error)
Close()
}

type Client struct {
Expand Down Expand Up @@ -213,28 +220,28 @@ func (rpc *Client) setChainID() error {

func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
var wg sync.WaitGroup
var blocks []RPCFetchBatchResult[common.RawBlock]
var logs []RPCFetchBatchResult[common.RawLogs]
var traces []RPCFetchBatchResult[common.RawTraces]
var receipts []RPCFetchBatchResult[common.RawReceipts]
var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]
var logs []RPCFetchBatchResult[*big.Int, common.RawLogs]
var traces []RPCFetchBatchResult[*big.Int, common.RawTraces]
var receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]
wg.Add(2)

go func() {
defer wg.Done()
result := RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
result := RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
blocks = result
}()

if rpc.supportsBlockReceipts {
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
result := RPCFetchInBatches[*big.Int, common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
receipts = result
}()
} else {
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
result := RPCFetchInBatches[*big.Int, common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
logs = result
}()
}
Expand All @@ -243,7 +250,7 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
wg.Add(1)
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
result := RPCFetchInBatches[*big.Int, common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
traces = result
}()
}
Expand All @@ -255,19 +262,34 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {

func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult {
var wg sync.WaitGroup
var blocks []RPCFetchBatchResult[common.RawBlock]
var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]

wg.Add(1)

go func() {
defer wg.Done()
blocks = RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
}()
wg.Wait()

return SerializeBlocks(rpc.chainID, blocks)
}

func (rpc *Client) GetTransactions(txHashes []string) []GetTransactionsResult {
var wg sync.WaitGroup
var transactions []RPCFetchBatchResult[string, common.RawTransaction]

wg.Add(1)

go func() {
defer wg.Done()
transactions = RPCFetchSingleBatch[string, common.RawTransaction](rpc, txHashes, "eth_getTransactionByHash", GetTransactionParams)
}()
wg.Wait()

return SerializeTransactions(rpc.chainID, transactions)
}

func (rpc *Client) GetLatestBlockNumber() (*big.Int, error) {
blockNumber, err := rpc.EthClient.BlockNumber(context.Background())
if err != nil {
Expand Down
54 changes: 33 additions & 21 deletions internal/rpc/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/thirdweb-dev/indexer/internal/common"
)

func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock], logs []RPCFetchBatchResult[common.RawLogs], traces []RPCFetchBatchResult[common.RawTraces], receipts []RPCFetchBatchResult[common.RawReceipts]) []GetFullBlockResult {
func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock], logs []RPCFetchBatchResult[*big.Int, common.RawLogs], traces []RPCFetchBatchResult[*big.Int, common.RawTraces], receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]) []GetFullBlockResult {
if blocks == nil {
return []GetFullBlockResult{}
}
Expand All @@ -21,46 +21,46 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R
rawReceiptsMap := mapBatchResultsByBlockNumber[common.RawReceipts](receipts)
rawTracesMap := mapBatchResultsByBlockNumber[common.RawTraces](traces)

for _, rawBlock := range blocks {
for _, rawBlockData := range blocks {
result := GetFullBlockResult{
BlockNumber: rawBlock.BlockNumber,
BlockNumber: rawBlockData.Key,
}
if rawBlock.Result == nil {
log.Warn().Err(rawBlock.Error).Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String())
result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlock.Error)
if rawBlockData.Result == nil {
log.Warn().Err(rawBlockData.Error).Msgf("Received a nil block result for block %s.", rawBlockData.Key.String())
result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlockData.Error)
results = append(results, result)
continue
}

if rawBlock.Error != nil {
result.Error = rawBlock.Error
if rawBlockData.Error != nil {
result.Error = rawBlockData.Error
results = append(results, result)
continue
}

result.Data.Block = serializeBlock(chainId, rawBlock.Result)
result.Data.Block = serializeBlock(chainId, rawBlockData.Result)
blockTimestamp := result.Data.Block.Timestamp

if rawReceipts, exists := rawReceiptsMap[rawBlock.BlockNumber.String()]; exists {
if rawReceipts, exists := rawReceiptsMap[rawBlockData.Key.String()]; exists {
if rawReceipts.Error != nil {
result.Error = rawReceipts.Error
} else {
result.Data.Logs = serializeLogsFromReceipts(chainId, rawReceipts.Result, result.Data.Block)
result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result)
result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result)
}
} else {
if rawLogs, exists := rawLogsMap[rawBlock.BlockNumber.String()]; exists {
if rawLogs, exists := rawLogsMap[rawBlockData.Key.String()]; exists {
if rawLogs.Error != nil {
result.Error = rawLogs.Error
} else {
result.Data.Logs = serializeLogs(chainId, rawLogs.Result, result.Data.Block)
result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, nil)
result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, nil)
}
}
}

if result.Error == nil {
if rawTraces, exists := rawTracesMap[rawBlock.BlockNumber.String()]; exists {
if rawTraces, exists := rawTracesMap[rawBlockData.Key.String()]; exists {
if rawTraces.Error != nil {
result.Error = rawTraces.Error
} else {
Expand All @@ -75,26 +75,26 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R
return results
}

func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[T]) map[string]*RPCFetchBatchResult[T] {
func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[*big.Int, T]) map[string]*RPCFetchBatchResult[*big.Int, T] {
if results == nil {
return make(map[string]*RPCFetchBatchResult[T], 0)
return make(map[string]*RPCFetchBatchResult[*big.Int, T], 0)
}
resultsMap := make(map[string]*RPCFetchBatchResult[T], len(results))
resultsMap := make(map[string]*RPCFetchBatchResult[*big.Int, T], len(results))
for _, result := range results {
resultsMap[result.BlockNumber.String()] = &result
resultsMap[result.Key.String()] = &result
}
return resultsMap
}

func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock]) []GetBlocksResult {
func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]) []GetBlocksResult {
results := make([]GetBlocksResult, 0, len(blocks))

for _, rawBlock := range blocks {
result := GetBlocksResult{
BlockNumber: rawBlock.BlockNumber,
BlockNumber: rawBlock.Key,
}
if rawBlock.Result == nil {
log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String())
log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.Key.String())
result.Error = fmt.Errorf("received a nil block result from RPC")
results = append(results, result)
continue
Expand Down Expand Up @@ -473,3 +473,15 @@ func interfaceToJsonString(value interface{}) string {
}
return string(jsonString)
}

func SerializeTransactions(chainId *big.Int, transactions []RPCFetchBatchResult[string, common.RawTransaction]) []GetTransactionsResult {
results := make([]GetTransactionsResult, 0, len(transactions))
for _, transaction := range transactions {
result := GetTransactionsResult{
Error: transaction.Error,
Data: serializeTransaction(chainId, transaction.Result, time.Time{}, nil),
}
results = append(results, result)
}
return results
}
2 changes: 1 addition & 1 deletion internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {

func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
blockCount := len(blockNumbers)
chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)
chunks := common.SliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)

var wg sync.WaitGroup
resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks))
Expand Down
Loading