1
1
use async_trait:: async_trait;
2
2
use chroma_distance:: { normalize, DistanceFunction } ;
3
- use chroma_segment:: { distributed_spann:: SpannSegmentReader , spann_provider:: SpannProvider } ;
3
+ use chroma_segment:: {
4
+ distributed_spann:: { SpannSegmentReader , SpannSegmentReaderError } ,
5
+ spann_provider:: SpannProvider ,
6
+ } ;
4
7
use chroma_system:: {
5
8
wrap, ComponentContext , ComponentHandle , Dispatcher , Handler , Orchestrator , TaskMessage ,
6
9
TaskResult ,
@@ -131,25 +134,6 @@ impl SpannKnnOrchestrator {
131
134
self . send ( task, ctx) . await ;
132
135
}
133
136
}
134
-
135
- async fn set_spann_reader ( & mut self , ctx : & ComponentContext < Self > ) {
136
- let reader_res = SpannSegmentReader :: from_segment (
137
- & self . collection ,
138
- & self . knn_filter_output . vector_segment ,
139
- & self . spann_provider . blockfile_provider ,
140
- & self . spann_provider . hnsw_provider ,
141
- self . knn_filter_output . dimension ,
142
- )
143
- . await ;
144
- let reader = match self . ok_or_terminate ( reader_res, ctx) . await {
145
- Some ( reader) => reader,
146
- None => {
147
- tracing:: error!( "Failed to create SpannSegmentReader" ) ;
148
- return ;
149
- }
150
- } ;
151
- self . spann_reader = Some ( reader) ;
152
- }
153
137
}
154
138
155
139
#[ async_trait]
@@ -176,16 +160,42 @@ impl Orchestrator for SpannKnnOrchestrator {
176
160
ctx. receiver ( ) ,
177
161
) ;
178
162
tasks. push ( knn_log_task) ;
179
- self . set_spann_reader ( ctx) . await ;
180
- let head_search_task = wrap (
181
- Box :: new ( self . head_search . clone ( ) ) ,
182
- SpannCentersSearchInput {
183
- reader : self . spann_reader . clone ( ) ,
184
- normalized_query : self . normalized_query_emb . clone ( ) ,
163
+ let reader_res = SpannSegmentReader :: from_segment (
164
+ & self . collection ,
165
+ & self . knn_filter_output . vector_segment ,
166
+ & self . spann_provider . blockfile_provider ,
167
+ & self . spann_provider . hnsw_provider ,
168
+ self . knn_filter_output . dimension ,
169
+ )
170
+ . await ;
171
+ match reader_res {
172
+ Ok ( reader) => {
173
+ self . spann_reader = Some ( reader. clone ( ) ) ;
174
+ // Spawn the centers search task if reader is found.
175
+ let head_search_task = wrap (
176
+ Box :: new ( self . head_search . clone ( ) ) ,
177
+ SpannCentersSearchInput {
178
+ reader : Some ( reader) ,
179
+ normalized_query : self . normalized_query_emb . clone ( ) ,
180
+ } ,
181
+ ctx. receiver ( ) ,
182
+ ) ;
183
+ tasks. push ( head_search_task) ;
184
+ }
185
+ Err ( e) => match e {
186
+ // Segment uninited means no compaction yet.
187
+ SpannSegmentReaderError :: UninitializedSegment => {
188
+ // If the segment is uninitialized, we can skip the head search.
189
+ self . spann_reader = None ;
190
+ self . heads_searched = true ;
191
+ }
192
+ _ => {
193
+ let _: Option < ( ) > = self
194
+ . ok_or_terminate ( Err ( KnnError :: SpannSegmentReaderCreationError ( e) ) , ctx)
195
+ . await ;
196
+ }
185
197
} ,
186
- ctx. receiver ( ) ,
187
- ) ;
188
- tasks. push ( head_search_task) ;
198
+ }
189
199
190
200
let prefetch_task = wrap (
191
201
Box :: new ( PrefetchSegmentOperator :: new ( ) ) ,
0 commit comments