Skip to content

Commit c3ca0b9

Browse files
committed
poll new blocks in the committer when in live mode
1 parent 4450a45 commit c3ca0b9

File tree

2 files changed

+62
-19
lines changed

2 files changed

+62
-19
lines changed

internal/orchestrator/committer.go

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (c *Committer) Start(ctx context.Context) {
105105
}
106106
}
107107

108-
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
108+
func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
109109
startTime := time.Now()
110110
defer func() {
111111
log.Debug().Str("metric", "get_block_numbers_to_commit_duration").Msgf("getBlockNumbersToCommit duration: %f", time.Since(startTime).Seconds())
@@ -129,7 +129,10 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
129129
}
130130

131131
startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1))
132-
endBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
132+
endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber)
133+
if err != nil {
134+
return nil, fmt.Errorf("error getting block to commit until: %v", err)
135+
}
133136

134137
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
135138
blockNumbers := make([]*big.Int, blockCount)
@@ -140,26 +143,61 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
140143
return blockNumbers, nil
141144
}
142145

146+
func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) {
147+
untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
148+
if c.workMode == WorkModeBackfill {
149+
return untilBlock, nil
150+
} else {
151+
// get latest block from RPC and if that's less than until block, return that
152+
latestBlock, err := c.rpc.GetLatestBlockNumber(ctx)
153+
if err != nil {
154+
return nil, fmt.Errorf("error getting latest block from RPC: %v", err)
155+
}
156+
if latestBlock.Cmp(untilBlock) < 0 {
157+
log.Debug().Msgf("Committing until latest block: %s", latestBlock.String())
158+
return latestBlock, nil
159+
}
160+
return untilBlock, nil
161+
}
162+
}
163+
164+
func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
165+
if c.workMode == WorkModeBackfill {
166+
startTime := time.Now()
167+
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()})
168+
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
169+
metrics.GetStagingDataDuration.Observe(time.Since(startTime).Seconds())
170+
171+
if err != nil {
172+
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
173+
}
174+
if len(blocksData) == 0 {
175+
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64())
176+
c.handleMissingStagingData(ctx, blockNumbers)
177+
return nil, nil
178+
}
179+
return blocksData, nil
180+
} else {
181+
poller := NewBoundlessPoller(c.rpc, c.storage)
182+
blocksData, _ := poller.PollWithoutSaving(ctx, blockNumbers)
183+
return blocksData, nil
184+
}
185+
}
186+
143187
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
144-
blocksToCommit, err := c.getBlockNumbersToCommit()
188+
blocksToCommit, err := c.getBlockNumbersToCommit(ctx)
145189
if err != nil {
146190
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
147191
}
148192
if len(blocksToCommit) == 0 {
149193
return nil, nil
150194
}
151195

152-
startTime := time.Now()
153-
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
154-
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
155-
metrics.GetStagingDataDuration.Observe(time.Since(startTime).Seconds())
156-
196+
blocksData, err := c.fetchBlockDataToCommit(ctx, blocksToCommit)
157197
if err != nil {
158-
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
198+
return nil, err
159199
}
160200
if len(blocksData) == 0 {
161-
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
162-
c.handleMissingStagingData(ctx, blocksToCommit)
163201
return nil, nil
164202
}
165203

@@ -250,6 +288,11 @@ func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big
250288
// record the first missed block number in prometheus
251289
metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64()))
252290

291+
if c.workMode == WorkModeLive {
292+
log.Debug().Msgf("Skipping gap handling in live mode. Expected block %s, actual first block %s", expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
293+
return nil
294+
}
295+
253296
poller := NewBoundlessPoller(c.rpc, c.storage)
254297

255298
missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()

internal/orchestrator/committer_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
4545
mockRPC.EXPECT().GetChainID().Return(chainID)
4646
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
4747

48-
blockNumbers, err := committer.getBlockNumbersToCommit()
48+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
4949

5050
assert.NoError(t, err)
5151
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -68,7 +68,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) {
6868
mockRPC.EXPECT().GetChainID().Return(chainID)
6969
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
7070

71-
blockNumbers, err := committer.getBlockNumbersToCommit()
71+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
7272

7373
assert.NoError(t, err)
7474
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -94,7 +94,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) {
9494
mockRPC.EXPECT().GetChainID().Return(chainID)
9595
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
9696

97-
blockNumbers, err := committer.getBlockNumbersToCommit()
97+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
9898

9999
assert.NoError(t, err)
100100
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -120,7 +120,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) {
120120
mockRPC.EXPECT().GetChainID().Return(chainID)
121121
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
122122

123-
blockNumbers, err := committer.getBlockNumbersToCommit()
123+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
124124

125125
assert.NoError(t, err)
126126
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -143,7 +143,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) {
143143
mockRPC.EXPECT().GetChainID().Return(chainID)
144144
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
145145

146-
blockNumbers, err := committer.getBlockNumbersToCommit()
146+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
147147

148148
assert.NoError(t, err)
149149
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -169,7 +169,7 @@ func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) {
169169
mockRPC.EXPECT().GetChainID().Return(chainID)
170170
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
171171

172-
blockNumbers, err := committer.getBlockNumbersToCommit()
172+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
173173

174174
assert.NoError(t, err)
175175
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
@@ -195,7 +195,7 @@ func TestGetBlockNumbersToCommitWithStoredLowerThanInMemory(t *testing.T) {
195195
mockRPC.EXPECT().GetChainID().Return(chainID)
196196
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(99), nil)
197197

198-
blockNumbers, err := committer.getBlockNumbersToCommit()
198+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
199199

200200
assert.NoError(t, err)
201201
assert.Equal(t, 0, len(blockNumbers))
@@ -219,7 +219,7 @@ func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) {
219219
mockRPC.EXPECT().GetChainID().Return(chainID)
220220
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
221221

222-
blockNumbers, err := committer.getBlockNumbersToCommit()
222+
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
223223

224224
assert.NoError(t, err)
225225
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))

0 commit comments

Comments
 (0)