@@ -140,21 +140,20 @@ impl GarbageCollector {
140
140
. as_ref ( )
141
141
. ok_or ( GarbageCollectCollectionError :: Uninitialized ) ?;
142
142
143
- let use_v1_orchestrator = match cleanup_mode {
144
- CleanupMode :: DryRun | CleanupMode :: Rename | CleanupMode :: Delete => true ,
145
- CleanupMode :: DryRunV2 | CleanupMode :: DeleteV2 => false ,
146
- } ;
147
-
148
- if use_v1_orchestrator {
143
+ if cleanup_mode. is_v2 ( ) {
149
144
let orchestrator =
150
- crate :: garbage_collector_orchestrator :: GarbageCollectorOrchestrator :: new (
145
+ crate :: garbage_collector_orchestrator_v2 :: GarbageCollectorOrchestrator :: new (
151
146
collection. id ,
152
147
collection. version_file_path ,
148
+ collection. lineage_file_path ,
153
149
absolute_cutoff_time,
154
150
self . sysdb_client . clone ( ) ,
155
151
dispatcher. clone ( ) ,
152
+ system. clone ( ) ,
156
153
self . storage . clone ( ) ,
154
+ self . root_manager . clone ( ) ,
157
155
cleanup_mode,
156
+ 2 ,
158
157
) ;
159
158
160
159
let started_at = SystemTime :: now ( ) ;
@@ -178,23 +177,19 @@ impl GarbageCollector {
178
177
format ! ( "{:?}" , cleanup_mode) ,
179
178
) ] ,
180
179
) ;
180
+
181
181
return Ok ( result) ;
182
182
}
183
183
184
- let orchestrator =
185
- crate :: garbage_collector_orchestrator_v2:: GarbageCollectorOrchestrator :: new (
186
- collection. id ,
187
- collection. version_file_path ,
188
- collection. lineage_file_path ,
189
- absolute_cutoff_time,
190
- self . sysdb_client . clone ( ) ,
191
- dispatcher. clone ( ) ,
192
- system. clone ( ) ,
193
- self . storage . clone ( ) ,
194
- self . root_manager . clone ( ) ,
195
- cleanup_mode,
196
- 2 ,
197
- ) ;
184
+ let orchestrator = crate :: garbage_collector_orchestrator:: GarbageCollectorOrchestrator :: new (
185
+ collection. id ,
186
+ collection. version_file_path ,
187
+ absolute_cutoff_time,
188
+ self . sysdb_client . clone ( ) ,
189
+ dispatcher. clone ( ) ,
190
+ self . storage . clone ( ) ,
191
+ cleanup_mode,
192
+ ) ;
198
193
199
194
let started_at = SystemTime :: now ( ) ;
200
195
let result = orchestrator. run ( system. clone ( ) ) . await ?;
@@ -217,7 +212,6 @@ impl GarbageCollector {
217
212
format ! ( "{:?}" , cleanup_mode) ,
218
213
) ] ,
219
214
) ;
220
-
221
215
Ok ( result)
222
216
}
223
217
}
@@ -360,9 +354,18 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
360
354
continue ;
361
355
}
362
356
363
- if collection. lineage_file_path . is_some ( ) {
357
+ let cleanup_mode = if let Some ( tenant_mode_overrides) = & self . tenant_mode_overrides {
358
+ tenant_mode_overrides
359
+ . get ( & collection. tenant )
360
+ . cloned ( )
361
+ . unwrap_or ( self . default_cleanup_mode )
362
+ } else {
363
+ self . default_cleanup_mode
364
+ } ;
365
+
366
+ if collection. lineage_file_path . is_some ( ) && !cleanup_mode. is_v2 ( ) {
364
367
tracing:: debug!(
365
- "Skipping garbage collection for root of fork tree: {}" ,
368
+ "Skipping garbage collection for root of fork tree because GC v1 cannot handle fork trees : {}" ,
366
369
collection. id
367
370
) ;
368
371
num_skipped_jobs += 1 ;
@@ -376,17 +379,6 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
376
379
collection. version_file_path
377
380
) ;
378
381
379
- let cleanup_mode = if let Some ( tenant_mode_overrides) = & self . tenant_mode_overrides {
380
- tenant_mode_overrides
381
- . get ( & collection. tenant )
382
- . cloned ( )
383
- . unwrap_or ( self . default_cleanup_mode )
384
- } else {
385
- self . default_cleanup_mode
386
- } ;
387
-
388
- tracing:: info!( "Creating gc orchestrator for collection: {}" , collection. id) ;
389
-
390
382
let instrumented_span = span ! ( parent: None , tracing:: Level :: INFO , "Garbage collection job" , collection_id = ?collection. id, tenant_id = %collection. tenant, cleanup_mode = ?cleanup_mode) ;
391
383
instrumented_span. follows_from ( Span :: current ( ) ) ;
392
384
@@ -1011,6 +1003,26 @@ mod tests {
1011
1003
let ( collection_in_delete_mode, database_name_in_delete_mode) =
1012
1004
collection_in_delete_mode_handle. await . unwrap ( ) ;
1013
1005
1006
+ // Fork collection in delete mode to give it a lineage file (only GC v2 can handle fork trees)
1007
+ {
1008
+ let source_collection = sysdb
1009
+ . get_collections ( Some ( collection_in_delete_mode) , None , None , None , None , 0 )
1010
+ . await
1011
+ . unwrap ( ) ;
1012
+ let source_collection = source_collection. first ( ) . unwrap ( ) ;
1013
+
1014
+ sysdb
1015
+ . fork_collection (
1016
+ collection_in_delete_mode,
1017
+ source_collection. total_records_post_compaction ,
1018
+ source_collection. total_records_post_compaction ,
1019
+ CollectionUuid :: new ( ) ,
1020
+ "test-fork" . to_string ( ) ,
1021
+ )
1022
+ . await
1023
+ . unwrap ( ) ;
1024
+ }
1025
+
1014
1026
// Wait 1 second for cutoff time
1015
1027
tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
1016
1028
0 commit comments