limit concurrent rpc requests in worker #206
Walkthrough
Concurrency control for RPC requests was introduced in the worker by adding a semaphore mechanism.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Run
    participant processChunkWithRetry
    participant RPC
    Run->>processChunkWithRetry: Call with chunk, resultsCh, sem
    processChunkWithRetry->>sem: Acquire semaphore slot
    processChunkWithRetry->>RPC: GetFullBlocks RPC call
    RPC-->>processChunkWithRetry: Return results
    processChunkWithRetry->>sem: Release semaphore slot
    processChunkWithRetry-->>Run: Send results
    Note over Run: Delay (BatchDelay) between chunks after the first
```
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/worker/worker.go (1)
104-106: Consider making the semaphore limit configurable.

The semaphore size of 20 is currently hardcoded. Since this is an important tuning parameter that might need adjustment based on environment or load patterns, consider making it configurable through your configuration system.
```diff
-	// Create a semaphore channel to limit concurrent goroutines
-	sem := make(chan struct{}, 20)
+	// Create a semaphore channel to limit concurrent goroutines
+	concurrencyLimit := 20
+	if config.Cfg.RPC.Blocks.MaxConcurrentRequests > 0 {
+		concurrencyLimit = config.Cfg.RPC.Blocks.MaxConcurrentRequests
+	}
+	sem := make(chan struct{}, concurrencyLimit)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/worker/worker.go (6 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal/worker/worker.go (2)
internal/rpc/rpc.go (1)
GetFullBlockResult (18-22)
configs/config.go (1)
Cfg (170-170)
🔇 Additional comments (6)
internal/worker/worker.go (6)
27-27: Function signature updated correctly to support concurrency control.

The addition of the semaphore parameter to processChunkWithRetry is appropriate and consistent with the PR objectives to limit concurrent RPC requests.
34-37: Efficient semaphore usage.

Good implementation of the semaphore acquisition and release. By limiting the semaphore to only the RPC request portion, you're maximizing throughput while still controlling concurrency. The clear comments also help explain the intention.
57-57: Useful debug logging added.

This log message provides valuable information for monitoring and troubleshooting the chunk processing. It will help identify potential issues with specific blocks or RPC endpoints.
70-70: Semaphore correctly passed to recursive calls.

Ensuring the semaphore is passed to recursive calls is crucial for maintaining the concurrency limits.
86-86: Semaphore consistently passed to all recursive paths.

Good attention to detail ensuring all recursive calls include the semaphore, which maintains the concurrency control throughout the entire processing lifecycle.

Also applies to: 91-91
109-112: Improved batch processing timing.

Moving the delay between initial dispatches of chunks rather than after processing each chunk provides more predictable pacing of RPC requests. Skipping the delay for the first chunk makes sense to start processing immediately.
TL;DR
Implemented a semaphore to limit concurrent RPC requests and improved batch processing timing.
What changed?
processChunkWithRetry function to accept and use the semaphore

How to test?
Why make this change?
The previous implementation could potentially create too many concurrent RPC requests during retry scenarios, which might overwhelm the RPC endpoint or the local system. By implementing a semaphore, we can control the maximum number of concurrent requests regardless of how many chunks are being processed or retried. Additionally, the improved timing of batch delays provides more predictable pacing of initial requests.