Skip to content

[ENH] Enable rust log service in CI and add some tracing. #4418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/frontend/sample_configs/tilt_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ log:
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
alt_host: "rust-log-service.chroma"
#use_alt_host_for_everything: true
use_alt_host_for_everything: true

executor:
distributed:
Expand Down
2 changes: 1 addition & 1 deletion rust/garbage_collector/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl ChromaGrpcClients {
let sysdb_channel = Channel::from_static("http://localhost:50051")
.connect()
.await?;
let logservice_channel = Channel::from_static("http://localhost:50052")
let logservice_channel = Channel::from_static("http://localhost:50054")
.connect()
.await?;
let queryservice_channel = Channel::from_static("http://localhost:50053")
Expand Down
83 changes: 46 additions & 37 deletions rust/log-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ pub struct LogServer {

#[async_trait::async_trait]
impl LogService for LogServer {
#[tracing::instrument(skip(self, request))]
async fn push_logs(
&self,
request: Request<PushLogsRequest>,
Expand All @@ -613,43 +612,49 @@ impl LogService for LogServer {
if push_logs.records.is_empty() {
return Err(Status::invalid_argument("Too few records"));
}
let prefix = storage_prefix_for_log(collection_id);
let key = LogKey { collection_id };
let handle = self.open_logs.get_or_create_state(key);
let mark_dirty = MarkDirty {
collection_id,
dirty_log: Arc::clone(&self.dirty_log),
};
let log = get_log_from_handle(
&handle,
&self.config.writer,
&self.storage,
&prefix,
mark_dirty,
)
.await
// TODO(rescrv): better error handling.
.map_err(|err| Status::unknown(err.to_string()))?;
let mut messages = Vec::with_capacity(push_logs.records.len());
for record in push_logs.records {
let mut buf = vec![];
record
.encode(&mut buf)
.map_err(|err| Status::unknown(err.to_string()))?;
messages.push(buf);
}
let record_count = messages.len() as i32;
log.append_many(messages).await.map_err(|err| {
if let wal3::Error::Backoff = err {
Status::new(
chroma_error::ErrorCodes::Unavailable.into(),
err.to_string(),
)
} else {
Status::new(err.code().into(), err.to_string())
let span = tracing::info_span!("push_logs");

async move {
let prefix = storage_prefix_for_log(collection_id);
let key = LogKey { collection_id };
let handle = self.open_logs.get_or_create_state(key);
let mark_dirty = MarkDirty {
collection_id,
dirty_log: Arc::clone(&self.dirty_log),
};
let log = get_log_from_handle(
&handle,
&self.config.writer,
&self.storage,
&prefix,
mark_dirty,
)
.await
// TODO(rescrv): better error handling.
.map_err(|err| Status::unknown(err.to_string()))?;
let mut messages = Vec::with_capacity(push_logs.records.len());
for record in push_logs.records {
let mut buf = vec![];
record
.encode(&mut buf)
.map_err(|err| Status::unknown(err.to_string()))?;
messages.push(buf);
}
})?;
Ok(Response::new(PushLogsResponse { record_count }))
let record_count = messages.len() as i32;
log.append_many(messages).await.map_err(|err| {
if let wal3::Error::Backoff = err {
Status::new(
chroma_error::ErrorCodes::Unavailable.into(),
err.to_string(),
)
} else {
Status::new(err.code().into(), err.to_string())
}
})?;
Ok(Response::new(PushLogsResponse { record_count }))
}
.instrument(span)
.await
}

async fn scout_logs(
Expand Down Expand Up @@ -799,6 +804,7 @@ impl LogService for LogServer {
);
let cursor_name = &COMPACTION;
let witness = cursors.load(cursor_name).await.map_err(|err| {
tracing::info!("FINDME");
Status::new(err.code().into(), format!("Failed to load cursor: {}", err))
})?;
// This is the existing compaction_offset, which is the last record that was compacted.
Expand All @@ -815,6 +821,7 @@ impl LogService for LogServer {
)
.await
.map_err(|err| {
tracing::info!("FINDME");
Status::new(err.code().into(), format!("Failed to copy log: {}", err))
})?;
let log_reader = LogReader::new(
Expand All @@ -824,9 +831,11 @@ impl LogService for LogServer {
);
// This is the next record to insert, so we'll have to adjust downwards.
let max_offset = log_reader.maximum_log_position().await.map_err(|err| {
tracing::info!("FINDME");
Status::new(err.code().into(), format!("Failed to copy log: {}", err))
})?;
if max_offset < offset {
tracing::info!("FINDME");
return Err(Status::new(
chroma_error::ErrorCodes::Internal.into(),
format!("max_offset={:?} < offset={:?}", max_offset, offset),
Expand Down
4 changes: 2 additions & 2 deletions rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl ChromaError for GrpcPullLogsError {
pub enum GrpcPushLogsError {
#[error("Please backoff exponentially and retry")]
Backoff,
#[error("Failed to push logs")]
#[error("Failed to push logs: {0}")]
FailedToPushLogs(#[from] tonic::Status),
#[error("Failed to convert records to proto")]
ConversionError(#[from] RecordConversionError),
Expand All @@ -63,7 +63,7 @@ impl ChromaError for GrpcPushLogsError {
pub enum GrpcForkLogsError {
#[error("Please backoff exponentially and retry")]
Backoff,
#[error("Failed to push logs")]
#[error("Failed to push logs: {0}")]
FailedToForkLogs(#[from] tonic::Status),
}

Expand Down
7 changes: 4 additions & 3 deletions rust/wal3/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ pub async fn copy(
offset: LogPosition,
target: String,
) -> Result<(), Error> {
let Some(manifest) = reader.manifest().await? else {
return Err(Error::UninitializedLog);
};
let manifest = reader
.manifest()
.await?
.unwrap_or(Manifest::new_empty("copy task"));
let mut snapshots = vec![];
for snapshot in &manifest.snapshots {
snapshots.push(copy_snapshot(storage, options, reader, snapshot, offset, &target).await?);
Expand Down
8 changes: 8 additions & 0 deletions rust/wal3/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ impl Manifest {
}
if snapshots.len() >= snapshot_options.snapshot_rollover_threshold {
let path = unprefixed_snapshot_path(setsum);
tracing::info!("generating snapshot {path}");
return Some(Snapshot {
path,
depth: snapshot_depth + 1,
Expand Down Expand Up @@ -382,6 +383,7 @@ impl Manifest {
}
if fragments.len() >= snapshot_options.fragment_rollover_threshold {
let path = unprefixed_snapshot_path(setsum);
tracing::info!("generating snapshot {path}");
return Some(Snapshot {
path,
depth: 1,
Expand Down Expand Up @@ -642,6 +644,12 @@ impl Manifest {
options.throughput as f64,
options.headroom as f64,
);
tracing::info!(
"installing manifest at {} {:?} {:?}",
prefix,
new.maximum_log_position(),
current,
);
loop {
let payload = serde_json::to_string(&new)
.map_err(|e| {
Expand Down
1 change: 1 addition & 0 deletions rust/wal3/src/manifest_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl ManifestManager {
}

/// Given a fragment, add it to the manifest, batch its application and wait for it to apply.
#[tracing::instrument(skip(self, fragment))]
pub async fn publish_fragment(&self, fragment: Fragment) -> Result<(), Error> {
assert_ne!(fragment.setsum, Setsum::default(), "TODO(rescrv): remove");
let (tx, rx) = tokio::sync::oneshot::channel();
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/tilt_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ query_service:
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
alt_host: "rust-log-service.chroma"
#use_alt_host_for_everything: true
use_alt_host_for_everything: true
dispatcher:
num_worker_threads: 4
dispatcher_queue_size: 100
Expand Down Expand Up @@ -119,7 +119,7 @@ compaction_service:
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
alt_host: "rust-log-service.chroma"
#use_alt_host_for_everything: true
use_alt_host_for_everything: true
dispatcher:
num_worker_threads: 4
dispatcher_queue_size: 100
Expand Down
Loading