Skip to content

Commit e6012cf

Browse files
author
sicheng
committed
Range scan instead of point lookup for bruteforce
1 parent 13df491 commit e6012cf

File tree

3 files changed

+28
-18
lines changed

3 files changed

+28
-18
lines changed

rust/segment/src/blockfile_record.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -828,10 +828,10 @@ impl RecordSegmentReader<'_> {
828828
pub async fn get_data_stream<'me>(
829829
&'me self,
830830
offset_range: impl RangeBounds<u32> + Clone + Send + 'me,
831-
) -> impl Stream<Item = Result<DataRecord<'me>, Box<dyn ChromaError>>> + 'me {
831+
) -> impl Stream<Item = Result<(u32, DataRecord<'me>), Box<dyn ChromaError>>> + 'me {
832832
self.id_to_data
833833
.get_range_stream(""..="", offset_range)
834-
.map(|res| res.map(|(_, _, rec)| rec))
834+
.map(|res| res.map(|(_, offset, rec)| (offset, rec)))
835835
}
836836

837837
/// Get a stream of offset ids from the smallest to the largest in the given range

rust/worker/src/execution/operators/filter.rs

Lines changed: 25 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -232,32 +232,42 @@ impl<'me> MetadataProvider<'me> {
232232
record_segment_reader,
233233
) {
234234
let literal_expr = LiteralExpr::from(chroma_regex.hir().clone());
235-
let approximate_matching_offset_ids = match fti_reader
235+
let approximate_matching_offset_ids = fti_reader
236236
.match_literal_expression(&literal_expr)
237237
.await
238-
.map_err(MetadataIndexError::from)?
239-
{
240-
Some(ids) => ids,
241-
None => rec_reader
242-
.get_offset_stream(..)
243-
.try_collect::<Vec<_>>()
244-
.await
245-
.map(|ids| ids.into_iter().collect())?,
246-
};
238+
.map_err(MetadataIndexError::from)?;
247239
let is_exact_match = chroma_regex.properties().look_set().is_empty()
248240
&& fti_reader.can_match_exactly(&literal_expr);
249241
if is_exact_match {
250-
Ok(approximate_matching_offset_ids)
242+
Ok(approximate_matching_offset_ids
243+
.unwrap_or(rec_reader.get_offset_stream(..).try_collect().await?))
251244
} else {
252245
let regex = chroma_regex.regex()?;
253246
let mut exact_matching_offset_ids = RoaringBitmap::new();
254-
for id in approximate_matching_offset_ids {
255-
if let Some(rec) = rec_reader.get_data_for_offset_id(id).await? {
256-
if rec.document.is_some_and(|doc| regex.is_match(doc)) {
257-
exact_matching_offset_ids.insert(id);
247+
match approximate_matching_offset_ids {
248+
Some(offset_ids) => {
249+
for id in offset_ids {
250+
if rec_reader.get_data_for_offset_id(id).await?.is_some_and(
251+
|rec| rec.document.is_some_and(|doc| regex.is_match(doc)),
252+
) {
253+
exact_matching_offset_ids.insert(id);
254+
}
255+
}
256+
}
257+
None => {
258+
for (offset, record) in rec_reader
259+
.get_data_stream(..)
260+
.await
261+
.try_collect::<Vec<_>>()
262+
.await?
263+
{
264+
if record.document.is_some_and(|doc| regex.is_match(doc)) {
265+
exact_matching_offset_ids.insert(offset);
266+
}
258267
}
259268
}
260269
}
270+
261271
Ok(exact_matching_offset_ids)
262272
}
263273
} else {

rust/worker/src/execution/operators/source_record_segment.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ impl Operator<SourceRecordSegmentInput, SourceRecordSegmentOutput> for SourceRec
5555
.await
5656
.enumerate()
5757
.map(|(offset, res)| {
58-
res.map(|rec| LogRecord {
58+
res.map(|(_, rec)| LogRecord {
5959
// Log offset starts with 1
6060
log_offset: offset as i64 + 1,
6161
record: OperationRecord {

0 commit comments

Comments
 (0)