1use crate::db::ExtensionVersionConstraints;
2use crate::{db::NewExtensionVersion, AppState, Error, Result};
3use anyhow::{anyhow, Context as _};
4use aws_sdk_s3::presigning::PresigningConfig;
5use axum::{
6 extract::{Path, Query},
7 http::StatusCode,
8 response::Redirect,
9 routing::get,
10 Extension, Json, Router,
11};
12use collections::HashMap;
13use rpc::{ExtensionApiManifest, GetExtensionsResponse};
14use semantic_version::SemanticVersion;
15use serde::Deserialize;
16use std::{sync::Arc, time::Duration};
17use time::PrimitiveDateTime;
18use util::{maybe, ResultExt};
19
20pub fn router() -> Router {
21 Router::new()
22 .route("/extensions", get(get_extensions))
23 .route("/extensions/updates", get(get_extension_updates))
24 .route("/extensions/:extension_id", get(get_extension_versions))
25 .route(
26 "/extensions/:extension_id/download",
27 get(download_latest_extension),
28 )
29 .route(
30 "/extensions/:extension_id/:version/download",
31 get(download_extension),
32 )
33}
34
35#[derive(Debug, Deserialize)]
36struct GetExtensionsParams {
37 filter: Option<String>,
38 #[serde(default)]
39 ids: Option<String>,
40 #[serde(default)]
41 max_schema_version: i32,
42}
43
44async fn get_extensions(
45 Extension(app): Extension<Arc<AppState>>,
46 Query(params): Query<GetExtensionsParams>,
47) -> Result<Json<GetExtensionsResponse>> {
48 let extension_ids = params
49 .ids
50 .as_ref()
51 .map(|s| s.split(',').map(|s| s.trim()).collect::<Vec<_>>());
52
53 let extensions = if let Some(extension_ids) = extension_ids {
54 app.db.get_extensions_by_ids(&extension_ids, None).await?
55 } else {
56 app.db
57 .get_extensions(params.filter.as_deref(), params.max_schema_version, 500)
58 .await?
59 };
60
61 Ok(Json(GetExtensionsResponse { data: extensions }))
62}
63
64#[derive(Debug, Deserialize)]
65struct GetExtensionUpdatesParams {
66 ids: String,
67 min_schema_version: i32,
68 max_schema_version: i32,
69 min_wasm_api_version: SemanticVersion,
70 max_wasm_api_version: SemanticVersion,
71}
72
73async fn get_extension_updates(
74 Extension(app): Extension<Arc<AppState>>,
75 Query(params): Query<GetExtensionUpdatesParams>,
76) -> Result<Json<GetExtensionsResponse>> {
77 let constraints = ExtensionVersionConstraints {
78 schema_versions: params.min_schema_version..=params.max_schema_version,
79 wasm_api_versions: params.min_wasm_api_version..=params.max_wasm_api_version,
80 };
81
82 let extension_ids = params.ids.split(',').map(|s| s.trim()).collect::<Vec<_>>();
83
84 let extensions = app
85 .db
86 .get_extensions_by_ids(&extension_ids, Some(&constraints))
87 .await?;
88
89 Ok(Json(GetExtensionsResponse { data: extensions }))
90}
91
92#[derive(Debug, Deserialize)]
93struct GetExtensionVersionsParams {
94 extension_id: String,
95}
96
97async fn get_extension_versions(
98 Extension(app): Extension<Arc<AppState>>,
99 Path(params): Path<GetExtensionVersionsParams>,
100) -> Result<Json<GetExtensionsResponse>> {
101 let extension_versions = app.db.get_extension_versions(¶ms.extension_id).await?;
102
103 Ok(Json(GetExtensionsResponse {
104 data: extension_versions,
105 }))
106}
107
108#[derive(Debug, Deserialize)]
109struct DownloadLatestExtensionParams {
110 extension_id: String,
111 min_schema_version: Option<i32>,
112 max_schema_version: Option<i32>,
113 min_wasm_api_version: Option<SemanticVersion>,
114 max_wasm_api_version: Option<SemanticVersion>,
115}
116
117async fn download_latest_extension(
118 Extension(app): Extension<Arc<AppState>>,
119 Path(params): Path<DownloadLatestExtensionParams>,
120) -> Result<Redirect> {
121 let constraints = maybe!({
122 let min_schema_version = params.min_schema_version?;
123 let max_schema_version = params.max_schema_version?;
124 let min_wasm_api_version = params.min_wasm_api_version?;
125 let max_wasm_api_version = params.max_wasm_api_version?;
126
127 Some(ExtensionVersionConstraints {
128 schema_versions: min_schema_version..=max_schema_version,
129 wasm_api_versions: min_wasm_api_version..=max_wasm_api_version,
130 })
131 });
132
133 let extension = app
134 .db
135 .get_extension(¶ms.extension_id, constraints.as_ref())
136 .await?
137 .ok_or_else(|| anyhow!("unknown extension"))?;
138 download_extension(
139 Extension(app),
140 Path(DownloadExtensionParams {
141 extension_id: params.extension_id,
142 version: extension.manifest.version.to_string(),
143 }),
144 )
145 .await
146}
147
148#[derive(Debug, Deserialize)]
149struct DownloadExtensionParams {
150 extension_id: String,
151 version: String,
152}
153
154async fn download_extension(
155 Extension(app): Extension<Arc<AppState>>,
156 Path(params): Path<DownloadExtensionParams>,
157) -> Result<Redirect> {
158 let Some((blob_store_client, bucket)) = app
159 .blob_store_client
160 .clone()
161 .zip(app.config.blob_store_bucket.clone())
162 else {
163 Err(Error::Http(
164 StatusCode::NOT_IMPLEMENTED,
165 "not supported".into(),
166 ))?
167 };
168
169 let DownloadExtensionParams {
170 extension_id,
171 version,
172 } = params;
173
174 let version_exists = app
175 .db
176 .record_extension_download(&extension_id, &version)
177 .await?;
178
179 if !version_exists {
180 Err(Error::Http(
181 StatusCode::NOT_FOUND,
182 "unknown extension version".into(),
183 ))?;
184 }
185
186 let url = blob_store_client
187 .get_object()
188 .bucket(bucket)
189 .key(format!(
190 "extensions/{extension_id}/{version}/archive.tar.gz"
191 ))
192 .presigned(PresigningConfig::expires_in(EXTENSION_DOWNLOAD_URL_LIFETIME).unwrap())
193 .await
194 .map_err(|e| anyhow!("failed to create presigned extension download url {e}"))?;
195
196 Ok(Redirect::temporary(url.uri()))
197}
198
/// Pause between successive scans of the blob store for newly published
/// extension versions.
const EXTENSION_FETCH_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Validity period of the presigned URLs handed out for extension downloads.
const EXTENSION_DOWNLOAD_URL_LIFETIME: Duration = Duration::from_secs(3 * 60);
201
202pub fn fetch_extensions_from_blob_store_periodically(app_state: Arc<AppState>) {
203 let Some(blob_store_client) = app_state.blob_store_client.clone() else {
204 log::info!("no blob store client");
205 return;
206 };
207 let Some(blob_store_bucket) = app_state.config.blob_store_bucket.clone() else {
208 log::info!("no blob store bucket");
209 return;
210 };
211
212 let executor = app_state.executor.clone();
213 executor.spawn_detached({
214 let executor = executor.clone();
215 async move {
216 loop {
217 fetch_extensions_from_blob_store(
218 &blob_store_client,
219 &blob_store_bucket,
220 &app_state,
221 )
222 .await
223 .log_err();
224 executor.sleep(EXTENSION_FETCH_INTERVAL).await;
225 }
226 }
227 });
228}
229
230async fn fetch_extensions_from_blob_store(
231 blob_store_client: &aws_sdk_s3::Client,
232 blob_store_bucket: &String,
233 app_state: &Arc<AppState>,
234) -> anyhow::Result<()> {
235 log::info!("fetching extensions from blob store");
236
237 let list = blob_store_client
238 .list_objects()
239 .bucket(blob_store_bucket)
240 .prefix("extensions/")
241 .send()
242 .await?;
243
244 let objects = list.contents.unwrap_or_default();
245
246 let mut published_versions = HashMap::<&str, Vec<&str>>::default();
247 for object in &objects {
248 let Some(key) = object.key.as_ref() else {
249 continue;
250 };
251 let mut parts = key.split('/');
252 let Some(_) = parts.next().filter(|part| *part == "extensions") else {
253 continue;
254 };
255 let Some(extension_id) = parts.next() else {
256 continue;
257 };
258 let Some(version) = parts.next() else {
259 continue;
260 };
261 if parts.next() == Some("manifest.json") {
262 published_versions
263 .entry(extension_id)
264 .or_default()
265 .push(version);
266 }
267 }
268
269 let known_versions = app_state.db.get_known_extension_versions().await?;
270
271 let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
272 let empty = Vec::new();
273 for (extension_id, published_versions) in published_versions {
274 let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
275
276 for published_version in published_versions {
277 if known_versions
278 .binary_search_by_key(&published_version, String::as_str)
279 .is_err()
280 {
281 if let Some(extension) = fetch_extension_manifest(
282 blob_store_client,
283 blob_store_bucket,
284 extension_id,
285 published_version,
286 )
287 .await
288 .log_err()
289 {
290 new_versions
291 .entry(extension_id)
292 .or_default()
293 .push(extension);
294 }
295 }
296 }
297 }
298
299 app_state
300 .db
301 .insert_extension_versions(&new_versions)
302 .await?;
303
304 log::info!(
305 "fetched {} new extensions from blob store",
306 new_versions.values().map(|v| v.len()).sum::<usize>()
307 );
308
309 Ok(())
310}
311
312async fn fetch_extension_manifest(
313 blob_store_client: &aws_sdk_s3::Client,
314 blob_store_bucket: &String,
315 extension_id: &str,
316 version: &str,
317) -> Result<NewExtensionVersion, anyhow::Error> {
318 let object = blob_store_client
319 .get_object()
320 .bucket(blob_store_bucket)
321 .key(format!("extensions/{extension_id}/{version}/manifest.json"))
322 .send()
323 .await?;
324 let manifest_bytes = object
325 .body
326 .collect()
327 .await
328 .map(|data| data.into_bytes())
329 .with_context(|| {
330 format!("failed to download manifest for extension {extension_id} version {version}")
331 })?
332 .to_vec();
333 let manifest =
334 serde_json::from_slice::<ExtensionApiManifest>(&manifest_bytes).with_context(|| {
335 format!(
336 "invalid manifest for extension {extension_id} version {version}: {}",
337 String::from_utf8_lossy(&manifest_bytes)
338 )
339 })?;
340 let published_at = object.last_modified.ok_or_else(|| {
341 anyhow!("missing last modified timestamp for extension {extension_id} version {version}")
342 })?;
343 let published_at = time::OffsetDateTime::from_unix_timestamp_nanos(published_at.as_nanos())?;
344 let published_at = PrimitiveDateTime::new(published_at.date(), published_at.time());
345 let version = semver::Version::parse(&manifest.version).with_context(|| {
346 format!("invalid version for extension {extension_id} version {version}")
347 })?;
348 Ok(NewExtensionVersion {
349 name: manifest.name,
350 version,
351 description: manifest.description.unwrap_or_default(),
352 authors: manifest.authors,
353 repository: manifest.repository,
354 schema_version: manifest.schema_version.unwrap_or(0),
355 wasm_api_version: manifest.wasm_api_version,
356 published_at,
357 })
358}