Skip to content

pass context to rpc and worker #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 21, 2025
Merged

Conversation

iuwqyir
Copy link
Collaborator

@iuwqyir iuwqyir commented May 20, 2025

TL;DR

Added context propagation throughout the codebase to improve cancellation handling and resource management.

What changed?

This PR adds proper context propagation throughout the codebase by:

  1. Adding context parameters to all RPC client interface methods:

    • GetFullBlocks
    • GetBlocks
    • GetTransactions
    • GetLatestBlockNumber
  2. Updating all orchestrator components to pass context to RPC calls:

    • ChainTracker
    • Committer
    • FailureRecoverer
    • Poller
    • ReorgHandler
  3. Propagating context through the worker package to ensure proper cancellation handling

  4. Updating all batch RPC operations to respect context cancellation

How to test?

  1. Run the application and verify that it functions normally
  2. Test context cancellation by:
    • Starting the application
    • Triggering a graceful shutdown
    • Verifying that in-progress operations are properly canceled

Why make this change?

This change improves resource management and application behavior by:

  1. Ensuring proper cancellation of in-progress operations when the application is shutting down
  2. Preventing resource leaks by propagating context through all layers of the application
  3. Following Go best practices for handling context propagation
  4. Enabling better timeout and cancellation handling for RPC operations

Summary by CodeRabbit

  • New Features

    • Added support for cancellation and timeout controls to blockchain data fetching and processing operations, improving responsiveness and robustness of long-running tasks.
    • Introduced context propagation to search functionality for better request handling and cancellation support.
  • Bug Fixes

    • Enhanced cancellation and timeout handling across multiple components to prevent unresponsive or hanging processes.
  • Tests

    • Updated tests to validate context-aware operations and ensure proper handling of cancellations and timeouts.

Copy link
Collaborator Author

iuwqyir commented May 20, 2025

@iuwqyir iuwqyir requested review from catalyst17 and nischitpra May 20, 2025 09:30
@iuwqyir iuwqyir marked this pull request as ready for review May 20, 2025 09:30
@iuwqyir iuwqyir force-pushed the 05-19-retry_fetching_block_data_with_decreasing_batch_size branch from 4f2ca9a to c692c5e Compare May 21, 2025 06:54
@iuwqyir iuwqyir force-pushed the 05-20-pass_context_to_rpc_and_worker branch from 205a0b8 to 5705df1 Compare May 21, 2025 06:54
Copy link

coderabbitai bot commented May 21, 2025

Walkthrough

This change propagates context.Context parameters throughout the orchestrator, RPC, worker, and test layers of the codebase. All major internal methods, RPC client interfaces, and worker routines now accept and forward a context, enabling cooperative cancellation and deadline handling for long-running and network-bound operations. Mock interfaces and tests are updated accordingly.

Changes

File(s) Change Summary
internal/orchestrator/chain_tracker.go Updated ChainTracker.Start to pass context to rpc.GetLatestBlockNumber.
internal/orchestrator/committer.go Added context.Context parameters to several methods (Start, getSequentialBlockDataToCommit, commit, handleGap, handleMissingStagingData) and propagated context to internal calls and polling operations.
internal/orchestrator/committer_test.go Updated tests to pass context.Context to committer and mock RPC methods. No logic changes.
internal/orchestrator/failure_recoverer.go Modified FailureRecoverer.Start to pass context to worker.Run.
internal/orchestrator/poller.go Added context.Context to Poll, getNextBlockRange, and internal calls. Propagated context throughout polling workflow.
internal/orchestrator/reorg_handler.go Added context.Context to multiple methods (Start, RunFromBlock, findReorgedBlockNumbers, getNewBlocksByNumber, handleReorg) and propagated context to all relevant internal and RPC client calls.
internal/orchestrator/reorg_handler_test.go Updated tests to pass context.Context to all relevant methods and mocks. No logic changes.
internal/rpc/batcher.go Added context.Context to RPCFetchInBatches and RPCFetchSingleBatch; all batch RPC calls now accept and propagate context.
internal/rpc/rpc.go Added context.Context to all RPC client interface methods and their implementations; all internal and external calls now propagate context.
internal/worker/worker.go Added context.Context to processChunkWithRetry and Run; added context checks for cancellation before and during chunk processing and retries. Propagated context to RPC calls.
test/mocks/MockIRPCClient.go Updated all mock methods, helpers, and expectations for MockIRPCClient to accept and propagate context.Context as the first parameter for all relevant RPC calls.
internal/handlers/search_handlers.go Added context.Context parameter propagation through search-related functions, including passing context to the RPC client's HasCode method.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Orchestrator
    participant Worker
    participant RPCClient
    participant Network

    Caller->>Orchestrator: Start(ctx)
    Orchestrator->>RPCClient: GetLatestBlockNumber(ctx)
    RPCClient->>Network: Fetch block number (with ctx)
    Network-->>RPCClient: Block number
    RPCClient-->>Orchestrator: Block number

    Orchestrator->>Worker: Run(ctx, blockNumbers)
    Worker->>RPCClient: GetFullBlocks(ctx, blockNumbers)
    RPCClient->>Network: Fetch blocks (with ctx)
    Network-->>RPCClient: Block data
    RPCClient-->>Worker: Block data
    Worker-->>Orchestrator: Results

    Note over Orchestrator,Worker: All long-running and RPC calls now respect ctx for cancellation and deadlines
Loading
sequenceDiagram
    participant Test
    participant MockIRPCClient

    Test->>MockIRPCClient: GetBlocks(ctx, blockNumbers)
    MockIRPCClient-->>Test: Mocked block results

    Test->>MockIRPCClient: GetFullBlocks(ctx, blockNumbers)
    MockIRPCClient-->>Test: Mocked full block results

    Note over Test,MockIRPCClient: All test invocations and mocks now require ctx as the first argument
Loading

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b3058e and dce6992.

📒 Files selected for processing (12)
  • internal/handlers/search_handlers.go (6 hunks)
  • internal/orchestrator/chain_tracker.go (1 hunks)
  • internal/orchestrator/committer.go (9 hunks)
  • internal/orchestrator/committer_test.go (9 hunks)
  • internal/orchestrator/failure_recoverer.go (1 hunks)
  • internal/orchestrator/poller.go (5 hunks)
  • internal/orchestrator/reorg_handler.go (8 hunks)
  • internal/orchestrator/reorg_handler_test.go (24 hunks)
  • internal/rpc/batcher.go (4 hunks)
  • internal/rpc/rpc.go (7 hunks)
  • internal/worker/worker.go (5 hunks)
  • test/mocks/MockIRPCClient.go (14 hunks)
✅ Files skipped from review due to trivial changes (1)
  • internal/orchestrator/chain_tracker.go
🚧 Files skipped from review as they are similar to previous changes (11)
  • internal/orchestrator/failure_recoverer.go
  • internal/orchestrator/committer_test.go
  • internal/rpc/rpc.go
  • internal/orchestrator/poller.go
  • internal/worker/worker.go
  • internal/rpc/batcher.go
  • internal/handlers/search_handlers.go
  • internal/orchestrator/committer.go
  • internal/orchestrator/reorg_handler.go
  • internal/orchestrator/reorg_handler_test.go
  • test/mocks/MockIRPCClient.go
✨ Finishing Touches
  • 📝 Generate Docstrings

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@iuwqyir iuwqyir force-pushed the 05-19-retry_fetching_block_data_with_decreasing_batch_size branch 2 times, most recently from afbb7e0 to 78e2b76 Compare May 21, 2025 06:56
@iuwqyir iuwqyir force-pushed the 05-20-pass_context_to_rpc_and_worker branch from 5705df1 to 5f133bc Compare May 21, 2025 06:56
@iuwqyir iuwqyir changed the base branch from 05-19-retry_fetching_block_data_with_decreasing_batch_size to graphite-base/205 May 21, 2025 06:58
@iuwqyir iuwqyir force-pushed the 05-20-pass_context_to_rpc_and_worker branch from 5f133bc to 9cb66bd Compare May 21, 2025 06:58
@graphite-app graphite-app bot changed the base branch from graphite-base/205 to main May 21, 2025 06:59
@iuwqyir iuwqyir force-pushed the 05-20-pass_context_to_rpc_and_worker branch from 9cb66bd to 150d5b4 Compare May 21, 2025 06:59
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🔭 Outside diff range comments (3)
internal/rpc/rpc.go (1)

301-307: 🛠️ Refactor suggestion

HasCode method still using context.Background()

While other RPC client methods have been updated to accept and use context parameters, the HasCode method still uses context.Background() which is inconsistent with the context propagation pattern.

-func (rpc *Client) HasCode(address string) (bool, error) {
-	code, err := rpc.EthClient.CodeAt(context.Background(), gethCommon.HexToAddress(address), nil)
+func (rpc *Client) HasCode(ctx context.Context, address string) (bool, error) {
+	code, err := rpc.EthClient.CodeAt(ctx, gethCommon.HexToAddress(address), nil)
	if err != nil {
		return false, err
	}
	return len(code) > 0, nil
}

Don't forget to update the interface definition:

type IRPCClient interface {
	GetFullBlocks(ctx context.Context, blockNumbers []*big.Int) []GetFullBlockResult
	GetBlocks(ctx context.Context, blockNumbers []*big.Int) []GetBlocksResult
	GetTransactions(ctx context.Context, txHashes []string) []GetTransactionsResult
	GetLatestBlockNumber(ctx context.Context) (*big.Int, error)
	GetChainID() *big.Int
	GetURL() string
	GetBlocksPerRequest() BlocksPerRequestConfig
	IsWebsocket() bool
	SupportsTraceBlock() bool
-	HasCode(address string) (bool, error)
+	HasCode(ctx context.Context, address string) (bool, error)
	Close()
}
internal/orchestrator/committer.go (1)

164-180: 🛠️ Refactor suggestion

Consider passing context to the asynchronous publisher operation

While the method now correctly accepts a context parameter, the asynchronous call to c.publisher.PublishBlockData in the goroutine doesn't receive the context. This means that if the context is canceled, the publishing operation might still continue.

 go func() {
-	if err := c.publisher.PublishBlockData(blockData); err != nil {
+	if err := c.publisher.PublishBlockData(ctx, blockData); err != nil {
 		log.Error().Err(err).Msg("Failed to publish block data to kafka")
 	}
 }()

This would require updating the Publisher interface and implementation to accept and respect the context.

internal/orchestrator/reorg_handler.go (1)

267-298: 🛠️ Refactor suggestion

Consider passing context to the asynchronous publisher operation

While the method now correctly accepts a context parameter and passes it to the worker's Run method, the asynchronous call to rh.publisher.PublishReorg in the goroutine doesn't receive the context. This means that if the context is canceled, the publishing operation might still continue.

 go func() {
-	if err := rh.publisher.PublishReorg(deletedBlockData, data); err != nil {
+	if err := rh.publisher.PublishReorg(ctx, deletedBlockData, data); err != nil {
 		log.Error().Err(err).Msg("Failed to publish reorg data to kafka")
 	}
 }()

This would require updating the Publisher interface and implementation to accept and respect the context.

🧹 Nitpick comments (5)
internal/worker/worker.go (1)

107-126: Consider adding context cancellation for result collection

After launching all goroutines, there's no context cancellation check when collecting and processing results. If the context is cancelled after goroutines are started but before they complete, the function will still wait for all results.

go func() {
+   done := make(chan struct{})
    wg.Wait()
-   close(resultsCh)
+   done <- struct{}{}
}()

-results := make([]rpc.GetFullBlockResult, 0, blockCount)
-for batchResults := range resultsCh {
-   results = append(results, batchResults...)
-}
+results := make([]rpc.GetFullBlockResult, 0, blockCount)
+loop:
+for {
+   select {
+   case <-ctx.Done():
+       log.Debug().Msg("Context canceled while collecting results")
+       return nil
+   case <-done:
+       close(resultsCh)
+       // Drain the channel after closing
+       for batchResults := range resultsCh {
+           results = append(results, batchResults...)
+       }
+       break loop
+   case batchResults, ok := <-resultsCh:
+       if !ok {
+           break loop
+       }
+       results = append(results, batchResults...)
+   }
+}
internal/rpc/batcher.go (2)

19-51: Consider adding context cancellation check before launching goroutines

The function launches goroutines without checking if the context is already cancelled, which could lead to unnecessary goroutine creation and work.

func RPCFetchInBatches[K any, T any](rpc *Client, ctx context.Context, keys []K, batchSize int, batchDelay int, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
+   // Check if context is already done
+   select {
+   case <-ctx.Done():
+       return []RPCFetchBatchResult[K, T]{}
+   default:
+       // continue processing
+   }
+
    if len(keys) <= batchSize {
        return RPCFetchSingleBatch[K, T](rpc, ctx, keys, method, argsFunc)
    }
    chunks := common.SliceToChunks[K](keys, batchSize)

    log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(keys), len(chunks), batchSize)

    var wg sync.WaitGroup
    resultsCh := make(chan []RPCFetchBatchResult[K, T], len(chunks))

    for _, chunk := range chunks {
+       select {
+       case <-ctx.Done():
+           log.Debug().Msg("Context canceled, stopping batch fetching")
+           // Skip launching more goroutines
+           goto WaitAndCollect
+       default:
+           // continue processing
+       }
+
        wg.Add(1)
        go func(chunk []K) {
            defer wg.Done()
            resultsCh <- RPCFetchSingleBatch[K, T](rpc, ctx, chunk, method, argsFunc)
            if batchDelay > 0 {
                time.Sleep(time.Duration(batchDelay) * time.Millisecond)
            }
        }(chunk)
    }
+WaitAndCollect:
    go func() {
        wg.Wait()
        close(resultsCh)
    }()

    results := make([]RPCFetchBatchResult[K, T], 0, len(keys))
    for batchResults := range resultsCh {
        results = append(results, batchResults...)
    }

    return results
}

53-83: Add context cancellation check in RPCFetchSingleBatch

The function should check if the context is cancelled before making the RPC call to avoid unnecessary network operations.

func RPCFetchSingleBatch[K any, T any](rpc *Client, ctx context.Context, keys []K, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
+   // Check if context is already done
+   select {
+   case <-ctx.Done():
+       // Return error results for all keys
+       results := make([]RPCFetchBatchResult[K, T], len(keys))
+       for i, key := range keys {
+           results[i] = RPCFetchBatchResult[K, T]{Key: key, Error: ctx.Err()}
+       }
+       return results
+   default:
+       // continue processing
+   }
+
    batch := make([]gethRpc.BatchElem, len(keys))
    results := make([]RPCFetchBatchResult[K, T], len(keys))

    for i, key := range keys {
        results[i] = RPCFetchBatchResult[K, T]{Key: key}
        batch[i] = gethRpc.BatchElem{
            Method: method,
            Args:   argsFunc(key),
            Result: new(T),
        }
    }

    err := rpc.RPCClient.BatchCallContext(ctx, batch)
    if err != nil {
        for i := range results {
            results[i].Error = err
        }
        return results
    }

    for i, elem := range batch {
        if elem.Error != nil {
            results[i].Error = elem.Error
        } else {
            results[i].Result = *elem.Result.(*T)
        }
    }

    return results
}
internal/rpc/rpc.go (1)

211-219: Consider updating setChainID to accept a context parameter

The setChainID method is an internal method that uses context.Background(). For consistency with the context propagation pattern, consider accepting a context parameter.

-func (rpc *Client) setChainID() error {
-	chainID, err := rpc.EthClient.ChainID(context.Background())
+func (rpc *Client) setChainID(ctx context.Context) error {
+	chainID, err := rpc.EthClient.ChainID(ctx)
	if err != nil {
		return fmt.Errorf("failed to get chain ID: %v", err)
	}
	rpc.chainID = chainID
	config.Cfg.RPC.ChainID = chainID.String()
	return nil
}

And update the call in the Initialize function:

-	chainIdErr := rpc.setChainID()
+	chainIdErr := rpc.setChainID(context.Background())
internal/orchestrator/reorg_handler_test.go (1)

1-787: Add tests for context cancellation scenarios

While all method calls have been updated to include context parameters, there are no tests that verify the behavior when a context is cancelled. Consider adding tests that explicitly test context cancellation scenarios.

Here's an example test that could be added:

func TestReorgHandlerRespectsContextCancellation(t *testing.T) {
	mockRPC := mocks.NewMockIRPCClient(t)
	mockMainStorage := mocks.NewMockIMainStorage(t)
	mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)

	mockStorage := storage.IStorage{
		MainStorage:         mockMainStorage,
		OrchestratorStorage: mockOrchestratorStorage,
	}

	mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
	mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
	mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
	mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)

	// Create a context that will be cancelled
	ctx, cancel := context.WithCancel(context.Background())
	
	// Set up expectations for the first GetBlocks call
	mockRPC.EXPECT().GetBlocks(ctx, mock.Anything).Run(func(ctx context.Context, blockNumbers []*big.Int) {
		// Cancel the context before returning
		cancel()
	}).Return([]rpc.GetBlocksResult{})

	handler := NewReorgHandler(mockRPC, mockStorage)
	
	// The method should return early due to context cancellation
	_, err := handler.RunFromBlock(ctx, big.NewInt(99))
	
	assert.ErrorIs(t, err, context.Canceled)
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c8cf165 and 150d5b4.

📒 Files selected for processing (11)
  • internal/orchestrator/chain_tracker.go (1 hunks)
  • internal/orchestrator/committer.go (9 hunks)
  • internal/orchestrator/committer_test.go (9 hunks)
  • internal/orchestrator/failure_recoverer.go (1 hunks)
  • internal/orchestrator/poller.go (5 hunks)
  • internal/orchestrator/reorg_handler.go (8 hunks)
  • internal/orchestrator/reorg_handler_test.go (24 hunks)
  • internal/rpc/batcher.go (4 hunks)
  • internal/rpc/rpc.go (5 hunks)
  • internal/worker/worker.go (5 hunks)
  • test/mocks/MockIRPCClient.go (11 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (6)
internal/orchestrator/committer_test.go (1)
internal/rpc/rpc.go (1)
  • GetFullBlockResult (18-22)
internal/worker/worker.go (2)
internal/rpc/rpc.go (1)
  • GetFullBlockResult (18-22)
configs/config.go (1)
  • Cfg (170-170)
internal/orchestrator/reorg_handler_test.go (2)
internal/rpc/rpc.go (2)
  • GetBlocksResult (24-28)
  • GetFullBlockResult (18-22)
internal/common/block.go (1)
  • Block (8-33)
internal/rpc/batcher.go (1)
internal/rpc/rpc.go (1)
  • Client (56-65)
internal/orchestrator/committer.go (1)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
test/mocks/MockIRPCClient.go (1)
internal/rpc/rpc.go (3)
  • GetBlocksResult (24-28)
  • GetFullBlockResult (18-22)
  • GetTransactionsResult (30-33)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Graphite / mergeability_check
🔇 Additional comments (56)
internal/orchestrator/chain_tracker.go (1)

38-38: Good addition of context to RPC call

This change correctly passes the context to the GetLatestBlockNumber method, enabling proper cancellation and timeout handling during RPC operations. This aligns with the PR's goal of propagating context throughout the codebase.

internal/orchestrator/failure_recoverer.go (1)

78-78: Context properly propagated to worker

This change correctly passes the context to the worker's Run method, ensuring that cancellation signals are properly propagated from the failure recoverer to the worker. This is essential for graceful shutdown and resource management.

internal/orchestrator/committer_test.go (9)

257-257: Properly updated test with context parameter

This test now correctly includes the context parameter in the call to getSequentialBlockDataToCommit, maintaining consistency with the updated method signature.


293-293: Properly updated test with context parameter

This test now correctly includes the context parameter in the call to getSequentialBlockDataToCommit, maintaining consistency with the updated method signature.


323-323: Context properly passed to commit method in test

The test now correctly includes the context parameter in the commit method call, aligning with the method's updated signature.


346-346: Context propagated to RPC mock expectations

The mock expectation for GetFullBlocks now correctly includes the context parameter, ensuring that the test matches the updated interface contract.


355-355: Context properly passed to handleGap method in test

The test now correctly includes the context parameter in the handleGap method call, aligning with the method's updated signature.


466-466: Context properly passed to mock RPC expectations

The mock expectation for GetFullBlocks now correctly includes the context parameter, ensuring that the test matches the updated interface contract.


485-485: Properly updated test with context parameter

This test now correctly includes the context parameter in the call to getSequentialBlockDataToCommit, maintaining consistency with the updated method signature.


512-512: Context properly passed to mock RPC expectations

The mock expectation for GetFullBlocks now correctly includes the context parameter, ensuring that the test matches the updated interface contract.


529-529: Properly updated test with context parameter

This test now correctly includes the context parameter in the call to getSequentialBlockDataToCommit, maintaining consistency with the updated method signature.

internal/orchestrator/poller.go (6)

106-106: Context properly passed to getNextBlockRange method

This change correctly propagates the polling context to the getNextBlockRange method, ensuring that block range retrieval operations can be canceled when needed.


120-120: Context properly passed to Poll method

This change correctly propagates the polling context to the Poll method, ensuring that polling operations can be canceled when needed.


149-149: Method signature updated to accept context

The Poll method signature has been correctly updated to accept a context parameter, ensuring that context can be propagated through the polling flow.


164-164: Context properly passed to worker

This change correctly passes the context from the Poll method to the worker's Run method, ensuring that cancellation signals are properly propagated to the worker.


173-173: Method signature updated to accept context

The getNextBlockRange method signature has been correctly updated to accept a context parameter, allowing for context-aware block range retrieval.


174-174: Context properly passed to RPC call

This change correctly passes the context to the GetLatestBlockNumber method, enabling proper cancellation and timeout handling during RPC operations.

internal/worker/worker.go (8)

4-4: Add context package import to support cancellation

The addition of the context package import enables cooperative cancellation and deadline handling throughout the worker's operations.


27-32: Good implementation of context cancellation check

The early cancellation check at the beginning of processChunkWithRetry ensures we don't waste resources if the context is already cancelled before processing begins.


39-39: Propagate context to RPC calls

Correctly passing the context to the RPC client's GetFullBlocks method enables cancellation propagation down to the network layer.


71-71: Properly pass context to recursive calls

The context is correctly passed to recursive calls, ensuring cancellation can happen at any depth of retry attempts.


87-87: Context propagation in goroutines

Context is correctly passed to the parallel retry operations, allowing cancellation to be respected in both chunks when splitting failed blocks.

Also applies to: 92-92


98-98: Add context parameter to Run method

Adding the context parameter to the Run method allows the caller to control cancellation of the entire block processing operation.


108-114: Add context cancellation check in the main processing loop

This check ensures the worker stops processing new chunks if the context is cancelled, preventing unnecessary work.


119-119: Propagate context to chunk processing

Correctly passes the context to the chunk processing function, ensuring cancellation is respected in each processing goroutine.

internal/rpc/batcher.go (5)

19-19: Add context parameter to RPCFetchInBatches function

Adding a context parameter allows callers to control the lifecycle of batch operations, enabling cancellation propagation throughout the RPC layer.


21-21: Propagate context to single batch fetch

The context is correctly passed to the RPCFetchSingleBatch function when handling a small number of keys.


34-34: Pass context to batch operations in goroutines

The context is properly propagated to each batch operation running in a separate goroutine.


53-53: Add context parameter to RPCFetchSingleBatch function

This function now accepts a context parameter, allowing for cancellation control during the RPC batch call.


66-66: Use provided context for BatchCallContext

The function correctly uses the provided context when making the RPC batch call, allowing for cancellation during network operations.

internal/rpc/rpc.go (5)

42-53: Update IRPCClient interface to support context parameters

The interface has been properly updated to include context parameters in all relevant RPC methods, following Go best practices for context propagation.


221-261: Update GetFullBlocks method to accept and propagate context

The method now accepts a context parameter and passes it to all RPC calls, including multiple parallel goroutines, ensuring cancellation can be handled at all levels.


263-276: Update GetBlocks method to accept and propagate context

The method now accepts a context parameter and correctly passes it to the underlying RPC call.


278-291: Update GetTransactions method to accept and propagate context

The method now accepts a context parameter and correctly passes it to the underlying RPC call.


293-299: Update GetLatestBlockNumber method to use passed context

The method now accepts a context parameter and uses it when calling the Ethereum client's BlockNumber method.

internal/orchestrator/reorg_handler_test.go (6)

243-247: Update GetBlocks calls to include context parameter

All calls to the GetBlocks method have been correctly updated to include context.Background() as the first parameter, ensuring the tests match the updated method signature.

Also applies to: 277-281, 307-318, 357-362, 364-369, 410-420, 459-471, 600-607, 655-665


500-504: Update GetFullBlocks calls to include context parameter

All calls to the GetFullBlocks method have been correctly updated to include context.Background() as the first parameter, ensuring the tests match the updated method signature.

Also applies to: 609-611, 667-676, 731-737


250-250: Update findReorgedBlockNumbers calls to include context parameter

All calls to the findReorgedBlockNumbers method have been correctly updated to include context.Background() as the first parameter, ensuring the tests match the updated method signature.

Also applies to: 284-284, 333-333, 386-386, 435-435, 482-482


510-510: Update handleReorg call to include context parameter

The call to the handleReorg method has been correctly updated to include context.Background() as the first parameter, ensuring the test matches the updated method signature.


563-563: Update RunFromBlock calls to include context parameter

All calls to the RunFromBlock method have been correctly updated to include context.Background() as the first parameter, ensuring the tests match the updated method signature.

Also applies to: 618-618, 681-681, 744-744, 781-781, 818-818


539-539: Update to use context in Start method call

The call to the Start method has been updated to include context.Background(), ensuring proper context propagation for the reorg handler's continuous operation.

internal/orchestrator/committer.go (5)

54-66: Context propagation looks good!

The Start method now properly handles context cancellation in its select loop, and correctly passes the context to getSequentialBlockDataToCommit.


75-75: Context propagation looks good!

The context is correctly propagated to the commit method.


111-162: Context parameter correctly added to getSequentialBlockDataToCommit

The method now accepts a context parameter and properly propagates it to child method calls.


202-225: Context parameter correctly added to handleGap

The method now accepts a context parameter and properly propagates it to the Poll method call.


227-247: Context parameter correctly added to handleMissingStagingData

The method now accepts a context parameter and properly propagates it to the Poll method call.

internal/orchestrator/reorg_handler.go (4)

73-100: Context propagation looks good!

The Start method now properly handles context cancellation in its select loop, and correctly passes the context to RunFromBlock.

🧰 Tools
🪛 golangci-lint (1.64.8)

96-96: Error return value of rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber is not checked

(errcheck)


102-148: Context parameter correctly added to RunFromBlock

The method now accepts a context parameter and properly propagates it to child method calls.


193-225: Context properly propagated in findReorgedBlockNumbers

The method correctly passes the context to getNewBlocksByNumber and also to the recursive call to itself.


227-265: Context parameter correctly added to getNewBlocksByNumber

The method now accepts a context parameter and properly propagates it to the RPC GetBlocks call.

test/mocks/MockIRPCClient.go (6)

8-13: Context import added correctly

The context package is now properly imported.


61-79: Context parameter correctly added to GetBlocks mock

The mock method now accepts a context parameter, matching the updated interface.


87-108: Mock expectation helpers updated correctly

The helper methods for setting up expectations now handle the context parameter.


202-249: Context parameter correctly added to GetFullBlocks mock

The mock method and its helper methods now handle the context parameter.


251-307: Context parameter correctly added to GetLatestBlockNumber mock

The mock method and its helper methods now handle the context parameter.


309-356: Context parameter correctly added to GetTransactions mock

The mock method and its helper methods now handle the context parameter.

@iuwqyir iuwqyir force-pushed the 05-20-pass_context_to_rpc_and_worker branch from 150d5b4 to 3b3058e Compare May 21, 2025 07:11
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
internal/handlers/search_handlers.go (1)

61-93: 🛠️ Refactor suggestion

Consider propagating request context to downstream functions.

The Search function receives a Gin context but doesn't propagate the request context to downstream functions like executeSearch. This prevents proper cancellation propagation if the client disconnects.

func Search(c *gin.Context) {
+   ctx := c.Request.Context()
    
    chainId, err := api.GetChainId(c)
    if err != nil {
        api.BadRequestErrorHandler(c, err)
        return
    }
    searchInput := parseSearchInput(c.Param("input"))
    if searchInput.ErrorMessage != "" {
        api.BadRequestErrorHandler(c, fmt.Errorf(searchInput.ErrorMessage))
        return
    }

    mainStorage, err := getMainStorage()
    if err != nil {
        log.Error().Err(err).Msg("Error getting main storage")
        api.InternalErrorHandler(c)
        return
    }

-   result, err := executeSearch(mainStorage, chainId, searchInput)
+   result, err := executeSearch(ctx, mainStorage, chainId, searchInput)
    if err != nil {
        log.Error().Err(err).Msg("Error executing search")
        api.InternalErrorHandler(c)
        return
    }

    sendJSONResponse(c, api.QueryResponse{
        Meta: api.Meta{
            ChainId: chainId.Uint64(),
        },
        Data: result,
    })
}

Update other functions in the call chain accordingly to accept and propagate the context.

🧹 Nitpick comments (1)
internal/handlers/search_handlers.go (1)

333-374: Consider adding timeout to prevent long-running searches.

The searchByAddress function makes multiple storage and RPC calls without any timeout constraints. In line with the PR's goal of improving resource management, consider adding a timeout context to prevent potentially long-running searches from consuming resources indefinitely.

func searchByAddress(mainStorage storage.IMainStorage, chainId *big.Int, address string) (SearchResultModel, error) {
+   ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+   defer cancel()
    
    searchResult := SearchResultModel{Type: SearchResultTypeAddress}
-   contractCode, err := checkIfContractHasCode(chainId, address)
+   contractCode, err := checkIfContractHasCode(ctx, chainId, address)
    if err != nil {
        return searchResult, err
    }
    // ... rest of the function
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 150d5b4 and 3b3058e.

📒 Files selected for processing (12)
  • internal/handlers/search_handlers.go (2 hunks)
  • internal/orchestrator/chain_tracker.go (1 hunks)
  • internal/orchestrator/committer.go (9 hunks)
  • internal/orchestrator/committer_test.go (9 hunks)
  • internal/orchestrator/failure_recoverer.go (1 hunks)
  • internal/orchestrator/poller.go (5 hunks)
  • internal/orchestrator/reorg_handler.go (8 hunks)
  • internal/orchestrator/reorg_handler_test.go (24 hunks)
  • internal/rpc/batcher.go (4 hunks)
  • internal/rpc/rpc.go (7 hunks)
  • internal/worker/worker.go (5 hunks)
  • test/mocks/MockIRPCClient.go (14 hunks)
🚧 Files skipped from review as they are similar to previous changes (11)
  • internal/orchestrator/committer_test.go
  • internal/orchestrator/failure_recoverer.go
  • internal/orchestrator/chain_tracker.go
  • internal/orchestrator/poller.go
  • internal/worker/worker.go
  • internal/orchestrator/reorg_handler_test.go
  • internal/rpc/batcher.go
  • internal/orchestrator/reorg_handler.go
  • internal/rpc/rpc.go
  • internal/orchestrator/committer.go
  • test/mocks/MockIRPCClient.go
🔇 Additional comments (1)
internal/handlers/search_handlers.go (1)

4-4: Import context package for RPC client context support.

The addition of the context package is necessary to support the context parameters in RPC client method calls, which is part of the broader context propagation changes in this PR.

@iuwqyir iuwqyir force-pushed the 05-20-pass_context_to_rpc_and_worker branch from 3b3058e to dce6992 Compare May 21, 2025 07:19
@iuwqyir iuwqyir merged commit b32c54a into main May 21, 2025
6 checks passed
@iuwqyir iuwqyir deleted the 05-20-pass_context_to_rpc_and_worker branch May 21, 2025 09:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants