1use crate::db::ExtensionVersionConstraints;
2use crate::{db::NewExtensionVersion, AppState, Error, Result};
3use anyhow::{anyhow, Context as _};
4use aws_sdk_s3::presigning::PresigningConfig;
5use axum::{
6 extract::{Path, Query},
7 http::StatusCode,
8 response::Redirect,
9 routing::get,
10 Extension, Json, Router,
11};
12use collections::HashMap;
13use rpc::{ExtensionApiManifest, GetExtensionsResponse};
14use semantic_version::SemanticVersion;
15use serde::Deserialize;
16use std::{sync::Arc, time::Duration};
17use time::PrimitiveDateTime;
18use util::{maybe, ResultExt};
19
20pub fn router() -> Router {
21 Router::new()
22 .route("/extensions", get(get_extensions))
23 .route("/extensions/updates", get(get_extension_updates))
24 .route("/extensions/:extension_id", get(get_extension_versions))
25 .route(
26 "/extensions/:extension_id/download",
27 get(download_latest_extension),
28 )
29 .route(
30 "/extensions/:extension_id/:version/download",
31 get(download_extension),
32 )
33}
34
35#[derive(Debug, Deserialize)]
36struct GetExtensionsParams {
37 filter: Option<String>,
38 #[serde(default)]
39 ids: Option<String>,
40 #[serde(default)]
41 max_schema_version: i32,
42}
43
44async fn get_extensions(
45 Extension(app): Extension<Arc<AppState>>,
46 Query(params): Query<GetExtensionsParams>,
47) -> Result<Json<GetExtensionsResponse>> {
48 let extension_ids = params
49 .ids
50 .as_ref()
51 .map(|s| s.split(',').map(|s| s.trim()).collect::<Vec<_>>());
52
53 let extensions = if let Some(extension_ids) = extension_ids {
54 app.db.get_extensions_by_ids(&extension_ids, None).await?
55 } else {
56 app.db
57 .get_extensions(params.filter.as_deref(), params.max_schema_version, 500)
58 .await?
59 };
60
61 Ok(Json(GetExtensionsResponse { data: extensions }))
62}
63
64#[derive(Debug, Deserialize)]
65struct GetExtensionUpdatesParams {
66 ids: String,
67 min_schema_version: i32,
68 max_schema_version: i32,
69 min_wasm_api_version: SemanticVersion,
70 max_wasm_api_version: SemanticVersion,
71}
72
73async fn get_extension_updates(
74 Extension(app): Extension<Arc<AppState>>,
75 Query(params): Query<GetExtensionUpdatesParams>,
76) -> Result<Json<GetExtensionsResponse>> {
77 let constraints = ExtensionVersionConstraints {
78 schema_versions: params.min_schema_version..=params.max_schema_version,
79 wasm_api_versions: params.min_wasm_api_version..=params.max_wasm_api_version,
80 };
81
82 let extension_ids = params.ids.split(',').map(|s| s.trim()).collect::<Vec<_>>();
83
84 let extensions = app
85 .db
86 .get_extensions_by_ids(&extension_ids, Some(&constraints))
87 .await?;
88
89 Ok(Json(GetExtensionsResponse { data: extensions }))
90}
91
92#[derive(Debug, Deserialize)]
93struct GetExtensionVersionsParams {
94 extension_id: String,
95}
96
97async fn get_extension_versions(
98 Extension(app): Extension<Arc<AppState>>,
99 Path(params): Path<GetExtensionVersionsParams>,
100) -> Result<Json<GetExtensionsResponse>> {
101 let extension_versions = app.db.get_extension_versions(¶ms.extension_id).await?;
102
103 Ok(Json(GetExtensionsResponse {
104 data: extension_versions,
105 }))
106}
107
108#[derive(Debug, Deserialize)]
109struct DownloadLatestExtensionPathParams {
110 extension_id: String,
111}
112
113#[derive(Debug, Deserialize)]
114struct DownloadLatestExtensionQueryParams {
115 min_schema_version: Option<i32>,
116 max_schema_version: Option<i32>,
117 min_wasm_api_version: Option<SemanticVersion>,
118 max_wasm_api_version: Option<SemanticVersion>,
119}
120
121async fn download_latest_extension(
122 Extension(app): Extension<Arc<AppState>>,
123 Path(params): Path<DownloadLatestExtensionPathParams>,
124 Query(query): Query<DownloadLatestExtensionQueryParams>,
125) -> Result<Redirect> {
126 let constraints = maybe!({
127 let min_schema_version = query.min_schema_version?;
128 let max_schema_version = query.max_schema_version?;
129 let min_wasm_api_version = query.min_wasm_api_version?;
130 let max_wasm_api_version = query.max_wasm_api_version?;
131
132 Some(ExtensionVersionConstraints {
133 schema_versions: min_schema_version..=max_schema_version,
134 wasm_api_versions: min_wasm_api_version..=max_wasm_api_version,
135 })
136 });
137
138 let extension = app
139 .db
140 .get_extension(¶ms.extension_id, constraints.as_ref())
141 .await?
142 .ok_or_else(|| anyhow!("unknown extension"))?;
143 download_extension(
144 Extension(app),
145 Path(DownloadExtensionParams {
146 extension_id: params.extension_id,
147 version: extension.manifest.version.to_string(),
148 }),
149 )
150 .await
151}
152
153#[derive(Debug, Deserialize)]
154struct DownloadExtensionParams {
155 extension_id: String,
156 version: String,
157}
158
159async fn download_extension(
160 Extension(app): Extension<Arc<AppState>>,
161 Path(params): Path<DownloadExtensionParams>,
162) -> Result<Redirect> {
163 let Some((blob_store_client, bucket)) = app
164 .blob_store_client
165 .clone()
166 .zip(app.config.blob_store_bucket.clone())
167 else {
168 Err(Error::Http(
169 StatusCode::NOT_IMPLEMENTED,
170 "not supported".into(),
171 ))?
172 };
173
174 let DownloadExtensionParams {
175 extension_id,
176 version,
177 } = params;
178
179 let version_exists = app
180 .db
181 .record_extension_download(&extension_id, &version)
182 .await?;
183
184 if !version_exists {
185 Err(Error::Http(
186 StatusCode::NOT_FOUND,
187 "unknown extension version".into(),
188 ))?;
189 }
190
191 let url = blob_store_client
192 .get_object()
193 .bucket(bucket)
194 .key(format!(
195 "extensions/{extension_id}/{version}/archive.tar.gz"
196 ))
197 .presigned(PresigningConfig::expires_in(EXTENSION_DOWNLOAD_URL_LIFETIME).unwrap())
198 .await
199 .map_err(|e| anyhow!("failed to create presigned extension download url {e}"))?;
200
201 Ok(Redirect::temporary(url.uri()))
202}
203
204const EXTENSION_FETCH_INTERVAL: Duration = Duration::from_secs(5 * 60);
205const EXTENSION_DOWNLOAD_URL_LIFETIME: Duration = Duration::from_secs(3 * 60);
206
207pub fn fetch_extensions_from_blob_store_periodically(app_state: Arc<AppState>) {
208 let Some(blob_store_client) = app_state.blob_store_client.clone() else {
209 log::info!("no blob store client");
210 return;
211 };
212 let Some(blob_store_bucket) = app_state.config.blob_store_bucket.clone() else {
213 log::info!("no blob store bucket");
214 return;
215 };
216
217 let executor = app_state.executor.clone();
218 executor.spawn_detached({
219 let executor = executor.clone();
220 async move {
221 loop {
222 fetch_extensions_from_blob_store(
223 &blob_store_client,
224 &blob_store_bucket,
225 &app_state,
226 )
227 .await
228 .log_err();
229 executor.sleep(EXTENSION_FETCH_INTERVAL).await;
230 }
231 }
232 });
233}
234
235async fn fetch_extensions_from_blob_store(
236 blob_store_client: &aws_sdk_s3::Client,
237 blob_store_bucket: &String,
238 app_state: &Arc<AppState>,
239) -> anyhow::Result<()> {
240 log::info!("fetching extensions from blob store");
241
242 let list = blob_store_client
243 .list_objects()
244 .bucket(blob_store_bucket)
245 .prefix("extensions/")
246 .send()
247 .await?;
248
249 let objects = list.contents.unwrap_or_default();
250
251 let mut published_versions = HashMap::<&str, Vec<&str>>::default();
252 for object in &objects {
253 let Some(key) = object.key.as_ref() else {
254 continue;
255 };
256 let mut parts = key.split('/');
257 let Some(_) = parts.next().filter(|part| *part == "extensions") else {
258 continue;
259 };
260 let Some(extension_id) = parts.next() else {
261 continue;
262 };
263 let Some(version) = parts.next() else {
264 continue;
265 };
266 if parts.next() == Some("manifest.json") {
267 published_versions
268 .entry(extension_id)
269 .or_default()
270 .push(version);
271 }
272 }
273
274 let known_versions = app_state.db.get_known_extension_versions().await?;
275
276 let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
277 let empty = Vec::new();
278 for (extension_id, published_versions) in published_versions {
279 let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
280
281 for published_version in published_versions {
282 if known_versions
283 .binary_search_by_key(&published_version, String::as_str)
284 .is_err()
285 {
286 if let Some(extension) = fetch_extension_manifest(
287 blob_store_client,
288 blob_store_bucket,
289 extension_id,
290 published_version,
291 )
292 .await
293 .log_err()
294 {
295 new_versions
296 .entry(extension_id)
297 .or_default()
298 .push(extension);
299 }
300 }
301 }
302 }
303
304 app_state
305 .db
306 .insert_extension_versions(&new_versions)
307 .await?;
308
309 log::info!(
310 "fetched {} new extensions from blob store",
311 new_versions.values().map(|v| v.len()).sum::<usize>()
312 );
313
314 Ok(())
315}
316
317async fn fetch_extension_manifest(
318 blob_store_client: &aws_sdk_s3::Client,
319 blob_store_bucket: &String,
320 extension_id: &str,
321 version: &str,
322) -> Result<NewExtensionVersion, anyhow::Error> {
323 let object = blob_store_client
324 .get_object()
325 .bucket(blob_store_bucket)
326 .key(format!("extensions/{extension_id}/{version}/manifest.json"))
327 .send()
328 .await?;
329 let manifest_bytes = object
330 .body
331 .collect()
332 .await
333 .map(|data| data.into_bytes())
334 .with_context(|| {
335 format!("failed to download manifest for extension {extension_id} version {version}")
336 })?
337 .to_vec();
338 let manifest =
339 serde_json::from_slice::<ExtensionApiManifest>(&manifest_bytes).with_context(|| {
340 format!(
341 "invalid manifest for extension {extension_id} version {version}: {}",
342 String::from_utf8_lossy(&manifest_bytes)
343 )
344 })?;
345 let published_at = object.last_modified.ok_or_else(|| {
346 anyhow!("missing last modified timestamp for extension {extension_id} version {version}")
347 })?;
348 let published_at = time::OffsetDateTime::from_unix_timestamp_nanos(published_at.as_nanos())?;
349 let published_at = PrimitiveDateTime::new(published_at.date(), published_at.time());
350 let version = semver::Version::parse(&manifest.version).with_context(|| {
351 format!("invalid version for extension {extension_id} version {version}")
352 })?;
353 Ok(NewExtensionVersion {
354 name: manifest.name,
355 version,
356 description: manifest.description.unwrap_or_default(),
357 authors: manifest.authors,
358 repository: manifest.repository,
359 schema_version: manifest.schema_version.unwrap_or(0),
360 wasm_api_version: manifest.wasm_api_version,
361 published_at,
362 })
363}