1
1
use async_trait:: async_trait;
2
2
use chroma_error:: { ChromaError , ErrorCodes } ;
3
+ use chroma_storage:: Storage ;
3
4
use chroma_sysdb:: SysDb ;
4
5
use chroma_system:: { Operator , OperatorType } ;
5
6
use chroma_types:: chroma_proto:: { CollectionVersionFile , VersionListForCollection } ;
7
+ use futures:: stream:: StreamExt ;
6
8
use std:: collections:: HashSet ;
7
9
use thiserror:: Error ;
8
10
9
- #[ derive( Clone , Debug ) ]
10
- pub struct DeleteVersionsAtSysDbOperator { }
11
+ #[ derive( Clone ) ]
12
+ pub struct DeleteVersionsAtSysDbOperator {
13
+ pub storage : Storage ,
14
+ }
15
+
16
+ impl std:: fmt:: Debug for DeleteVersionsAtSysDbOperator {
17
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
18
+ f. debug_struct ( "DeleteVersionsAtSysDbOperator" )
19
+ . finish_non_exhaustive ( )
20
+ }
21
+ }
11
22
12
23
#[ derive( Debug ) ]
13
24
pub struct DeleteVersionsAtSysDbInput {
@@ -29,6 +40,8 @@ pub struct DeleteVersionsAtSysDbOutput {
29
40
pub enum DeleteVersionsAtSysDbError {
30
41
#[ error( "Error deleting versions in sysdb: {0}" ) ]
31
42
SysDBError ( String ) ,
43
+ #[ error( "Error deleting version file {path}: {message}" ) ]
44
+ DeleteFileError { path : String , message : String } ,
32
45
}
33
46
34
47
impl ChromaError for DeleteVersionsAtSysDbError {
@@ -37,6 +50,72 @@ impl ChromaError for DeleteVersionsAtSysDbError {
37
50
}
38
51
}
39
52
53
+ impl DeleteVersionsAtSysDbOperator {
54
+ async fn delete_version_files (
55
+ & self ,
56
+ version_file : & CollectionVersionFile ,
57
+ versions_to_delete : & [ i64 ] ,
58
+ ) {
59
+ // Handle case where version_history is None
60
+ let version_history = match & version_file. version_history {
61
+ Some ( history) => history,
62
+ None => return , // Nothing to delete if there's no version history
63
+ } ;
64
+
65
+ let version_files_to_delete: Vec < String > = version_history
66
+ . versions
67
+ . iter ( )
68
+ . filter ( |v| versions_to_delete. contains ( & v. version ) )
69
+ . filter_map ( |v| {
70
+ if !v. version_file_name . is_empty ( ) {
71
+ Some ( v. version_file_name . clone ( ) )
72
+ } else {
73
+ None
74
+ }
75
+ } )
76
+ . collect ( ) ;
77
+
78
+ if version_files_to_delete. is_empty ( ) {
79
+ return ;
80
+ }
81
+
82
+ tracing:: info!(
83
+ files = ?version_files_to_delete,
84
+ "Deleting version files"
85
+ ) ;
86
+
87
+ let mut futures = Vec :: new ( ) ;
88
+ for file_path in & version_files_to_delete {
89
+ let storage = self . storage . clone ( ) ;
90
+ let path = file_path. clone ( ) ;
91
+ futures. push ( async move {
92
+ storage
93
+ . delete ( & path)
94
+ . await
95
+ . map_err ( |e| ( path, e. to_string ( ) ) )
96
+ } ) ;
97
+ }
98
+
99
+ let num_futures = futures. len ( ) ;
100
+ let results = futures:: stream:: iter ( futures)
101
+ . buffer_unordered ( num_futures)
102
+ . collect :: < Vec < _ > > ( )
103
+ . await ;
104
+
105
+ // Process any errors that occurred during file deletion
106
+ for result in results {
107
+ if let Err ( ( path, error) ) = result {
108
+ tracing:: warn!(
109
+ error = %error,
110
+ path = %path,
111
+ "Failed to delete version file {}, continuing since it could have been deleted already" ,
112
+ path
113
+ ) ;
114
+ }
115
+ }
116
+ }
117
+ }
118
+
40
119
#[ async_trait]
41
120
impl Operator < DeleteVersionsAtSysDbInput , DeleteVersionsAtSysDbOutput >
42
121
for DeleteVersionsAtSysDbOperator
@@ -69,6 +148,10 @@ impl Operator<DeleteVersionsAtSysDbInput, DeleteVersionsAtSysDbOutput>
69
148
let mut sysdb = input. sysdb_client . clone ( ) ;
70
149
71
150
if !input. versions_to_delete . versions . is_empty ( ) {
151
+ // First, delete the version files from the storage.
152
+ self . delete_version_files ( & input. version_file , & input. versions_to_delete . versions )
153
+ . await ;
154
+
72
155
tracing:: info!(
73
156
versions = ?input. versions_to_delete. versions,
74
157
"Deleting versions from SysDB"
@@ -110,12 +193,23 @@ impl Operator<DeleteVersionsAtSysDbInput, DeleteVersionsAtSysDbOutput>
110
193
#[ cfg( test) ]
111
194
mod tests {
112
195
use super :: * ;
196
+ use chroma_storage:: local:: LocalStorage ;
113
197
use chroma_sysdb:: TestSysDb ;
198
+ use chroma_types:: chroma_proto;
199
+ use tempfile:: TempDir ;
114
200
115
201
#[ tokio:: test]
116
202
async fn test_delete_versions_success ( ) {
203
+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
204
+ let storage = Storage :: Local ( LocalStorage :: new ( tmp_dir. path ( ) . to_str ( ) . unwrap ( ) ) ) ;
117
205
let sysdb = SysDb :: Test ( TestSysDb :: new ( ) ) ;
118
- let version_file = CollectionVersionFile :: default ( ) ;
206
+
207
+ // Create a version file with actual version history
208
+ let version_file = CollectionVersionFile {
209
+ version_history : Some ( chroma_proto:: CollectionVersionHistory { versions : vec ! [ ] } ) ,
210
+ ..Default :: default ( )
211
+ } ;
212
+
119
213
let versions_to_delete = VersionListForCollection {
120
214
collection_id : "test_collection" . to_string ( ) ,
121
215
database_id : "default" . to_string ( ) ,
@@ -131,7 +225,9 @@ mod tests {
131
225
unused_s3_files : HashSet :: new ( ) ,
132
226
} ;
133
227
134
- let operator = DeleteVersionsAtSysDbOperator { } ;
228
+ let operator = DeleteVersionsAtSysDbOperator {
229
+ storage : storage. clone ( ) ,
230
+ } ;
135
231
let result = operator. run ( & input) . await ;
136
232
137
233
assert ! ( result. is_ok( ) ) ;
@@ -142,6 +238,8 @@ mod tests {
142
238
143
239
#[ tokio:: test]
144
240
async fn test_delete_versions_empty_list ( ) {
241
+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
242
+ let storage = Storage :: Local ( LocalStorage :: new ( tmp_dir. path ( ) . to_str ( ) . unwrap ( ) ) ) ;
145
243
let sysdb = SysDb :: Test ( TestSysDb :: new ( ) ) ;
146
244
let version_file = CollectionVersionFile :: default ( ) ;
147
245
let versions_to_delete = VersionListForCollection {
@@ -159,12 +257,163 @@ mod tests {
159
257
unused_s3_files : HashSet :: new ( ) ,
160
258
} ;
161
259
162
- let operator = DeleteVersionsAtSysDbOperator { } ;
260
+ let operator = DeleteVersionsAtSysDbOperator {
261
+ storage : storage. clone ( ) ,
262
+ } ;
163
263
let result = operator. run ( & input) . await ;
164
264
165
265
assert ! ( result. is_ok( ) ) ;
166
266
let output = result. unwrap ( ) ;
167
267
assert_eq ! ( output. version_file, version_file) ;
168
268
assert_eq ! ( output. versions_to_delete, versions_to_delete) ;
169
269
}
270
+
271
+ #[ tokio:: test]
272
+ async fn test_delete_version_files ( ) {
273
+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
274
+ let storage = Storage :: Local ( LocalStorage :: new ( tmp_dir. path ( ) . to_str ( ) . unwrap ( ) ) ) ;
275
+
276
+ // Create test files in the temporary directory
277
+ let test_files = vec ! [ "version_1" , "version_2" ] ;
278
+ for file in & test_files {
279
+ std:: fs:: write ( tmp_dir. path ( ) . join ( file) , "test content" ) . unwrap ( ) ;
280
+ }
281
+
282
+ // Create version file with history
283
+ let version_file = CollectionVersionFile {
284
+ version_history : Some ( chroma_proto:: CollectionVersionHistory {
285
+ versions : vec ! [
286
+ chroma_proto:: CollectionVersionInfo {
287
+ version: 1 ,
288
+ version_file_name: "version_1" . to_string( ) ,
289
+ ..Default :: default ( )
290
+ } ,
291
+ chroma_proto:: CollectionVersionInfo {
292
+ version: 2 ,
293
+ version_file_name: "version_2" . to_string( ) ,
294
+ ..Default :: default ( )
295
+ } ,
296
+ chroma_proto:: CollectionVersionInfo {
297
+ version: 3 ,
298
+ version_file_name: "" . to_string( ) , // Empty file name to test filtering
299
+ ..Default :: default ( )
300
+ } ,
301
+ ] ,
302
+ } ) ,
303
+ ..Default :: default ( )
304
+ } ;
305
+
306
+ let operator = DeleteVersionsAtSysDbOperator {
307
+ storage : storage. clone ( ) ,
308
+ } ;
309
+
310
+ // Test deleting specific versions
311
+ operator
312
+ . delete_version_files ( & version_file, & [ 1 , 2 , 3 ] )
313
+ . await ;
314
+
315
+ // Verify files were deleted
316
+ for file in & test_files {
317
+ assert ! (
318
+ !tmp_dir. path( ) . join( file) . exists( ) ,
319
+ "File {} should be deleted" ,
320
+ file
321
+ ) ;
322
+ }
323
+
324
+ // Test with non-existent files (should not panic)
325
+ operator
326
+ . delete_version_files ( & version_file, & [ 1 , 2 , 3 ] )
327
+ . await ;
328
+ }
329
+
330
+ #[ tokio:: test]
331
+ async fn test_delete_version_files_no_history ( ) {
332
+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
333
+ let storage = Storage :: Local ( LocalStorage :: new ( tmp_dir. path ( ) . to_str ( ) . unwrap ( ) ) ) ;
334
+
335
+ // Create version file without history
336
+ let version_file = CollectionVersionFile {
337
+ version_history : None ,
338
+ ..Default :: default ( )
339
+ } ;
340
+
341
+ let operator = DeleteVersionsAtSysDbOperator {
342
+ storage : storage. clone ( ) ,
343
+ } ;
344
+
345
+ // Should return early without error
346
+ operator
347
+ . delete_version_files ( & version_file, & [ 1 , 2 , 3 ] )
348
+ . await ;
349
+ }
350
+
351
+ #[ tokio:: test]
352
+ async fn test_operator_deletes_version_files ( ) {
353
+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
354
+ let storage = Storage :: Local ( LocalStorage :: new ( tmp_dir. path ( ) . to_str ( ) . unwrap ( ) ) ) ;
355
+ let sysdb = SysDb :: Test ( TestSysDb :: new ( ) ) ;
356
+
357
+ // Create test files in the temporary directory
358
+ let test_files = vec ! [ "version_1" , "version_2" ] ;
359
+ for file in & test_files {
360
+ std:: fs:: write ( tmp_dir. path ( ) . join ( file) , "test content" ) . unwrap ( ) ;
361
+ }
362
+
363
+ // Create version file with history
364
+ let version_file = CollectionVersionFile {
365
+ version_history : Some ( chroma_proto:: CollectionVersionHistory {
366
+ versions : vec ! [
367
+ chroma_proto:: CollectionVersionInfo {
368
+ version: 1 ,
369
+ version_file_name: "version_1" . to_string( ) ,
370
+ ..Default :: default ( )
371
+ } ,
372
+ chroma_proto:: CollectionVersionInfo {
373
+ version: 2 ,
374
+ version_file_name: "version_2" . to_string( ) ,
375
+ ..Default :: default ( )
376
+ } ,
377
+ ] ,
378
+ } ) ,
379
+ ..Default :: default ( )
380
+ } ;
381
+
382
+ let versions_to_delete = VersionListForCollection {
383
+ collection_id : "test_collection" . to_string ( ) ,
384
+ database_id : "default" . to_string ( ) ,
385
+ tenant_id : "default" . to_string ( ) ,
386
+ versions : vec ! [ 1 , 2 ] ,
387
+ } ;
388
+
389
+ let input = DeleteVersionsAtSysDbInput {
390
+ version_file : version_file. clone ( ) ,
391
+ versions_to_delete : versions_to_delete. clone ( ) ,
392
+ sysdb_client : sysdb,
393
+ epoch_id : 123 ,
394
+ unused_s3_files : HashSet :: new ( ) ,
395
+ } ;
396
+
397
+ let operator = DeleteVersionsAtSysDbOperator {
398
+ storage : storage. clone ( ) ,
399
+ } ;
400
+
401
+ // Run the operator
402
+ let result = operator. run ( & input) . await ;
403
+ assert ! ( result. is_ok( ) ) ;
404
+
405
+ // Verify files were deleted
406
+ for file in & test_files {
407
+ assert ! (
408
+ !tmp_dir. path( ) . join( file) . exists( ) ,
409
+ "File {} should have been deleted by the operator" ,
410
+ file
411
+ ) ;
412
+ }
413
+
414
+ // Verify the output matches our expectations
415
+ let output = result. unwrap ( ) ;
416
+ assert_eq ! ( output. version_file, version_file) ;
417
+ assert_eq ! ( output. versions_to_delete, versions_to_delete) ;
418
+ }
170
419
}
0 commit comments