1use crate::{
2 db::{ExtensionMetadata, NewExtensionVersion},
3 AppState, Error, Result,
4};
5use anyhow::{anyhow, Context as _};
6use aws_sdk_s3::presigning::PresigningConfig;
7use axum::{
8 extract::{Path, Query},
9 http::StatusCode,
10 response::Redirect,
11 routing::get,
12 Extension, Json, Router,
13};
14use collections::HashMap;
15use serde::{Deserialize, Serialize};
16use std::{sync::Arc, time::Duration};
17use time::PrimitiveDateTime;
18use util::ResultExt;
19
20pub fn router() -> Router {
21 Router::new()
22 .route("/extensions", get(get_extensions))
23 .route(
24 "/extensions/:extension_id/download",
25 get(download_latest_extension),
26 )
27 .route(
28 "/extensions/:extension_id/:version/download",
29 get(download_extension),
30 )
31}
32
33#[derive(Debug, Deserialize)]
34struct GetExtensionsParams {
35 filter: Option<String>,
36}
37
38#[derive(Debug, Deserialize)]
39struct DownloadLatestExtensionParams {
40 extension_id: String,
41}
42
43#[derive(Debug, Deserialize)]
44struct DownloadExtensionParams {
45 extension_id: String,
46 version: String,
47}
48
49#[derive(Debug, Serialize)]
50struct GetExtensionsResponse {
51 pub data: Vec<ExtensionMetadata>,
52}
53
54#[derive(Deserialize)]
55struct ExtensionManifest {
56 name: String,
57 version: String,
58 description: Option<String>,
59 authors: Vec<String>,
60 repository: String,
61}
62
63async fn get_extensions(
64 Extension(app): Extension<Arc<AppState>>,
65 Query(params): Query<GetExtensionsParams>,
66) -> Result<Json<GetExtensionsResponse>> {
67 let extensions = app.db.get_extensions(params.filter.as_deref(), 500).await?;
68 Ok(Json(GetExtensionsResponse { data: extensions }))
69}
70
71async fn download_latest_extension(
72 Extension(app): Extension<Arc<AppState>>,
73 Path(params): Path<DownloadLatestExtensionParams>,
74) -> Result<Redirect> {
75 let extension = app
76 .db
77 .get_extension(¶ms.extension_id)
78 .await?
79 .ok_or_else(|| anyhow!("unknown extension"))?;
80 download_extension(
81 Extension(app),
82 Path(DownloadExtensionParams {
83 extension_id: params.extension_id,
84 version: extension.version,
85 }),
86 )
87 .await
88}
89
90async fn download_extension(
91 Extension(app): Extension<Arc<AppState>>,
92 Path(params): Path<DownloadExtensionParams>,
93) -> Result<Redirect> {
94 let Some((blob_store_client, bucket)) = app
95 .blob_store_client
96 .clone()
97 .zip(app.config.blob_store_bucket.clone())
98 else {
99 Err(Error::Http(
100 StatusCode::NOT_IMPLEMENTED,
101 "not supported".into(),
102 ))?
103 };
104
105 let DownloadExtensionParams {
106 extension_id,
107 version,
108 } = params;
109
110 let version_exists = app
111 .db
112 .record_extension_download(&extension_id, &version)
113 .await?;
114
115 if !version_exists {
116 Err(Error::Http(
117 StatusCode::NOT_FOUND,
118 "unknown extension version".into(),
119 ))?;
120 }
121
122 let url = blob_store_client
123 .get_object()
124 .bucket(bucket)
125 .key(format!(
126 "extensions/{extension_id}/{version}/archive.tar.gz"
127 ))
128 .presigned(PresigningConfig::expires_in(EXTENSION_DOWNLOAD_URL_LIFETIME).unwrap())
129 .await
130 .map_err(|e| anyhow!("failed to create presigned extension download url {e}"))?;
131
132 Ok(Redirect::temporary(url.uri()))
133}
134
135const EXTENSION_FETCH_INTERVAL: Duration = Duration::from_secs(5 * 60);
136const EXTENSION_DOWNLOAD_URL_LIFETIME: Duration = Duration::from_secs(3 * 60);
137
138pub fn fetch_extensions_from_blob_store_periodically(app_state: Arc<AppState>) {
139 let Some(blob_store_client) = app_state.blob_store_client.clone() else {
140 log::info!("no blob store client");
141 return;
142 };
143 let Some(blob_store_bucket) = app_state.config.blob_store_bucket.clone() else {
144 log::info!("no blob store bucket");
145 return;
146 };
147
148 let executor = app_state.executor.clone();
149 executor.spawn_detached({
150 let executor = executor.clone();
151 async move {
152 loop {
153 fetch_extensions_from_blob_store(
154 &blob_store_client,
155 &blob_store_bucket,
156 &app_state,
157 )
158 .await
159 .log_err();
160 executor.sleep(EXTENSION_FETCH_INTERVAL).await;
161 }
162 }
163 });
164}
165
166async fn fetch_extensions_from_blob_store(
167 blob_store_client: &aws_sdk_s3::Client,
168 blob_store_bucket: &String,
169 app_state: &Arc<AppState>,
170) -> anyhow::Result<()> {
171 log::info!("fetching extensions from blob store");
172
173 let list = blob_store_client
174 .list_objects()
175 .bucket(blob_store_bucket)
176 .prefix("extensions/")
177 .send()
178 .await?;
179
180 let objects = list.contents.unwrap_or_default();
181
182 let mut published_versions = HashMap::<&str, Vec<&str>>::default();
183 for object in &objects {
184 let Some(key) = object.key.as_ref() else {
185 continue;
186 };
187 let mut parts = key.split('/');
188 let Some(_) = parts.next().filter(|part| *part == "extensions") else {
189 continue;
190 };
191 let Some(extension_id) = parts.next() else {
192 continue;
193 };
194 let Some(version) = parts.next() else {
195 continue;
196 };
197 if parts.next() == Some("manifest.json") {
198 published_versions
199 .entry(extension_id)
200 .or_default()
201 .push(version);
202 }
203 }
204
205 let known_versions = app_state.db.get_known_extension_versions().await?;
206
207 let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
208 let empty = Vec::new();
209 for (extension_id, published_versions) in published_versions {
210 let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
211
212 for published_version in published_versions {
213 if known_versions
214 .binary_search_by_key(&published_version, String::as_str)
215 .is_err()
216 {
217 if let Some(extension) = fetch_extension_manifest(
218 blob_store_client,
219 blob_store_bucket,
220 extension_id,
221 published_version,
222 )
223 .await
224 .log_err()
225 {
226 new_versions
227 .entry(extension_id)
228 .or_default()
229 .push(extension);
230 }
231 }
232 }
233 }
234
235 app_state
236 .db
237 .insert_extension_versions(&new_versions)
238 .await?;
239
240 log::info!(
241 "fetched {} new extensions from blob store",
242 new_versions.values().map(|v| v.len()).sum::<usize>()
243 );
244
245 Ok(())
246}
247
248async fn fetch_extension_manifest(
249 blob_store_client: &aws_sdk_s3::Client,
250 blob_store_bucket: &String,
251 extension_id: &str,
252 version: &str,
253) -> Result<NewExtensionVersion, anyhow::Error> {
254 let object = blob_store_client
255 .get_object()
256 .bucket(blob_store_bucket)
257 .key(format!("extensions/{extension_id}/{version}/manifest.json"))
258 .send()
259 .await?;
260 let manifest_bytes = object
261 .body
262 .collect()
263 .await
264 .map(|data| data.into_bytes())
265 .with_context(|| {
266 format!("failed to download manifest for extension {extension_id} version {version}")
267 })?
268 .to_vec();
269 let manifest =
270 serde_json::from_slice::<ExtensionManifest>(&manifest_bytes).with_context(|| {
271 format!(
272 "invalid manifest for extension {extension_id} version {version}: {}",
273 String::from_utf8_lossy(&manifest_bytes)
274 )
275 })?;
276 let published_at = object.last_modified.ok_or_else(|| {
277 anyhow!("missing last modified timestamp for extension {extension_id} version {version}")
278 })?;
279 let published_at = time::OffsetDateTime::from_unix_timestamp_nanos(published_at.as_nanos())?;
280 let published_at = PrimitiveDateTime::new(published_at.date(), published_at.time());
281 let version = semver::Version::parse(&manifest.version).with_context(|| {
282 format!("invalid version for extension {extension_id} version {version}")
283 })?;
284 Ok(NewExtensionVersion {
285 name: manifest.name,
286 version,
287 description: manifest.description.unwrap_or_default(),
288 authors: manifest.authors,
289 repository: manifest.repository,
290 published_at,
291 })
292}