Skip to content

Commit 9f4504d

Browse files
authored
poll new blocks in the committer when in live mode (#226)
### TL;DR Added support for real-time block committing in addition to backfill mode. ### What changed? - Added a new `getBlockToCommitUntil` function that determines the end block for committing based on work mode - In real-time mode, the committer now checks the latest block from RPC and uses that as the upper bound if it's less than the calculated end block - Created a `fetchBlockDataToCommit` function that handles different data retrieval strategies based on work mode: - In backfill mode: retrieves data from staging storage - In real-time mode: uses a `BoundlessPoller` to fetch data directly without saving to staging first - Updated the `getBlockNumbersToCommit` function to accept a context parameter - Refactored `getSequentialBlockDataToCommit` to use the new functions ### How to test? 1. Run the application in real-time mode and verify it correctly commits blocks up to the latest block from RPC 2. Run the application in backfill mode and verify it commits blocks in batches of `blocksPerCommit` 3. Verify that when switching between modes, the committer correctly adjusts its behavior ### Why make this change? This change enables the committer to operate in two distinct modes: 1. Backfill mode: for historical data processing where blocks are first staged then committed 2. Real-time mode: for processing current blocks directly from the RPC without staging This dual-mode approach improves efficiency by allowing real-time processing to skip the staging step when immediate data is needed, while maintaining the ability to process historical data in batches. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Improved block data fetching logic to dynamically adjust commit ranges based on current mode and blockchain state. - **Bug Fixes** - Updated gap handling to skip unnecessary operations in live mode, reducing redundant processing. - **Tests** - Enhanced test coverage to reflect new context-based method signatures and work mode handling. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 918459c + abb607d commit 9f4504d

File tree

2 files changed

+80
-19
lines changed

2 files changed

+80
-19
lines changed

internal/orchestrator/committer.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (c *Committer) Start(ctx context.Context) {
105105
}
106106
}
107107

108-
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
108+
func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
109109
startTime := time.Now()
110110
defer func() {
111111
log.Debug().Str("metric", "get_block_numbers_to_commit_duration").Msgf("getBlockNumbersToCommit duration: %f", time.Since(startTime).Seconds())
@@ -129,7 +129,10 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
129129
}
130130

131131
startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1))
132-
endBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
132+
endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber)
133+
if err != nil {
134+
return nil, fmt.Errorf("error getting block to commit until: %v", err)
135+
}
133136

134137
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
135138
blockNumbers := make([]*big.Int, blockCount)
@@ -140,26 +143,64 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
140143
return blockNumbers, nil
141144
}
142145

146+
func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) {
147+
untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
148+
if c.workMode == WorkModeBackfill {
149+
return untilBlock, nil
150+
} else {
151+
// get latest block from RPC and if that's less than until block, return that
152+
latestBlock, err := c.rpc.GetLatestBlockNumber(ctx)
153+
if err != nil {
154+
return nil, fmt.Errorf("error getting latest block from RPC: %v", err)
155+
}
156+
if latestBlock.Cmp(untilBlock) < 0 {
157+
log.Debug().Msgf("Committing until latest block: %s", latestBlock.String())
158+
return latestBlock, nil
159+
}
160+
return untilBlock, nil
161+
}
162+
}
163+
164+
func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
165+
if c.workMode == WorkModeBackfill {
166+
startTime := time.Now()
167+
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()})
168+
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
169+
metrics.GetStagingDataDuration.Observe(time.Since(startTime).Seconds())
170+
171+
if err != nil {
172+
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
173+
}
174+
if len(blocksData) == 0 {
175+
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64())
176+
c.handleMissingStagingData(ctx, blockNumbers)
177+
return nil, nil
178+
}
179+
return blocksData, nil
180+
} else {
181+
poller := NewBoundlessPoller(c.rpc, c.storage)
182+
blocksData, err := poller.PollWithoutSaving(ctx, blockNumbers)
183+
if err != nil {
184+
return nil, fmt.Errorf("poller error: %v", err)
185+
}
186+
return blocksData, nil
187+
}
188+
}
189+
143190
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
144-
blocksToCommit, err := c.getBlockNumbersToCommit()
191+
blocksToCommit, err := c.getBlockNumbersToCommit(ctx)
145192
if err != nil {
146193
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
147194
}
148195
if len(blocksToCommit) == 0 {
149196
return nil, nil
150197
}
151198

152-
startTime := time.Now()
153-
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
154-
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
155-
metrics.GetStagingDataDuration.Observe(time.Since(startTime).Seconds())
156-
199+
blocksData, err := c.fetchBlockDataToCommit(ctx, blocksToCommit)
157200
if err != nil {
158-
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
201+
return nil, err
159202
}
160203
if len(blocksData) == 0 {
161-
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
162-
c.handleMissingStagingData(ctx, blocksToCommit)
163204
return nil, nil
164205
}
165206

@@ -250,6 +291,11 @@ func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big
250291
// record the first missed block number in prometheus
251292
metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64()))
252293

294+
if c.workMode == WorkModeLive {
295+
log.Debug().Msgf("Skipping gap handling in live mode. Expected block %s, actual first block %s", expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
296+
return nil
297+
}
298+
253299
poller := NewBoundlessPoller(c.rpc, c.storage)
254300

255301
missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()

internal/orchestrator/committer_test.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func TestNewCommitter(t *testing.T) {
2525
StagingStorage: mockStagingStorage,
2626
}
2727
committer := NewCommitter(mockRPC, mockStorage)
28+
committer.workMode = WorkModeBackfill
2829

2930
assert.NotNil(t, committer)
3031
assert.Equal(t, DEFAULT_COMMITTER_TRIGGER_INTERVAL, committer.triggerIntervalMs)
@@ -40,12 +41,13 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
4041
StagingStorage: mockStagingStorage,
4142
}
4243
committer := NewCommitter(mockRPC, mockStorage)
44+
committer.workMode = WorkModeBackfill
4345
chainID := big.NewInt(1)
4446

4547
mockRPC.EXPECT().GetChainID().Return(chainID)
4648
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
4749

48-
blockNumbers, err := committer.getBlockNumbersToCommit()
50+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
4951

5052
assert.NoError(t, err)
5153
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -63,12 +65,13 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) {
6365
StagingStorage: mockStagingStorage,
6466
}
6567
committer := NewCommitter(mockRPC, mockStorage)
68+
committer.workMode = WorkModeBackfill
6669
chainID := big.NewInt(1)
6770

6871
mockRPC.EXPECT().GetChainID().Return(chainID)
6972
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
7073

71-
blockNumbers, err := committer.getBlockNumbersToCommit()
74+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
7275

7376
assert.NoError(t, err)
7477
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -89,12 +92,13 @@ func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) {
8992
StagingStorage: mockStagingStorage,
9093
}
9194
committer := NewCommitter(mockRPC, mockStorage)
95+
committer.workMode = WorkModeBackfill
9296
chainID := big.NewInt(1)
9397

9498
mockRPC.EXPECT().GetChainID().Return(chainID)
9599
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
96100

97-
blockNumbers, err := committer.getBlockNumbersToCommit()
101+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
98102

99103
assert.NoError(t, err)
100104
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -115,12 +119,13 @@ func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) {
115119
StagingStorage: mockStagingStorage,
116120
}
117121
committer := NewCommitter(mockRPC, mockStorage)
122+
committer.workMode = WorkModeBackfill
118123
chainID := big.NewInt(1)
119124

120125
mockRPC.EXPECT().GetChainID().Return(chainID)
121126
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
122127

123-
blockNumbers, err := committer.getBlockNumbersToCommit()
128+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
124129

125130
assert.NoError(t, err)
126131
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -138,12 +143,13 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) {
138143
StagingStorage: mockStagingStorage,
139144
}
140145
committer := NewCommitter(mockRPC, mockStorage)
146+
committer.workMode = WorkModeBackfill
141147
chainID := big.NewInt(1)
142148

143149
mockRPC.EXPECT().GetChainID().Return(chainID)
144150
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
145151

146-
blockNumbers, err := committer.getBlockNumbersToCommit()
152+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
147153

148154
assert.NoError(t, err)
149155
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -164,12 +170,13 @@ func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) {
164170
StagingStorage: mockStagingStorage,
165171
}
166172
committer := NewCommitter(mockRPC, mockStorage)
173+
committer.workMode = WorkModeBackfill
167174
chainID := big.NewInt(1)
168175

169176
mockRPC.EXPECT().GetChainID().Return(chainID)
170177
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
171178

172-
blockNumbers, err := committer.getBlockNumbersToCommit()
179+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
173180

174181
assert.NoError(t, err)
175182
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -190,12 +197,13 @@ func TestGetBlockNumbersToCommitWithStoredLowerThanInMemory(t *testing.T) {
190197
StagingStorage: mockStagingStorage,
191198
}
192199
committer := NewCommitter(mockRPC, mockStorage)
200+
committer.workMode = WorkModeBackfill
193201
chainID := big.NewInt(1)
194202

195203
mockRPC.EXPECT().GetChainID().Return(chainID)
196204
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(99), nil)
197205

198-
blockNumbers, err := committer.getBlockNumbersToCommit()
206+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
199207

200208
assert.NoError(t, err)
201209
assert.Equal(t, 0, len(blockNumbers))
@@ -214,12 +222,13 @@ func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) {
214222
StagingStorage: mockStagingStorage,
215223
}
216224
committer := NewCommitter(mockRPC, mockStorage)
225+
committer.workMode = WorkModeBackfill
217226
chainID := big.NewInt(1)
218227

219228
mockRPC.EXPECT().GetChainID().Return(chainID)
220229
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
221230

222-
blockNumbers, err := committer.getBlockNumbersToCommit()
231+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
223232

224233
assert.NoError(t, err)
225234
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -239,6 +248,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
239248
StagingStorage: mockStagingStorage,
240249
}
241250
committer := NewCommitter(mockRPC, mockStorage)
251+
committer.workMode = WorkModeBackfill
242252
chainID := big.NewInt(1)
243253

244254
mockRPC.EXPECT().GetChainID().Return(chainID)
@@ -273,6 +283,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
273283
StagingStorage: mockStagingStorage,
274284
}
275285
committer := NewCommitter(mockRPC, mockStorage)
286+
committer.workMode = WorkModeBackfill
276287
chainID := big.NewInt(1)
277288

278289
mockRPC.EXPECT().GetChainID().Return(chainID)
@@ -311,6 +322,7 @@ func TestCommit(t *testing.T) {
311322
OrchestratorStorage: mockOrchestratorStorage,
312323
}
313324
committer := NewCommitter(mockRPC, mockStorage)
325+
committer.workMode = WorkModeBackfill
314326

315327
blockData := []common.BlockData{
316328
{Block: common.Block{Number: big.NewInt(101)}},
@@ -336,6 +348,7 @@ func TestHandleGap(t *testing.T) {
336348
OrchestratorStorage: mockOrchestratorStorage,
337349
}
338350
committer := NewCommitter(mockRPC, mockStorage)
351+
committer.workMode = WorkModeBackfill
339352

340353
expectedStartBlockNumber := big.NewInt(100)
341354
actualFirstBlock := common.Block{Number: big.NewInt(105)}
@@ -459,6 +472,7 @@ func TestHandleMissingStagingData(t *testing.T) {
459472
}
460473

461474
committer := NewCommitter(mockRPC, mockStorage)
475+
committer.workMode = WorkModeBackfill
462476

463477
chainID := big.NewInt(1)
464478
mockRPC.EXPECT().GetChainID().Return(chainID)
@@ -505,6 +519,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
505519
}
506520

507521
committer := NewCommitter(mockRPC, mockStorage)
522+
committer.workMode = WorkModeBackfill
508523

509524
chainID := big.NewInt(1)
510525
mockRPC.EXPECT().GetChainID().Return(chainID)

0 commit comments

Comments
 (0)