Commit 31b9604

[ENH]: Prefetch posting list in query + compactor (#4257)
## Description of changes

*Summarize the changes made by this PR.*

- Improvements & Bug fixes
  - ...
- New functionality
  - Prefetches posting lists at the beginning of query and compaction. Also tracks which sparse index IDs were already prefetched so they are not prefetched again. This state is not persistent across restarts and has a TTL.

## Test plan

*How are these changes tested?*

- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes

None
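For context, the dedup state described above amounts to an in-memory map from sparse index ID to an expiry timestamp. Below is a minimal sketch of the gating check, condensed from the `RootManager::should_prefetch` change in the diff; in the PR the map lives on `RootManager` behind a `parking_lot::Mutex`, while this standalone version takes it as a plain `&mut HashMap`:

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use uuid::Uuid;

const PREFETCH_TTL_HOURS: u64 = 8;

/// Returns true when `id` should be prefetched: either it has never been
/// seen or its previous entry expired. Whenever it returns true, a fresh
/// expiry is recorded, so repeat calls within the TTL return false.
fn should_prefetch(seen: &mut HashMap<Uuid, Duration>, id: &Uuid) -> bool {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Do not deploy before UNIX epoch");
    match seen.get(id) {
        Some(expires_at) if now < *expires_at => false,
        _ => {
            seen.insert(*id, now + Duration::from_secs(PREFETCH_TTL_HOURS * 3600));
            true
        }
    }
}
```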
1 parent 8ec8ce5 commit 31b9604

File tree

8 files changed: +152 -36 lines changed

rust/blockstore/src/arrow/provider.rs

Lines changed: 56 additions & 2 deletions

```diff
@@ -21,7 +21,11 @@ use chroma_storage::{
     admissioncontrolleds3::StorageRequestPriority, GetOptions, PutOptions, Storage,
 };
 use futures::{stream::FuturesUnordered, StreamExt};
-use std::sync::Arc;
+use std::{
+    collections::HashMap,
+    sync::Arc,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
 use thiserror::Error;
 use tracing::{Instrument, Span};
 use uuid::Uuid;
@@ -43,6 +47,8 @@ impl ChromaError for ArrowBlockfileProviderPrefetchError {
     }
 }
 
+const PREFETCH_TTL_HOURS: u64 = 8;
+
 /// A BlockFileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production).
 /// For now, it keeps a simple local cache of blockfiles.
 #[derive(Clone)]
@@ -83,6 +89,9 @@ impl ArrowBlockfileProvider {
     }
 
     pub async fn prefetch(&self, id: &Uuid) -> Result<usize, ArrowBlockfileProviderPrefetchError> {
+        if !self.root_manager.should_prefetch(id) {
+            return Ok(0);
+        }
         // We call .get_all_block_ids() here instead of just reading the root because reading the root requires a concrete Key type.
         let block_ids = self
             .root_manager
@@ -99,10 +108,13 @@ impl ArrowBlockfileProvider {
         }
         let count = futures.len();
 
+        tracing::info!("Prefetching {} blocks for blockfile ID: {:?}", count, id);
+
         while let Some(result) = futures.next().await {
             result?;
         }
 
+        tracing::info!("Prefetched {} blocks for blockfile ID: {:?}", count, id);
         Ok(count)
     }
 
@@ -175,6 +187,7 @@ impl ArrowBlockfileProvider {
     pub async fn clear(&self) -> Result<(), CacheError> {
         self.block_manager.block_cache.clear().await?;
         self.root_manager.cache.clear().await?;
+        self.root_manager.prefetched_roots.lock().clear();
         Ok(())
     }
 }
@@ -466,12 +479,18 @@ impl ChromaError for RootManagerError {
 pub struct RootManager {
     cache: Arc<dyn PersistentCache<Uuid, RootReader>>,
     storage: Storage,
+    // Sparse indexes that have already been prefetched and don't need to be prefetched again.
+    prefetched_roots: Arc<parking_lot::Mutex<HashMap<Uuid, Duration>>>,
 }
 
 impl RootManager {
     pub fn new(storage: Storage, cache: Box<dyn PersistentCache<Uuid, RootReader>>) -> Self {
         let cache: Arc<dyn PersistentCache<Uuid, RootReader>> = cache.into();
-        Self { cache, storage }
+        Self {
+            cache,
+            storage,
+            prefetched_roots: Arc::new(parking_lot::Mutex::new(HashMap::new())),
+        }
     }
 
     pub async fn get<'new, K: ArrowReadableKey<'new> + 'new>(
@@ -580,6 +599,41 @@ impl RootManager {
         // This will be replaced with a full prefix-based storage shortly
         format!("sparse_index/{}", id)
     }
+
+    fn should_prefetch(&self, id: &Uuid) -> bool {
+        let mut lock_guard = self.prefetched_roots.lock();
+        let expires_at = lock_guard.get(id);
+        match expires_at {
+            Some(expires_at) => {
+                if SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .expect("Do not deploy before UNIX epoch")
+                    < *expires_at
+                {
+                    false
+                } else {
+                    lock_guard.insert(
+                        *id,
+                        SystemTime::now()
+                            .duration_since(UNIX_EPOCH)
+                            .expect("Do not deploy before UNIX epoch")
+                            + std::time::Duration::from_secs(PREFETCH_TTL_HOURS * 3600),
+                    );
+                    true
+                }
+            }
+            None => {
+                lock_guard.insert(
+                    *id,
+                    SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .expect("Do not deploy before UNIX epoch")
+                        + std::time::Duration::from_secs(PREFETCH_TTL_HOURS * 3600),
+                );
+                true
+            }
+        }
+    }
 }
 
 #[cfg(test)]
```
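With this gating in place, repeated prefetches of the same root become cheap no-ops until the TTL lapses, and `clear()` resets the map along with the caches. A hedged usage sketch, assuming an `ArrowBlockfileProvider` value `provider` and a root `id` already in scope:

```rust
// First call: should_prefetch returns true, so every block referenced by
// the root is fetched and the block count is returned.
let fetched = provider.prefetch(&id).await?;

// Second call within PREFETCH_TTL_HOURS: should_prefetch returns false and
// prefetch short-circuits with Ok(0) without touching storage.
assert_eq!(provider.prefetch(&id).await?, 0);
```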

rust/segment/src/blockfile_metadata.rs

Lines changed: 5 additions & 6 deletions

```diff
@@ -12,6 +12,11 @@ use chroma_index::metadata::types::{
     MetadataIndexError, MetadataIndexFlusher, MetadataIndexReader, MetadataIndexWriter,
 };
 use chroma_types::SegmentType;
+use chroma_types::BOOL_METADATA;
+use chroma_types::F32_METADATA;
+use chroma_types::FULL_TEXT_PLS;
+use chroma_types::STRING_METADATA;
+use chroma_types::U32_METADATA;
 use chroma_types::{MaterializedLogOperation, MetadataValue, Segment, SegmentUuid};
 use core::panic;
 use roaring::RoaringBitmap;
@@ -21,12 +26,6 @@ use tantivy::tokenizer::NgramTokenizer;
 use thiserror::Error;
 use uuid::Uuid;
 
-const FULL_TEXT_PLS: &str = "full_text_pls";
-const STRING_METADATA: &str = "string_metadata";
-const BOOL_METADATA: &str = "bool_metadata";
-const F32_METADATA: &str = "f32_metadata";
-const U32_METADATA: &str = "u32_metadata";
-
 #[derive(Clone)]
 pub struct MetadataSegmentWriter<'me> {
     pub(crate) full_text_index_writer: Option<FullTextIndexWriter>,
```

rust/segment/src/blockfile_record.rs

Lines changed: 4 additions & 6 deletions

```diff
@@ -6,7 +6,10 @@ use chroma_blockstore::{
 };
 use chroma_error::{ChromaError, ErrorCodes};
 use chroma_index::fulltext::types::FullTextIndexError;
-use chroma_types::{DataRecord, MaterializedLogOperation, Segment, SegmentType, SegmentUuid};
+use chroma_types::{
+    DataRecord, MaterializedLogOperation, Segment, SegmentType, SegmentUuid, MAX_OFFSET_ID,
+    OFFSET_ID_TO_DATA, OFFSET_ID_TO_USER_ID, USER_ID_TO_OFFSET_ID,
+};
 use futures::{Stream, StreamExt, TryStreamExt};
 use std::collections::HashMap;
 use std::fmt::{self, Debug, Formatter};
@@ -16,11 +19,6 @@ use std::sync::Arc;
 use thiserror::Error;
 use uuid::Uuid;
 
-const USER_ID_TO_OFFSET_ID: &str = "user_id_to_offset_id";
-const OFFSET_ID_TO_USER_ID: &str = "offset_id_to_user_id";
-const OFFSET_ID_TO_DATA: &str = "offset_id_to_data";
-const MAX_OFFSET_ID: &str = "max_offset_id";
-
 #[derive(Clone)]
 pub struct RecordSegmentWriter {
     // These are Option<> so that we can take() them when we commit
```

rust/segment/src/distributed_spann.rs

Lines changed: 4 additions & 5 deletions

```diff
@@ -18,18 +18,17 @@ use chroma_index::{hnsw_provider::HnswIndexProvider, spann::types::SpannIndexWri
 use chroma_types::Collection;
 use chroma_types::InternalSpannConfiguration;
 use chroma_types::SegmentUuid;
+use chroma_types::HNSW_PATH;
+use chroma_types::MAX_HEAD_ID_BF_PATH;
+use chroma_types::POSTING_LIST_PATH;
+use chroma_types::VERSION_MAP_PATH;
 use chroma_types::{MaterializedLogOperation, Segment, SegmentScope, SegmentType};
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use thiserror::Error;
 use uuid::Uuid;
 
-const HNSW_PATH: &str = "hnsw_path";
-const VERSION_MAP_PATH: &str = "version_map_path";
-const POSTING_LIST_PATH: &str = "posting_list_path";
-const MAX_HEAD_ID_BF_PATH: &str = "max_head_id_path";
-
 #[derive(Clone)]
 pub struct SpannSegmentWriter {
     index: SpannIndexWriter,
```

rust/types/src/segment.rs

Lines changed: 43 additions & 0 deletions

```diff
@@ -9,6 +9,22 @@ use thiserror::Error;
 use tonic::Status;
 use uuid::Uuid;
 
+pub const USER_ID_TO_OFFSET_ID: &str = "user_id_to_offset_id";
+pub const OFFSET_ID_TO_USER_ID: &str = "offset_id_to_user_id";
+pub const OFFSET_ID_TO_DATA: &str = "offset_id_to_data";
+pub const MAX_OFFSET_ID: &str = "max_offset_id";
+
+pub const FULL_TEXT_PLS: &str = "full_text_pls";
+pub const STRING_METADATA: &str = "string_metadata";
+pub const BOOL_METADATA: &str = "bool_metadata";
+pub const F32_METADATA: &str = "f32_metadata";
+pub const U32_METADATA: &str = "u32_metadata";
+
+pub const HNSW_PATH: &str = "hnsw_path";
+pub const VERSION_MAP_PATH: &str = "version_map_path";
+pub const POSTING_LIST_PATH: &str = "posting_list_path";
+pub const MAX_HEAD_ID_BF_PATH: &str = "max_head_id_path";
+
 /// SegmentUuid is a wrapper around Uuid to provide a type for the segment id.
 #[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub struct SegmentUuid(pub Uuid);
@@ -94,6 +110,33 @@ pub struct Segment {
     pub file_path: HashMap<String, Vec<String>>,
 }
 
+impl Segment {
+    pub fn prefetch_supported(&self) -> bool {
+        matches!(
+            self.r#type,
+            SegmentType::BlockfileMetadata | SegmentType::BlockfileRecord | SegmentType::Spann
+        )
+    }
+
+    pub fn filepaths_to_prefetch(&self) -> Vec<String> {
+        let mut res = Vec::new();
+        match self.r#type {
+            SegmentType::Spann => {
+                if let Some(pl_path) = self.file_path.get(POSTING_LIST_PATH) {
+                    res.extend(pl_path.iter().cloned());
+                }
+            }
+            SegmentType::BlockfileMetadata | SegmentType::BlockfileRecord => {
+                for paths in self.file_path.values() {
+                    res.extend(paths.iter().cloned());
+                }
+            }
+            _ => {}
+        }
+        res
+    }
+}
+
 #[derive(Error, Debug)]
 pub enum SegmentConversionError {
     #[error("Invalid UUID")]
```
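These helpers centralize the prefetch policy per segment type: SPANN segments expose only their posting list blockfiles (the `POSTING_LIST_PATH` entries), while metadata and record segments expose every file path. A hedged usage sketch; the `segment` value is assumed to be obtained elsewhere:

```rust
// `segment` is a chroma_types::Segment fetched elsewhere (e.g. a test fixture).
if segment.prefetch_supported() {
    for blockfile_id in segment.filepaths_to_prefetch() {
        // For SegmentType::Spann this yields only the posting list paths;
        // for BlockfileMetadata and BlockfileRecord it yields every path.
        println!("would prefetch blockfile {blockfile_id}");
    }
}
```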

rust/worker/src/execution/operators/prefetch_segment.rs

Lines changed: 4 additions & 13 deletions

```diff
@@ -67,27 +67,18 @@ impl Operator<PrefetchSegmentInput, PrefetchSegmentOutput> for PrefetchSegmentOp
         &self,
         input: &PrefetchSegmentInput,
     ) -> Result<PrefetchSegmentOutput, PrefetchSegmentError> {
-        if input.segment.r#type != SegmentType::BlockfileMetadata
-            && input.segment.r#type != SegmentType::BlockfileRecord
-        {
+        if !input.segment.prefetch_supported() {
             return Err(PrefetchSegmentError::UnsupportedSegmentType(
                 input.segment.r#type,
             ));
         }
 
-        tracing::info!(
-            "Prefetching segment: {:?} ({:?})",
-            input.segment.r#type,
-            input.segment.id,
-        );
-
         let mut futures = input
             .segment
-            .file_path
-            .values()
-            .flatten()
+            .filepaths_to_prefetch()
+            .into_iter()
             .map(|blockfile_id| async move {
-                let blockfile_id = Uuid::parse_str(blockfile_id)?;
+                let blockfile_id = Uuid::parse_str(&blockfile_id)?;
                 let count = input.blockfile_provider.prefetch(&blockfile_id).await?;
                 Ok::<_, PrefetchSegmentError>(count)
             })
```
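The operator keeps its original fan-out shape: each blockfile ID becomes a future, and `FuturesUnordered` drains them concurrently in completion order. A self-contained sketch of that pattern with stand-in work in place of the real `prefetch` calls (assumes the `tokio` and `futures` crates):

```rust
use futures::{stream::FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let blockfile_ids = vec!["id-a", "id-b", "id-c"];

    // Map each id to an async block; FuturesUnordered polls them all at once.
    let mut futures: FuturesUnordered<_> = blockfile_ids
        .into_iter()
        .map(|id| async move {
            // Stand-in for input.blockfile_provider.prefetch(&blockfile_id).await
            id.len()
        })
        .collect();

    // Results arrive in completion order, not submission order.
    let mut total = 0;
    while let Some(count) = futures.next().await {
        total += count;
    }
    println!("prefetched {total} blocks");
}
```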

rust/worker/src/execution/orchestration/compact.rs

Lines changed: 10 additions & 4 deletions

```diff
@@ -644,14 +644,14 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
             Some(writer) => writer,
             None => return,
         };
-        let vector_writer = match vector_segment.r#type {
+        let (vector_writer, is_vector_segment_spann) = match vector_segment.r#type {
             SegmentType::Spann => match self.ok_or_terminate(
                 self.spann_provider
                     .write(&collection, &vector_segment, dimension)
                     .await,
                 ctx,
             ) {
-                Some(writer) => VectorSegmentWriter::Spann(writer),
+                Some(writer) => (VectorSegmentWriter::Spann(writer), true),
                 None => return,
             },
             _ => match self.ok_or_terminate(
@@ -665,7 +665,7 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
                     .map_err(|err| *err),
                 ctx,
             ) {
-                Some(writer) => VectorSegmentWriter::Hnsw(writer),
+                Some(writer) => (VectorSegmentWriter::Hnsw(writer), false),
                 None => return,
             },
         };
@@ -690,7 +690,13 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
         // Prefetch segments
         let prefetch_segments = match self.rebuild {
             true => vec![output.record_segment],
-            false => vec![output.metadata_segment, output.record_segment],
+            false => {
+                let mut segments = vec![output.metadata_segment, output.record_segment];
+                if is_vector_segment_spann {
+                    segments.push(output.vector_segment);
+                }
+                segments
+            }
         };
         for segment in prefetch_segments {
             let prefetch_task = wrap(
```

rust/worker/src/execution/orchestration/spann_knn.rs

Lines changed: 26 additions & 0 deletions

```diff
@@ -17,6 +17,9 @@ use crate::execution::operators::{
     prefetch_record::{
         PrefetchRecordError, PrefetchRecordInput, PrefetchRecordOperator, PrefetchRecordOutput,
     },
+    prefetch_segment::{
+        PrefetchSegmentError, PrefetchSegmentInput, PrefetchSegmentOperator, PrefetchSegmentOutput,
+    },
     spann_bf_pl::{SpannBfPlError, SpannBfPlInput, SpannBfPlOperator, SpannBfPlOutput},
     spann_centers_search::{
         SpannCentersSearchError, SpannCentersSearchInput, SpannCentersSearchOperator,
@@ -170,6 +173,16 @@ impl Orchestrator for SpannKnnOrchestrator {
         );
         tasks.push(head_search_task);
 
+        let prefetch_task = wrap(
+            Box::new(PrefetchSegmentOperator::new()),
+            PrefetchSegmentInput::new(
+                self.knn_filter_output.vector_segment.clone(),
+                self.spann_provider.blockfile_provider.clone(),
+            ),
+            ctx.receiver(),
+        );
+        tasks.push(prefetch_task);
+
         tasks
     }
 
@@ -188,6 +201,19 @@ impl Orchestrator for SpannKnnOrchestrator {
         }
     }
 
+#[async_trait]
+impl Handler<TaskResult<PrefetchSegmentOutput, PrefetchSegmentError>> for SpannKnnOrchestrator {
+    type Result = ();
+
+    async fn handle(
+        &mut self,
+        _: TaskResult<PrefetchSegmentOutput, PrefetchSegmentError>,
+        _: &ComponentContext<SpannKnnOrchestrator>,
+    ) {
+        // Nothing to do.
+    }
+}
+
 #[async_trait]
 impl Handler<TaskResult<KnnLogOutput, KnnLogError>> for SpannKnnOrchestrator {
     type Result = ();
```
