1use crate::{
2 db::{ExtensionMetadata, NewExtensionVersion},
3 executor::Executor,
4 AppState, Error, Result,
5};
6use anyhow::{anyhow, Context as _};
7use aws_sdk_s3::presigning::PresigningConfig;
8use axum::{
9 extract::{Path, Query},
10 http::StatusCode,
11 response::Redirect,
12 routing::get,
13 Extension, Json, Router,
14};
15use collections::HashMap;
16use serde::{Deserialize, Serialize};
17use std::{sync::Arc, time::Duration};
18use time::PrimitiveDateTime;
19use util::ResultExt;
20
21pub fn router() -> Router {
22 Router::new()
23 .route("/extensions", get(get_extensions))
24 .route(
25 "/extensions/:extension_id/:version/download",
26 get(download_extension),
27 )
28}
29
30#[derive(Debug, Deserialize)]
31struct GetExtensionsParams {
32 filter: Option<String>,
33}
34
35#[derive(Debug, Deserialize)]
36struct DownloadExtensionParams {
37 extension_id: String,
38 version: String,
39}
40
41#[derive(Debug, Serialize)]
42struct GetExtensionsResponse {
43 pub data: Vec<ExtensionMetadata>,
44}
45
46#[derive(Deserialize)]
47struct ExtensionManifest {
48 name: String,
49 version: String,
50 description: Option<String>,
51 authors: Vec<String>,
52 repository: String,
53}
54
55async fn get_extensions(
56 Extension(app): Extension<Arc<AppState>>,
57 Query(params): Query<GetExtensionsParams>,
58) -> Result<Json<GetExtensionsResponse>> {
59 let extensions = app.db.get_extensions(params.filter.as_deref(), 500).await?;
60 Ok(Json(GetExtensionsResponse { data: extensions }))
61}
62
63async fn download_extension(
64 Extension(app): Extension<Arc<AppState>>,
65 Path(params): Path<DownloadExtensionParams>,
66) -> Result<Redirect> {
67 let Some((blob_store_client, bucket)) = app
68 .blob_store_client
69 .clone()
70 .zip(app.config.blob_store_bucket.clone())
71 else {
72 Err(Error::Http(
73 StatusCode::NOT_IMPLEMENTED,
74 "not supported".into(),
75 ))?
76 };
77
78 let DownloadExtensionParams {
79 extension_id,
80 version,
81 } = params;
82
83 let version_exists = app
84 .db
85 .record_extension_download(&extension_id, &version)
86 .await?;
87
88 if !version_exists {
89 Err(Error::Http(
90 StatusCode::NOT_FOUND,
91 "unknown extension version".into(),
92 ))?;
93 }
94
95 let url = blob_store_client
96 .get_object()
97 .bucket(bucket)
98 .key(format!(
99 "extensions/{extension_id}/{version}/archive.tar.gz"
100 ))
101 .presigned(PresigningConfig::expires_in(EXTENSION_DOWNLOAD_URL_LIFETIME).unwrap())
102 .await
103 .map_err(|e| anyhow!("failed to create presigned extension download url {e}"))?;
104
105 Ok(Redirect::temporary(url.uri()))
106}
107
108const EXTENSION_FETCH_INTERVAL: Duration = Duration::from_secs(5 * 60);
109const EXTENSION_DOWNLOAD_URL_LIFETIME: Duration = Duration::from_secs(3 * 60);
110
111pub fn fetch_extensions_from_blob_store_periodically(app_state: Arc<AppState>, executor: Executor) {
112 let Some(blob_store_client) = app_state.blob_store_client.clone() else {
113 log::info!("no blob store client");
114 return;
115 };
116 let Some(blob_store_bucket) = app_state.config.blob_store_bucket.clone() else {
117 log::info!("no blob store bucket");
118 return;
119 };
120
121 executor.spawn_detached({
122 let executor = executor.clone();
123 async move {
124 loop {
125 fetch_extensions_from_blob_store(
126 &blob_store_client,
127 &blob_store_bucket,
128 &app_state,
129 )
130 .await
131 .log_err();
132 executor.sleep(EXTENSION_FETCH_INTERVAL).await;
133 }
134 }
135 });
136}
137
138async fn fetch_extensions_from_blob_store(
139 blob_store_client: &aws_sdk_s3::Client,
140 blob_store_bucket: &String,
141 app_state: &Arc<AppState>,
142) -> anyhow::Result<()> {
143 log::info!("fetching extensions from blob store");
144
145 let list = blob_store_client
146 .list_objects()
147 .bucket(blob_store_bucket)
148 .prefix("extensions/")
149 .send()
150 .await?;
151
152 let objects = list.contents.unwrap_or_default();
153
154 let mut published_versions = HashMap::<&str, Vec<&str>>::default();
155 for object in &objects {
156 let Some(key) = object.key.as_ref() else {
157 continue;
158 };
159 let mut parts = key.split('/');
160 let Some(_) = parts.next().filter(|part| *part == "extensions") else {
161 continue;
162 };
163 let Some(extension_id) = parts.next() else {
164 continue;
165 };
166 let Some(version) = parts.next() else {
167 continue;
168 };
169 if parts.next() == Some("manifest.json") {
170 published_versions
171 .entry(extension_id)
172 .or_default()
173 .push(version);
174 }
175 }
176
177 let known_versions = app_state.db.get_known_extension_versions().await?;
178
179 let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
180 let empty = Vec::new();
181 for (extension_id, published_versions) in published_versions {
182 let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
183
184 for published_version in published_versions {
185 if known_versions
186 .binary_search_by_key(&published_version, String::as_str)
187 .is_err()
188 {
189 if let Some(extension) = fetch_extension_manifest(
190 blob_store_client,
191 blob_store_bucket,
192 extension_id,
193 published_version,
194 )
195 .await
196 .log_err()
197 {
198 new_versions
199 .entry(extension_id)
200 .or_default()
201 .push(extension);
202 }
203 }
204 }
205 }
206
207 app_state
208 .db
209 .insert_extension_versions(&new_versions)
210 .await?;
211
212 log::info!(
213 "fetched {} new extensions from blob store",
214 new_versions.values().map(|v| v.len()).sum::<usize>()
215 );
216
217 Ok(())
218}
219
220async fn fetch_extension_manifest(
221 blob_store_client: &aws_sdk_s3::Client,
222 blob_store_bucket: &String,
223 extension_id: &str,
224 version: &str,
225) -> Result<NewExtensionVersion, anyhow::Error> {
226 let object = blob_store_client
227 .get_object()
228 .bucket(blob_store_bucket)
229 .key(format!("extensions/{extension_id}/{version}/manifest.json"))
230 .send()
231 .await?;
232 let manifest_bytes = object
233 .body
234 .collect()
235 .await
236 .map(|data| data.into_bytes())
237 .with_context(|| {
238 format!("failed to download manifest for extension {extension_id} version {version}")
239 })?
240 .to_vec();
241 let manifest =
242 serde_json::from_slice::<ExtensionManifest>(&manifest_bytes).with_context(|| {
243 format!(
244 "invalid manifest for extension {extension_id} version {version}: {}",
245 String::from_utf8_lossy(&manifest_bytes)
246 )
247 })?;
248 let published_at = object.last_modified.ok_or_else(|| {
249 anyhow!("missing last modified timestamp for extension {extension_id} version {version}")
250 })?;
251 let published_at = time::OffsetDateTime::from_unix_timestamp_nanos(published_at.as_nanos())?;
252 let published_at = PrimitiveDateTime::new(published_at.date(), published_at.time());
253 let version = semver::Version::parse(&manifest.version).with_context(|| {
254 format!("invalid version for extension {extension_id} version {version}")
255 })?;
256 Ok(NewExtensionVersion {
257 name: manifest.name,
258 version,
259 description: manifest.description.unwrap_or_default(),
260 authors: manifest.authors,
261 repository: manifest.repository,
262 published_at,
263 })
264}