Skip to content

Commit 7765df5

Browse files
[ENH] Fork the wal3 log. (chroma-core#4416)
2 parents 29c0c3a + 7382cd0 commit 7765df5

10 files changed

+558
-15
lines changed

rust/log-service/src/lib.rs

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -766,9 +766,79 @@ impl LogService for LogServer {
766766

767767
async fn fork_logs(
768768
&self,
769-
_request: Request<ForkLogsRequest>,
769+
request: Request<ForkLogsRequest>,
770770
) -> Result<Response<ForkLogsResponse>, Status> {
771-
unimplemented!("Log forking is unimplemented for WAL3 for now")
771+
let request = request.into_inner();
772+
let source_collection_id = Uuid::parse_str(&request.source_collection_id)
773+
.map(CollectionUuid)
774+
.map_err(|_| Status::invalid_argument("Failed to parse collection id"))?;
775+
let target_collection_id = Uuid::parse_str(&request.target_collection_id)
776+
.map(CollectionUuid)
777+
.map_err(|_| Status::invalid_argument("Failed to parse collection id"))?;
778+
let source_prefix = storage_prefix_for_log(source_collection_id);
779+
let target_prefix = storage_prefix_for_log(target_collection_id);
780+
let span = tracing::info_span!(
781+
"fork_logs",
782+
source_collection_id = source_collection_id.to_string(),
783+
target_collection_id = target_collection_id.to_string(),
784+
);
785+
let storage = Arc::clone(&self.storage);
786+
let options = self.config.writer.clone();
787+
788+
async move {
789+
let log_reader = LogReader::new(
790+
self.config.reader.clone(),
791+
Arc::clone(&storage),
792+
source_prefix.clone(),
793+
);
794+
let cursors = CursorStore::new(
795+
CursorStoreOptions::default(),
796+
Arc::clone(&storage),
797+
source_prefix,
798+
"copy task".to_string(),
799+
);
800+
let cursor_name = &COMPACTION;
801+
let witness = cursors.load(cursor_name).await.map_err(|err| {
802+
Status::new(err.code().into(), format!("Failed to load cursor: {}", err))
803+
})?;
804+
// This is the existing compaction_offset, which is the last record that was compacted.
805+
let offset = witness
806+
.map(|x| x.1.position)
807+
.unwrap_or(LogPosition::from_offset(0));
808+
wal3::copy(
809+
&storage,
810+
&options,
811+
&log_reader,
812+
// + 1 to get to the first uncompacted record.
813+
offset + 1u64,
814+
target_prefix.clone(),
815+
)
816+
.await
817+
.map_err(|err| {
818+
Status::new(err.code().into(), format!("Failed to copy log: {}", err))
819+
})?;
820+
let log_reader = LogReader::new(
821+
self.config.reader.clone(),
822+
Arc::clone(&storage),
823+
target_prefix,
824+
);
825+
// This is the next record to insert, so we'll have to adjust downwards.
826+
let max_offset = log_reader.maximum_log_position().await.map_err(|err| {
827+
Status::new(err.code().into(), format!("Failed to copy log: {}", err))
828+
})?;
829+
if max_offset < offset {
830+
return Err(Status::new(
831+
chroma_error::ErrorCodes::Internal.into(),
832+
format!("max_offset={:?} < offset={:?}", max_offset, offset),
833+
));
834+
}
835+
Ok(Response::new(ForkLogsResponse {
836+
compaction_offset: offset.offset(),
837+
enumeration_offset: (max_offset - 1u64).offset(),
838+
}))
839+
}
840+
.instrument(span)
841+
.await
772842
}
773843

774844
#[tracing::instrument(info, skip(self, request), err(Display))]

rust/wal3/src/copy.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
use chroma_storage::Storage;
2+
use setsum::Setsum;
3+
4+
use crate::manifest::{unprefixed_snapshot_path, Manifest, Snapshot};
5+
use crate::reader::LogReader;
6+
use crate::writer::copy_parquet;
7+
use crate::{Error, Fragment, LogPosition, LogWriterOptions, SnapshotPointer};
8+
9+
pub async fn copy_snapshot(
10+
storage: &Storage,
11+
options: &LogWriterOptions,
12+
reader: &LogReader,
13+
root: &SnapshotPointer,
14+
offset: LogPosition,
15+
target: &str,
16+
) -> Result<SnapshotPointer, Error> {
17+
let Some(snapshot) =
18+
Snapshot::load(&options.throttle_manifest, storage, &reader.prefix, root).await?
19+
else {
20+
return Err(Error::CorruptManifest(format!(
21+
"snapshot {} does not exist",
22+
root.setsum.hexdigest(),
23+
)));
24+
};
25+
let mut dropped = vec![];
26+
let mut snapshots = vec![];
27+
for snapshot in &snapshot.snapshots {
28+
if snapshot.limit > offset {
29+
snapshots.push(
30+
Box::pin(copy_snapshot(
31+
storage, options, reader, snapshot, offset, target,
32+
))
33+
.await?,
34+
);
35+
} else {
36+
dropped.push(snapshot.setsum);
37+
}
38+
}
39+
let mut fragments = vec![];
40+
for fragment in &snapshot.fragments {
41+
if fragment.limit > offset {
42+
fragments.push(copy_fragment(storage, options, reader, fragment, target).await?);
43+
} else {
44+
dropped.push(fragment.setsum);
45+
}
46+
}
47+
let dropped = dropped.iter().fold(Setsum::default(), |x, y| x + *y);
48+
let kept_snapshots = snapshots
49+
.iter()
50+
.fold(Setsum::default(), |x, y| x + y.setsum);
51+
let kept_fragments = fragments
52+
.iter()
53+
.fold(Setsum::default(), |x, y| x + y.setsum);
54+
if dropped + kept_snapshots + kept_fragments != root.setsum {
55+
// NOTE(rescrv): If you see this error you have to figure out where data is lost. This
56+
// will require writing a test case rather than trying to deduce it from the setsums.
57+
return Err(Error::CorruptManifest(
58+
"Copying failed because the setsum was not balanced".to_string(),
59+
));
60+
}
61+
let depth = snapshots.iter().map(|x| x.depth + 1).max().unwrap_or(0);
62+
let snapshot = Snapshot {
63+
path: unprefixed_snapshot_path(kept_snapshots + kept_fragments),
64+
depth,
65+
setsum: kept_snapshots + kept_fragments,
66+
writer: "copy task".to_string(),
67+
snapshots,
68+
fragments,
69+
};
70+
snapshot
71+
.install(&options.throttle_manifest, storage, target)
72+
.await
73+
}
74+
75+
pub async fn copy_fragment(
76+
storage: &Storage,
77+
options: &LogWriterOptions,
78+
reader: &LogReader,
79+
frag: &Fragment,
80+
target: &str,
81+
) -> Result<Fragment, Error> {
82+
copy_parquet(
83+
options,
84+
storage,
85+
&format!("{}/{}", reader.prefix, frag.path),
86+
&format!("{}/{}", target, frag.path),
87+
)
88+
.await?;
89+
Ok(frag.clone())
90+
}
91+
92+
pub async fn copy(
93+
storage: &Storage,
94+
options: &LogWriterOptions,
95+
reader: &LogReader,
96+
offset: LogPosition,
97+
target: String,
98+
) -> Result<(), Error> {
99+
let Some(manifest) = reader.manifest().await? else {
100+
return Err(Error::UninitializedLog);
101+
};
102+
let mut snapshots = vec![];
103+
for snapshot in &manifest.snapshots {
104+
snapshots.push(copy_snapshot(storage, options, reader, snapshot, offset, &target).await?);
105+
}
106+
let mut fragments = vec![];
107+
for fragment in &manifest.fragments {
108+
fragments.push(copy_fragment(storage, options, reader, fragment, &target).await?);
109+
}
110+
let setsum = snapshots
111+
.iter()
112+
.map(|x| x.setsum)
113+
.fold(Setsum::default(), |x, y| x + y)
114+
+ fragments
115+
.iter()
116+
.map(|x| x.setsum)
117+
.fold(Setsum::default(), |x, y| x + y);
118+
let acc_bytes = snapshots.iter().map(|x| x.num_bytes).sum::<u64>()
119+
+ fragments.iter().map(|x| x.num_bytes).sum::<u64>();
120+
let manifest = Manifest {
121+
setsum,
122+
acc_bytes,
123+
writer: "copy task".to_string(),
124+
snapshots,
125+
fragments,
126+
};
127+
Manifest::initialize_from_manifest(options, storage, &target, manifest).await?;
128+
Ok(())
129+
}

rust/wal3/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use setsum::Setsum;
77

88
mod backoff;
99
mod batch_manager;
10+
mod copy;
1011
mod cursors;
1112
mod manifest;
1213
mod manifest_manager;
@@ -15,6 +16,7 @@ mod writer;
1516

1617
pub use backoff::ExponentialBackoff;
1718
pub use batch_manager::BatchManager;
19+
pub use copy::copy;
1820
pub use cursors::{Cursor, CursorName, CursorStore, Witness};
1921
pub use manifest::{Manifest, Snapshot, SnapshotPointer};
2022
pub use manifest_manager::ManifestManager;
@@ -219,6 +221,16 @@ impl std::ops::Add<usize> for LogPosition {
219221
}
220222
}
221223

224+
impl std::ops::Sub<u64> for LogPosition {
225+
type Output = LogPosition;
226+
227+
fn sub(self, rhs: u64) -> Self::Output {
228+
LogPosition {
229+
offset: self.offset.wrapping_sub(rhs),
230+
}
231+
}
232+
}
233+
222234
impl std::ops::Sub<LogPosition> for LogPosition {
223235
type Output = u64;
224236

rust/wal3/src/manifest.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ use crate::{
2121

2222
/////////////////////////////////////////////// paths //////////////////////////////////////////////
2323

24-
fn manifest_path(prefix: &str) -> String {
24+
pub fn manifest_path(prefix: &str) -> String {
2525
format!("{prefix}/manifest/MANIFEST")
2626
}
2727

28-
fn unprefixed_snapshot_path(setsum: Setsum) -> String {
28+
pub fn unprefixed_snapshot_path(setsum: Setsum) -> String {
2929
format!("snapshot/SNAPSHOT.{}", setsum.hexdigest())
3030
}
3131

32-
fn snapshot_setsum(path: &str) -> Result<Setsum, Error> {
32+
pub fn snapshot_setsum(path: &str) -> Result<Setsum, Error> {
3333
let setsum = path
3434
.strip_prefix("snapshot/SNAPSHOT.")
3535
.ok_or_else(|| Error::CorruptManifest(format!("unparseable snapshot path: {}", path,)))?;
@@ -199,7 +199,7 @@ impl Snapshot {
199199
options: &ThrottleOptions,
200200
storage: &Storage,
201201
prefix: &str,
202-
) -> Result<(), Error> {
202+
) -> Result<SnapshotPointer, Error> {
203203
let exp_backoff = crate::backoff::ExponentialBackoff::new(
204204
options.throughput as f64,
205205
options.headroom as f64,
@@ -214,15 +214,15 @@ impl Snapshot {
214214
let options = PutOptions::if_not_exists(StorageRequestPriority::P0);
215215
match storage.put_bytes(&path, payload, options).await {
216216
Ok(_) => {
217-
return Ok(());
217+
return Ok(self.to_pointer());
218218
}
219219
Err(StorageError::Precondition { path: _, source: _ }) => {
220220
// NOTE(rescrv): This is something of a lie. We know that someone put the
221221
// file before us, and we know the setsum of the file is embedded in the path.
222222
// Because the setsum is only calculable if you have the file and we assume
223223
// non-malicious code, anyone who puts the same setsum as us has, in all
224224
// likelihood, put something referencing the same content as us.
225-
return Ok(());
225+
return Ok(self.to_pointer());
226226
}
227227
Err(e) => {
228228
tracing::error!("error uploading manifest: {e:?}");
@@ -279,6 +279,17 @@ impl Snapshot {
279279
(None, None) => LogPosition::default(),
280280
}
281281
}
282+
283+
pub fn to_pointer(&self) -> SnapshotPointer {
284+
SnapshotPointer {
285+
setsum: self.setsum,
286+
path_to_snapshot: self.path.clone(),
287+
depth: self.depth,
288+
start: self.minimum_log_position(),
289+
limit: self.maximum_log_position(),
290+
num_bytes: self.num_bytes(),
291+
}
292+
}
282293
}
283294

284295
///////////////////////////////////////////// Manifest /////////////////////////////////////////////
@@ -533,7 +544,7 @@ impl Manifest {
533544
/// Initialize the log with an empty manifest.
534545
#[tracing::instrument(skip(storage), err(Display))]
535546
pub async fn initialize(
536-
_: &LogWriterOptions,
547+
options: &LogWriterOptions,
537548
storage: &Storage,
538549
prefix: &str,
539550
writer: &str,
@@ -546,6 +557,17 @@ impl Manifest {
546557
snapshots: vec![],
547558
fragments: vec![],
548559
};
560+
Self::initialize_from_manifest(options, storage, prefix, initial).await
561+
}
562+
563+
/// Initialize the log with an empty manifest.
564+
#[tracing::instrument(skip(storage), err(Display))]
565+
pub async fn initialize_from_manifest(
566+
_: &LogWriterOptions,
567+
storage: &Storage,
568+
prefix: &str,
569+
initial: Manifest,
570+
) -> Result<(), Error> {
549571
let payload = serde_json::to_string(&initial)
550572
.map_err(|e| Error::CorruptManifest(format!("could not encode JSON manifest: {e:?}")))?
551573
.into_bytes();

rust/wal3/src/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub struct Limits {
2828
pub struct LogReader {
2929
options: LogReaderOptions,
3030
storage: Arc<Storage>,
31-
prefix: String,
31+
pub(crate) prefix: String,
3232
}
3333

3434
impl LogReader {

rust/wal3/src/writer.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::time::{Duration, Instant, SystemTime};
44

55
use arrow::array::{ArrayRef, BinaryArray, RecordBatch, UInt64Array};
66
use chroma_storage::admissioncontrolleds3::StorageRequestPriority;
7-
use chroma_storage::{PutOptions, Storage, StorageError};
7+
use chroma_storage::{GetOptions, PutOptions, Storage, StorageError};
88
use parquet::arrow::ArrowWriter;
99
use parquet::basic::Compression;
1010
use parquet::file::properties::WriterProperties;
@@ -520,3 +520,41 @@ pub async fn upload_parquet(
520520
}
521521
}
522522
}
523+
524+
#[tracing::instrument(skip(options, storage))]
525+
pub async fn copy_parquet(
526+
options: &LogWriterOptions,
527+
storage: &Storage,
528+
source: &str,
529+
target: &str,
530+
) -> Result<(), Error> {
531+
let parquet = storage
532+
.get(source, GetOptions::new(StorageRequestPriority::P0))
533+
.await
534+
.map_err(Arc::new)?;
535+
let exp_backoff: ExponentialBackoff = options.throttle_fragment.into();
536+
let start = Instant::now();
537+
loop {
538+
match storage
539+
.put_bytes(
540+
target,
541+
parquet.to_vec(),
542+
PutOptions::if_not_exists(StorageRequestPriority::P0),
543+
)
544+
.await
545+
{
546+
Ok(_) => return Ok(()),
547+
Err(StorageError::Precondition { path: _, source: _ }) => return Ok(()),
548+
Err(err) => {
549+
if start.elapsed() > Duration::from_secs(60) {
550+
return Err(Error::StorageError(Arc::new(err)));
551+
}
552+
let mut backoff = exp_backoff.next();
553+
if backoff > Duration::from_secs(3_600) {
554+
backoff = Duration::from_secs(3_600);
555+
}
556+
tokio::time::sleep(backoff).await;
557+
}
558+
}
559+
}
560+
}

0 commit comments

Comments
 (0)