Skip to content

Commit 7a90409

Browse files
[BUG] Handle uninitialized spann segment reader (chroma-core#4348)
1 parent a55e548 commit 7a90409

File tree

1 file changed

+39
-29
lines changed

1 file changed

+39
-29
lines changed

rust/worker/src/execution/orchestration/spann_knn.rs

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use async_trait::async_trait;
22
use chroma_distance::{normalize, DistanceFunction};
3-
use chroma_segment::{distributed_spann::SpannSegmentReader, spann_provider::SpannProvider};
3+
use chroma_segment::{
4+
distributed_spann::{SpannSegmentReader, SpannSegmentReaderError},
5+
spann_provider::SpannProvider,
6+
};
47
use chroma_system::{
58
wrap, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, TaskMessage,
69
TaskResult,
@@ -131,25 +134,6 @@ impl SpannKnnOrchestrator {
131134
self.send(task, ctx).await;
132135
}
133136
}
134-
135-
async fn set_spann_reader(&mut self, ctx: &ComponentContext<Self>) {
136-
let reader_res = SpannSegmentReader::from_segment(
137-
&self.collection,
138-
&self.knn_filter_output.vector_segment,
139-
&self.spann_provider.blockfile_provider,
140-
&self.spann_provider.hnsw_provider,
141-
self.knn_filter_output.dimension,
142-
)
143-
.await;
144-
let reader = match self.ok_or_terminate(reader_res, ctx).await {
145-
Some(reader) => reader,
146-
None => {
147-
tracing::error!("Failed to create SpannSegmentReader");
148-
return;
149-
}
150-
};
151-
self.spann_reader = Some(reader);
152-
}
153137
}
154138

155139
#[async_trait]
@@ -176,16 +160,42 @@ impl Orchestrator for SpannKnnOrchestrator {
176160
ctx.receiver(),
177161
);
178162
tasks.push(knn_log_task);
179-
self.set_spann_reader(ctx).await;
180-
let head_search_task = wrap(
181-
Box::new(self.head_search.clone()),
182-
SpannCentersSearchInput {
183-
reader: self.spann_reader.clone(),
184-
normalized_query: self.normalized_query_emb.clone(),
163+
let reader_res = SpannSegmentReader::from_segment(
164+
&self.collection,
165+
&self.knn_filter_output.vector_segment,
166+
&self.spann_provider.blockfile_provider,
167+
&self.spann_provider.hnsw_provider,
168+
self.knn_filter_output.dimension,
169+
)
170+
.await;
171+
match reader_res {
172+
Ok(reader) => {
173+
self.spann_reader = Some(reader.clone());
174+
// Spawn the centers search task if reader is found.
175+
let head_search_task = wrap(
176+
Box::new(self.head_search.clone()),
177+
SpannCentersSearchInput {
178+
reader: Some(reader),
179+
normalized_query: self.normalized_query_emb.clone(),
180+
},
181+
ctx.receiver(),
182+
);
183+
tasks.push(head_search_task);
184+
}
185+
Err(e) => match e {
186+
// Segment uninited means no compaction yet.
187+
SpannSegmentReaderError::UninitializedSegment => {
188+
// If the segment is uninitialized, we can skip the head search.
189+
self.spann_reader = None;
190+
self.heads_searched = true;
191+
}
192+
_ => {
193+
let _: Option<()> = self
194+
.ok_or_terminate(Err(KnnError::SpannSegmentReaderCreationError(e)), ctx)
195+
.await;
196+
}
185197
},
186-
ctx.receiver(),
187-
);
188-
tasks.push(head_search_task);
198+
}
189199

190200
let prefetch_task = wrap(
191201
Box::new(PrefetchSegmentOperator::new()),

0 commit comments

Comments
 (0)