From afc0650a490c7849324a0d4b76e58b0d6f60a624 Mon Sep 17 00:00:00 2001 From: Marshall Bowers Date: Mon, 3 Jun 2024 17:17:46 -0400 Subject: [PATCH] Paginate through extensions from the blob store (#12614) This PR updates the background task used to fetch extensions from the blob store to account for the possibility that the result set will be paginated. The task will now paginate through all of the results and collect them before determining which extensions need to be synced. Release Notes: - N/A --- crates/collab/src/api/extensions.rs | 79 +++++++++++++++++------------ 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/crates/collab/src/api/extensions.rs b/crates/collab/src/api/extensions.rs index 20e35ec28d56d18f1fbe203fa9c8f26c5b88ab9f..03b07c07dcf97b4be797d60994ca3659a0853ba5 100644 --- a/crates/collab/src/api/extensions.rs +++ b/crates/collab/src/api/extensions.rs @@ -239,61 +239,74 @@ async fn fetch_extensions_from_blob_store( ) -> anyhow::Result<()> { log::info!("fetching extensions from blob store"); - let list = blob_store_client - .list_objects() - .bucket(blob_store_bucket) - .prefix("extensions/") - .send() - .await?; + let mut next_marker = None; + let mut published_versions = HashMap::>::default(); + + loop { + let list = blob_store_client + .list_objects() + .bucket(blob_store_bucket) + .prefix("extensions/") + .set_marker(next_marker.clone()) + .send() + .await?; + let objects = list.contents.unwrap_or_default(); + log::info!("fetched {} object(s) from blob store", objects.len()); + + for object in &objects { + let Some(key) = object.key.as_ref() else { + continue; + }; + let mut parts = key.split('/'); + let Some(_) = parts.next().filter(|part| *part == "extensions") else { + continue; + }; + let Some(extension_id) = parts.next() else { + continue; + }; + let Some(version) = parts.next() else { + continue; + }; + if parts.next() == Some("manifest.json") { + published_versions + .entry(extension_id.to_owned()) + .or_default() + 
.push(version.to_owned()); + } + } - let objects = list.contents.unwrap_or_default(); - - let mut published_versions = HashMap::<&str, Vec<&str>>::default(); - for object in &objects { - let Some(key) = object.key.as_ref() else { - continue; - }; - let mut parts = key.split('/'); - let Some(_) = parts.next().filter(|part| *part == "extensions") else { - continue; - }; - let Some(extension_id) = parts.next() else { - continue; - }; - let Some(version) = parts.next() else { - continue; - }; - if parts.next() == Some("manifest.json") { - published_versions - .entry(extension_id) - .or_default() - .push(version); + if let (Some(true), Some(last_object)) = (list.is_truncated, objects.last()) { + next_marker.clone_from(&last_object.key); + } else { + break; } } + log::info!("found {} published extensions", published_versions.len()); + let known_versions = app_state.db.get_known_extension_versions().await?; let mut new_versions = HashMap::<&str, Vec>::default(); let empty = Vec::new(); - for (extension_id, published_versions) in published_versions { + for (extension_id, published_versions) in &published_versions { let known_versions = known_versions.get(extension_id).unwrap_or(&empty); for published_version in published_versions { if known_versions - .binary_search_by_key(&published_version, String::as_str) + .binary_search_by_key(&published_version, |known_version| known_version) .is_err() { if let Some(extension) = fetch_extension_manifest( blob_store_client, blob_store_bucket, - extension_id, - published_version, + &extension_id, + &published_version, ) .await .log_err() { new_versions - .entry(extension_id) + .entry(&extension_id) .or_default() .push(extension); }