Skip to content

Commit 1e6a027

Browse files
committed
[ENH] Add backoff to chroma-load upsert rate limit retry
1 parent ffe19e5 commit 1e6a027

File tree

3 files changed

+23
-4
lines changed

3 files changed

+23
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/load/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ async-trait = { workspace = true }
88
chrono = { workspace = true }
99
clap = { workspace = true }
1010
figment = { workspace = true }
11+
rand = { workspace = true }
1112
serde = { workspace = true }
1213
serde_json = { workspace = true }
1314
tokio = { workspace = true }

rust/load/src/data_sets.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,12 @@ impl DataSet for VerifyingDataSet {
12611261
return Err(Box::new(Error::InvalidRequest("No documents".into())));
12621262
}
12631263

1264+
let max_retries = 10;
1265+
let base_delay = std::time::Duration::from_millis(10);
1266+
let max_delay = std::time::Duration::from_secs(10);
1267+
let mut retry_count = 0;
1268+
let mut delay = base_delay;
1269+
12641270
loop {
12651271
let entries = CollectionEntries {
12661272
ids: keys.clone(),
@@ -1271,12 +1277,23 @@ impl DataSet for VerifyingDataSet {
12711277
let result = collection.upsert(entries, None).await;
12721278
if let Err(err) = result {
12731279
if format!("{err:?}").contains("429") {
1280+
if retry_count >= max_retries {
1281+
return Err(Box::new(Error::InvalidRequest(format!(
1282+
"UPSERT for {} failed after {} retries: RATE LIMITED {err:?}",
1283+
key_start_index, max_retries
1284+
))));
1285+
}
1286+
12741287
tracing::warn!(
1275-
"UPSERT for {} failed: RATE LIMITED {err:?}",
1276-
key_start_index
1288+
"UPSERT for {} failed: RATE LIMITED {err:?}, retry {}/{}, sleeping for {:?}",
1289+
key_start_index, retry_count + 1, max_retries, delay
12771290
);
1278-
// sleep for 0.01 seconds, retry
1279-
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1291+
1292+
tokio::time::sleep(delay).await;
1293+
1294+
// Exponential backoff with max delay
1295+
delay = std::cmp::min(delay * 2, max_delay);
1296+
retry_count += 1;
12801297
continue;
12811298
} else {
12821299
return Err(Box::new(Error::InvalidRequest(format!(

0 commit comments

Comments
 (0)