Skip to content

Commit 258f7d6

Browse files
authored
[ENH] Clean up client manager into manager/assigner - make log client use it (#4640)
## Description of changes

_Summarize the changes made by this PR._

- Remove the `set_system` pattern in favor of passing the system into `try_from_config`, aligning with the common pattern since we touched this code.
- Refactor the client manager into a client manager plus an assigner that's generic over its clients.
- Use this in the old call site and in the log; the log uses assignment for the write path, and reads all for the read path.

## Test plan

_How are these changes tested?_

- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes

None
1 parent a6f30a3 commit 258f7d6

File tree

19 files changed

+534
-292
lines changed

19 files changed

+534
-292
lines changed

Cargo.lock

Lines changed: 9 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ tokio = { version = "1.41", features = ["fs", "macros", "rt-multi-thread"] }
6969
tokio-util = "0.7.12"
7070
tonic = "0.12"
7171
tonic-health = "0.12.3"
72-
tower = "0.5"
72+
tower = { version = "0.4.13", features = ["discover"] }
7373
backon = "1.3.0"
7474
tracing = { version = "0.1" }
7575
tracing-bunyan-formatter = "0.3"

rust/frontend/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ regex = { workspace = true }
2828
futures = { workspace = true }
2929
backon = { workspace = true }
3030
tracing-opentelemetry = { workspace = true }
31-
tower = { version = "0.4.13", features = ["discover"] }
31+
tower = { workspace = true }
3232
mdac = { workspace = true }
3333
opentelemetry.workspace = true
3434
validator = { workspace = true }

rust/frontend/src/executor/distributed.rs

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
1-
use super::client_manager::NodeNameToClient;
2-
use super::{client_manager::ClientManager, config};
1+
use super::config;
32
use async_trait::async_trait;
43
use backon::ExponentialBuilder;
54
use backon::Retryable;
65
use chroma_config::registry;
76
use chroma_config::{assignment::assignment_policy::AssignmentPolicy, Configurable};
87
use chroma_error::ChromaError;
8+
use chroma_memberlist::client_manager::ClientAssigner;
9+
use chroma_memberlist::client_manager::{ClientManager, ClientOptions};
910
use chroma_memberlist::{
1011
config::MemberlistProviderConfig,
1112
memberlist_provider::{CustomResourceMemberlistProvider, MemberlistProvider},
1213
};
1314
use chroma_system::System;
15+
use chroma_types::chroma_proto::query_executor_client::QueryExecutorClient;
1416
use chroma_types::SegmentType;
1517
use chroma_types::{
16-
chroma_proto::query_executor_client::QueryExecutorClient,
1718
operator::{CountResult, GetResult, KnnBatchResult},
1819
plan::{Count, Get, Knn},
19-
CollectionUuid, ExecutorError,
20+
ExecutorError,
2021
};
2122
use rand::seq::SliceRandom;
22-
use std::cmp::min;
2323
use tonic::Request;
2424

25-
type Client = QueryExecutorClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>;
25+
// Convenience type alias for the gRPC query client used by the DistributedExecutor
26+
type QueryClient = QueryExecutorClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>;
2627

2728
/// A distributed executor that routes requests to the appropriate node based on the assignment policy
2829
/// # Fields
@@ -36,8 +37,7 @@ type Client = QueryExecutorClient<chroma_tracing::GrpcTraceService<tonic::transp
3637
/// outside.
3738
#[derive(Clone, Debug)]
3839
pub struct DistributedExecutor {
39-
node_name_to_client: NodeNameToClient,
40-
assignment_policy: Box<dyn AssignmentPolicy>,
40+
client_assigner: ClientAssigner<QueryClient>,
4141
replication_factor: usize,
4242
backoff: ExponentialBuilder,
4343
}
@@ -50,13 +50,13 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
5050
) -> Result<Self, Box<dyn ChromaError>> {
5151
let assignment_policy =
5252
Box::<dyn AssignmentPolicy>::try_from_config(&config.assignment, registry).await?;
53-
let node_name_to_client = NodeNameToClient::default();
53+
let client_assigner = ClientAssigner::new(assignment_policy, config.replication_factor);
5454
let client_manager = ClientManager::new(
55-
node_name_to_client.clone(),
55+
client_assigner.clone(),
5656
config.connections_per_node,
5757
config.connect_timeout_ms,
5858
config.request_timeout_ms,
59-
config.max_query_service_response_size_bytes,
59+
ClientOptions::new(Some(config.max_query_service_response_size_bytes)),
6060
);
6161
let client_manager_handle = system.start_component(client_manager);
6262

@@ -75,8 +75,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
7575
let retry_config = &config.retry;
7676
let backoff = retry_config.into();
7777
Ok(Self {
78-
node_name_to_client,
79-
assignment_policy,
78+
client_assigner,
8079
replication_factor: config.replication_factor,
8180
backoff,
8281
})
@@ -97,7 +96,17 @@ impl DistributedExecutor {
9796
impl DistributedExecutor {
9897
///////////////////////// Plan Operations /////////////////////////
9998
pub async fn count(&mut self, plan: Count) -> Result<CountResult, ExecutorError> {
100-
let clients = self.clients(plan.scan.collection_and_segments.collection.collection_id)?;
99+
let clients = self
100+
.client_assigner
101+
.clients(
102+
&plan
103+
.scan
104+
.collection_and_segments
105+
.collection
106+
.collection_id
107+
.to_string(),
108+
)
109+
.map_err(|e| ExecutorError::Internal(e.boxed()))?;
101110
let plan: chroma_types::chroma_proto::CountPlan = plan.clone().try_into()?;
102111
let res = (|| async {
103112
choose_client(clients.as_slice())?
@@ -111,7 +120,17 @@ impl DistributedExecutor {
111120
}
112121

113122
pub async fn get(&mut self, plan: Get) -> Result<GetResult, ExecutorError> {
114-
let clients = self.clients(plan.scan.collection_and_segments.collection.collection_id)?;
123+
let clients = self
124+
.client_assigner
125+
.clients(
126+
&plan
127+
.scan
128+
.collection_and_segments
129+
.collection
130+
.collection_id
131+
.to_string(),
132+
)
133+
.map_err(|e| ExecutorError::Internal(e.boxed()))?;
115134
let res = (|| async {
116135
choose_client(clients.as_slice())?
117136
.get(Request::new(plan.clone().try_into()?))
@@ -124,7 +143,17 @@ impl DistributedExecutor {
124143
}
125144

126145
pub async fn knn(&mut self, plan: Knn) -> Result<KnnBatchResult, ExecutorError> {
127-
let clients = self.clients(plan.scan.collection_and_segments.collection.collection_id)?;
146+
let clients = self
147+
.client_assigner
148+
.clients(
149+
&plan
150+
.scan
151+
.collection_and_segments
152+
.collection
153+
.collection_id
154+
.to_string(),
155+
)
156+
.map_err(|e| ExecutorError::Internal(e.boxed()))?;
128157
let res = (|| async {
129158
choose_client(clients.as_slice())?
130159
.knn(Request::new(plan.clone().try_into()?))
@@ -137,38 +166,15 @@ impl DistributedExecutor {
137166
}
138167

139168
pub async fn is_ready(&self) -> bool {
140-
!self.node_name_to_client.read().is_empty()
169+
!self.client_assigner.is_empty()
141170
}
171+
}
142172

143-
///////////////////////// Helpers /////////////////////////
144-
145-
/// Get the gRPC clients for the given collection id by performing the assignment policy
146-
/// # Arguments
147-
/// - `collection_id` - The collection id for which the client is to be fetched
148-
/// # Returns
149-
/// - The gRPC clients for the given collection id in the order of the assignment policy
150-
/// # Errors
151-
/// - If no client is found for the given collection id
152-
/// - If the assignment policy fails to assign the collection id
153-
fn clients(&mut self, collection_id: CollectionUuid) -> Result<Vec<Client>, ExecutorError> {
154-
let node_name_to_client_guard = self.node_name_to_client.read();
155-
let members: Vec<String> = node_name_to_client_guard.keys().cloned().collect();
156-
let target_replication_factor = min(self.replication_factor, members.len());
157-
self.assignment_policy.set_members(members);
158-
let assigned = self
159-
.assignment_policy
160-
.assign(&collection_id.to_string(), target_replication_factor)?;
161-
let clients = assigned
162-
.iter()
163-
.map(|node_name| {
164-
node_name_to_client_guard
165-
.get(node_name)
166-
.ok_or_else(|| ExecutorError::NoClientFound(node_name.clone()))
167-
.cloned()
168-
})
169-
.collect::<Result<Vec<_>, _>>()?;
170-
Ok(clients)
171-
}
173+
fn choose_client(clients: &[QueryClient]) -> Result<QueryClient, tonic::Status> {
174+
Ok(clients
175+
.choose(&mut rand::thread_rng())
176+
.ok_or(no_clients_found_status())?
177+
.clone())
172178
}
173179

174180
fn is_retryable_error(e: &tonic::Status) -> bool {
@@ -181,10 +187,3 @@ fn is_retryable_error(e: &tonic::Status) -> bool {
181187
fn no_clients_found_status() -> tonic::Status {
182188
tonic::Status::internal("No clients found")
183189
}
184-
185-
fn choose_client(clients: &[Client]) -> Result<Client, tonic::Status> {
186-
Ok(clients
187-
.choose(&mut rand::thread_rng())
188-
.ok_or(no_clients_found_status())?
189-
.clone())
190-
}

rust/frontend/src/executor/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use distributed::DistributedExecutor;
77
use local::LocalExecutor;
88

99
//////////////////////// Exposed Modules ////////////////////////
10-
pub(super) mod client_manager;
1110
pub mod config;
1211
// TODO: This should be private once we fix dep injection
1312
mod distributed;

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1458,7 +1458,7 @@ impl Configurable<(FrontendConfig, System)> for ServiceBasedFrontend {
14581458
};
14591459

14601460
let sysdb = SysDb::try_from_config(&config.sysdb, registry).await?;
1461-
let mut log = Log::try_from_config(&config.log, registry).await?;
1461+
let mut log = Log::try_from_config(&(config.log.clone(), system.clone()), registry).await?;
14621462
let max_batch_size = log.get_max_batch_size().await?;
14631463

14641464
// Create compaction manager and pass handle to log service if configured

rust/log/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ chroma-system = { workspace = true }
3535
chroma-sysdb = { workspace = true }
3636
chroma-types = { workspace = true }
3737
chroma-sqlite = { workspace = true }
38+
chroma-memberlist = { workspace = true }
3839
wal3 = { workspace = true }
3940
figment.workspace = true
4041

rust/log/src/config.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use chroma_memberlist::config::CustomResourceMemberlistProviderConfig;
12
use serde::{Deserialize, Serialize};
23

34
#[derive(Deserialize, Clone, Serialize, Debug)]
@@ -28,6 +29,10 @@ pub struct GrpcLogConfig {
2829
pub use_alt_for_collections: Vec<String>,
2930
#[serde(default = "Option::default")]
3031
pub alt_host_threshold: Option<String>,
32+
#[serde(default = "GrpcLogConfig::default_memberlist_provider")]
33+
pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
34+
#[serde(default = "GrpcLogConfig::default_assignment")]
35+
pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig,
3136
}
3237

3338
impl GrpcLogConfig {
@@ -54,6 +59,24 @@ impl GrpcLogConfig {
5459
fn default_max_decoding_message_size() -> usize {
5560
32_000_000
5661
}
62+
63+
fn default_memberlist_provider() -> chroma_memberlist::config::MemberlistProviderConfig {
64+
chroma_memberlist::config::MemberlistProviderConfig::CustomResource(
65+
CustomResourceMemberlistProviderConfig {
66+
kube_namespace: "chroma".to_string(),
67+
memberlist_name: "rust-log-service-memberlist".to_string(),
68+
queue_size: 100,
69+
},
70+
)
71+
}
72+
73+
fn default_assignment() -> chroma_config::assignment::config::AssignmentPolicyConfig {
74+
chroma_config::assignment::config::AssignmentPolicyConfig::RendezvousHashing(
75+
chroma_config::assignment::config::RendezvousHashingAssignmentPolicyConfig {
76+
hasher: chroma_config::assignment::config::HasherType::Murmur3,
77+
},
78+
)
79+
}
5780
}
5881

5982
impl Default for GrpcLogConfig {
@@ -69,6 +92,8 @@ impl Default for GrpcLogConfig {
6992
use_alt_for_tenants: vec![],
7093
use_alt_for_collections: vec![],
7194
alt_host_threshold: None,
95+
memberlist_provider: GrpcLogConfig::default_memberlist_provider(),
96+
assignment: GrpcLogConfig::default_assignment(),
7297
}
7398
}
7499
}

0 commit comments

Comments
 (0)