Skip to content

Commit e167434

Browse files
committed
[ENH]: move collection hard deletes to garbage collector
1 parent aac60ef commit e167434

File tree

9 files changed

+747
-186
lines changed

9 files changed

+747
-186
lines changed

rust/garbage_collector/src/construct_version_graph_orchestrator.rs

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
use crate::operators::{
2-
fetch_lineage_file::{
3-
FetchLineageFileError, FetchLineageFileInput, FetchLineageFileOperator,
4-
FetchLineageFileOutput,
5-
},
6-
fetch_version_file::{
7-
FetchVersionFileError, FetchVersionFileInput, FetchVersionFileOperator,
8-
FetchVersionFileOutput,
9-
},
10-
get_version_file_paths::{
11-
GetVersionFilePathsError, GetVersionFilePathsInput, GetVersionFilePathsOperator,
12-
GetVersionFilePathsOutput,
1+
use crate::{
2+
operators::{
3+
fetch_lineage_file::{
4+
FetchLineageFileError, FetchLineageFileInput, FetchLineageFileOperator,
5+
FetchLineageFileOutput,
6+
},
7+
fetch_version_file::{
8+
FetchVersionFileError, FetchVersionFileInput, FetchVersionFileOperator,
9+
FetchVersionFileOutput,
10+
},
11+
get_version_file_paths::{
12+
GetVersionFilePathsError, GetVersionFilePathsInput, GetVersionFilePathsOperator,
13+
GetVersionFilePathsOutput,
14+
},
1315
},
16+
types::{VersionGraph, VersionGraphNode, VersionStatus},
1417
};
1518
use async_trait::async_trait;
1619
use base64::{prelude::BASE64_STANDARD, Engine};
@@ -32,15 +35,6 @@ use thiserror::Error;
3235
use tokio::sync::oneshot::{error::RecvError, Sender};
3336
use tracing::Span;
3437

35-
#[derive(Debug, Clone, Copy)]
36-
pub enum VersionStatus {
37-
#[allow(dead_code)]
38-
Alive {
39-
created_at: DateTime<chrono::Utc>,
40-
},
41-
Deleted,
42-
}
43-
4438
#[derive(Debug, Clone)]
4539
struct VersionDependency {
4640
source_collection_id: CollectionUuid,
@@ -91,16 +85,6 @@ impl ConstructVersionGraphOrchestrator {
9185
}
9286
}
9387

94-
#[derive(Debug, Clone)]
95-
pub struct VersionGraphNode {
96-
pub collection_id: CollectionUuid,
97-
pub version: i64,
98-
#[allow(dead_code)]
99-
pub status: VersionStatus,
100-
}
101-
102-
pub type VersionGraph = DiGraph<VersionGraphNode, ()>;
103-
10488
#[derive(Debug)]
10589
#[allow(dead_code)]
10690
pub struct ConstructVersionGraphResponse {
@@ -130,8 +114,10 @@ pub enum ConstructVersionGraphError {
130114
InvalidUuid(#[from] uuid::Error),
131115
#[error("Invalid timestamp: {0}")]
132116
InvalidTimestamp(i64),
133-
#[error("Expected node not found while constructing graph")]
134-
ExpectedNodeNotFound,
117+
#[error("Expected node not found while constructing graph (collection {0}@v{1:?})")]
118+
ExpectedNodeNotFound(CollectionUuid, Option<i64>),
119+
#[error("Invariant violation: {0}")]
120+
InvariantViolation(String),
135121
}
136122

137123
impl<E> From<TaskError<E>> for ConstructVersionGraphError
@@ -159,7 +145,8 @@ impl ChromaError for ConstructVersionGraphError {
159145
ConstructVersionGraphError::FetchVersionFilePaths(err) => err.code(),
160146
ConstructVersionGraphError::InvalidUuid(_) => ErrorCodes::Internal,
161147
ConstructVersionGraphError::InvalidTimestamp(_) => ErrorCodes::InvalidArgument,
162-
ConstructVersionGraphError::ExpectedNodeNotFound => ErrorCodes::Internal,
148+
ConstructVersionGraphError::ExpectedNodeNotFound(_, _) => ErrorCodes::Internal,
149+
ConstructVersionGraphError::InvariantViolation(_) => ErrorCodes::Internal,
163150
}
164151
}
165152
}
@@ -224,9 +211,11 @@ impl ConstructVersionGraphOrchestrator {
224211
ctx: &ComponentContext<ConstructVersionGraphOrchestrator>,
225212
) -> Result<(), ConstructVersionGraphError> {
226213
if self.num_pending_tasks == 0 {
214+
// This map will be used as a basis for building the graph
227215
let mut versions_by_collection_id: HashMap<CollectionUuid, Vec<(i64, VersionStatus)>> =
228216
HashMap::new();
229217

218+
// Add all known versions from version files to map
230219
for (collection_id, version_file) in self.version_files.iter() {
231220
if let Some(versions) = &version_file.version_history {
232221
for version in versions.versions.iter() {
@@ -251,6 +240,7 @@ impl ConstructVersionGraphOrchestrator {
251240
}
252241
}
253242

243+
// If any version appears as a version dependency (from the lineage file) but does not already exist in the map from the version files, the version must have been deleted.
254244
for dependency in self.version_dependencies.iter() {
255245
let source_collection_id = dependency.source_collection_id;
256246
let source_collection_version = dependency.source_collection_version;
@@ -272,6 +262,11 @@ impl ConstructVersionGraphOrchestrator {
272262
versions.sort_unstable_by_key(|v| v.0);
273263
}
274264

265+
tracing::trace!(
266+
"Versions by collection ID: {:#?}",
267+
versions_by_collection_id
268+
);
269+
275270
let mut graph = DiGraph::new();
276271
for (collection_id, versions) in versions_by_collection_id.iter() {
277272
let mut prev_node = None;
@@ -282,12 +277,14 @@ impl ConstructVersionGraphOrchestrator {
282277
status: *status,
283278
});
284279
if let Some(prev) = prev_node {
280+
// Add edge between each successive pair of collection versions
285281
graph.add_edge(prev, node, ());
286282
}
287283
prev_node = Some(node);
288284
}
289285
}
290286

287+
// Add edges for forked collections
291288
for dependency in self.version_dependencies.iter() {
292289
let source_node = graph
293290
.node_indices()
@@ -296,15 +293,25 @@ impl ConstructVersionGraphOrchestrator {
296293
node.collection_id == dependency.source_collection_id
297294
&& node.version == dependency.source_collection_version
298295
})
299-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
296+
.ok_or_else(|| {
297+
ConstructVersionGraphError::ExpectedNodeNotFound(
298+
dependency.source_collection_id,
299+
Some(dependency.source_collection_version),
300+
)
301+
})?;
300302

301303
let target_node = graph
302304
.node_indices()
303305
.find(|n| {
304306
let node = graph.node_weight(*n).expect("node index should exist");
305307
node.collection_id == dependency.target_collection_id
306308
})
307-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
309+
.ok_or_else(|| {
310+
ConstructVersionGraphError::ExpectedNodeNotFound(
311+
dependency.target_collection_id,
312+
None,
313+
)
314+
})?;
308315

309316
graph.add_edge(source_node, target_node, ());
310317
}
@@ -317,6 +324,15 @@ impl ConstructVersionGraphOrchestrator {
317324

318325
tracing::trace!("Version files: {:#?}", self.version_files);
319326

327+
let components = petgraph::algo::connected_components(&graph);
328+
if components != 1 {
329+
// This is a defensive check, it should never happen
330+
return Err(ConstructVersionGraphError::InvariantViolation(format!(
331+
"Graph is not fully connected, found {} components",
332+
components
333+
)));
334+
}
335+
320336
self.terminate_with_result(
321337
Ok(ConstructVersionGraphResponse {
322338
graph,

0 commit comments

Comments
 (0)