diff --git a/crates/collab/src/api/extensions.rs b/crates/collab/src/api/extensions.rs index 20e35ec28d56d18f1fbe203fa9c8f26c5b88ab9f..03b07c07dcf97b4be797d60994ca3659a0853ba5 100644 --- a/crates/collab/src/api/extensions.rs +++ b/crates/collab/src/api/extensions.rs @@ -239,61 +239,74 @@ async fn fetch_extensions_from_blob_store( ) -> anyhow::Result<()> { log::info!("fetching extensions from blob store"); - let list = blob_store_client - .list_objects() - .bucket(blob_store_bucket) - .prefix("extensions/") - .send() - .await?; + let mut next_marker = None; + let mut published_versions = HashMap::>::default(); + + loop { + let list = blob_store_client + .list_objects() + .bucket(blob_store_bucket) + .prefix("extensions/") + .set_marker(next_marker.clone()) + .send() + .await?; + let objects = list.contents.unwrap_or_default(); + log::info!("fetched {} object(s) from blob store", objects.len()); + + for object in &objects { + let Some(key) = object.key.as_ref() else { + continue; + }; + let mut parts = key.split('/'); + let Some(_) = parts.next().filter(|part| *part == "extensions") else { + continue; + }; + let Some(extension_id) = parts.next() else { + continue; + }; + let Some(version) = parts.next() else { + continue; + }; + if parts.next() == Some("manifest.json") { + published_versions + .entry(extension_id.to_owned()) + .or_default() + .push(version.to_owned()); + } + } - let objects = list.contents.unwrap_or_default(); - - let mut published_versions = HashMap::<&str, Vec<&str>>::default(); - for object in &objects { - let Some(key) = object.key.as_ref() else { - continue; - }; - let mut parts = key.split('/'); - let Some(_) = parts.next().filter(|part| *part == "extensions") else { - continue; - }; - let Some(extension_id) = parts.next() else { - continue; - }; - let Some(version) = parts.next() else { - continue; - }; - if parts.next() == Some("manifest.json") { - published_versions - .entry(extension_id) - .or_default() - .push(version); + if let (Some(true), Some(last_object)) = (list.is_truncated, objects.last()) { + next_marker.clone_from(&last_object.key); + } else { + break; } } + log::info!("found {} published extensions", published_versions.len()); + let known_versions = app_state.db.get_known_extension_versions().await?; let mut new_versions = HashMap::<&str, Vec>::default(); let empty = Vec::new(); - for (extension_id, published_versions) in published_versions { + for (extension_id, published_versions) in &published_versions { let known_versions = known_versions.get(extension_id).unwrap_or(&empty); for published_version in published_versions { if known_versions - .binary_search_by_key(&published_version, String::as_str) + .binary_search_by_key(&published_version, |known_version| known_version) .is_err() { if let Some(extension) = fetch_extension_manifest( blob_store_client, blob_store_bucket, - extension_id, - published_version, + &extension_id, + &published_version, ) .await .log_err() { new_versions - .entry(extension_id) + .entry(&extension_id) .or_default() .push(extension); }