extensions.rs

  1use crate::{
  2    db::{ExtensionMetadata, NewExtensionVersion},
  3    AppState, Error, Result,
  4};
  5use anyhow::{anyhow, Context as _};
  6use aws_sdk_s3::presigning::PresigningConfig;
  7use axum::{
  8    extract::{Path, Query},
  9    http::StatusCode,
 10    response::Redirect,
 11    routing::get,
 12    Extension, Json, Router,
 13};
 14use collections::HashMap;
 15use serde::{Deserialize, Serialize};
 16use std::{sync::Arc, time::Duration};
 17use time::PrimitiveDateTime;
 18use util::ResultExt;
 19
 20pub fn router() -> Router {
 21    Router::new()
 22        .route("/extensions", get(get_extensions))
 23        .route(
 24            "/extensions/:extension_id/download",
 25            get(download_latest_extension),
 26        )
 27        .route(
 28            "/extensions/:extension_id/:version/download",
 29            get(download_extension),
 30        )
 31}
 32
 33#[derive(Debug, Deserialize)]
 34struct GetExtensionsParams {
 35    filter: Option<String>,
 36}
 37
 38#[derive(Debug, Deserialize)]
 39struct DownloadLatestExtensionParams {
 40    extension_id: String,
 41}
 42
 43#[derive(Debug, Deserialize)]
 44struct DownloadExtensionParams {
 45    extension_id: String,
 46    version: String,
 47}
 48
 49#[derive(Debug, Serialize)]
 50struct GetExtensionsResponse {
 51    pub data: Vec<ExtensionMetadata>,
 52}
 53
 54#[derive(Deserialize)]
 55struct ExtensionManifest {
 56    name: String,
 57    version: String,
 58    description: Option<String>,
 59    authors: Vec<String>,
 60    repository: String,
 61}
 62
 63async fn get_extensions(
 64    Extension(app): Extension<Arc<AppState>>,
 65    Query(params): Query<GetExtensionsParams>,
 66) -> Result<Json<GetExtensionsResponse>> {
 67    let extensions = app.db.get_extensions(params.filter.as_deref(), 500).await?;
 68    Ok(Json(GetExtensionsResponse { data: extensions }))
 69}
 70
 71async fn download_latest_extension(
 72    Extension(app): Extension<Arc<AppState>>,
 73    Path(params): Path<DownloadLatestExtensionParams>,
 74) -> Result<Redirect> {
 75    let extension = app
 76        .db
 77        .get_extension(&params.extension_id)
 78        .await?
 79        .ok_or_else(|| anyhow!("unknown extension"))?;
 80    download_extension(
 81        Extension(app),
 82        Path(DownloadExtensionParams {
 83            extension_id: params.extension_id,
 84            version: extension.version,
 85        }),
 86    )
 87    .await
 88}
 89
 90async fn download_extension(
 91    Extension(app): Extension<Arc<AppState>>,
 92    Path(params): Path<DownloadExtensionParams>,
 93) -> Result<Redirect> {
 94    let Some((blob_store_client, bucket)) = app
 95        .blob_store_client
 96        .clone()
 97        .zip(app.config.blob_store_bucket.clone())
 98    else {
 99        Err(Error::Http(
100            StatusCode::NOT_IMPLEMENTED,
101            "not supported".into(),
102        ))?
103    };
104
105    let DownloadExtensionParams {
106        extension_id,
107        version,
108    } = params;
109
110    let version_exists = app
111        .db
112        .record_extension_download(&extension_id, &version)
113        .await?;
114
115    if !version_exists {
116        Err(Error::Http(
117            StatusCode::NOT_FOUND,
118            "unknown extension version".into(),
119        ))?;
120    }
121
122    let url = blob_store_client
123        .get_object()
124        .bucket(bucket)
125        .key(format!(
126            "extensions/{extension_id}/{version}/archive.tar.gz"
127        ))
128        .presigned(PresigningConfig::expires_in(EXTENSION_DOWNLOAD_URL_LIFETIME).unwrap())
129        .await
130        .map_err(|e| anyhow!("failed to create presigned extension download url {e}"))?;
131
132    Ok(Redirect::temporary(url.uri()))
133}
134
/// Pause between two consecutive synchronizations with the blob store.
const EXTENSION_FETCH_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Lifetime of the presigned URLs handed out for extension downloads.
const EXTENSION_DOWNLOAD_URL_LIFETIME: Duration = Duration::from_secs(3 * 60);
137
138pub fn fetch_extensions_from_blob_store_periodically(app_state: Arc<AppState>) {
139    let Some(blob_store_client) = app_state.blob_store_client.clone() else {
140        log::info!("no blob store client");
141        return;
142    };
143    let Some(blob_store_bucket) = app_state.config.blob_store_bucket.clone() else {
144        log::info!("no blob store bucket");
145        return;
146    };
147
148    let executor = app_state.executor.clone();
149    executor.spawn_detached({
150        let executor = executor.clone();
151        async move {
152            loop {
153                fetch_extensions_from_blob_store(
154                    &blob_store_client,
155                    &blob_store_bucket,
156                    &app_state,
157                )
158                .await
159                .log_err();
160                executor.sleep(EXTENSION_FETCH_INTERVAL).await;
161            }
162        }
163    });
164}
165
166async fn fetch_extensions_from_blob_store(
167    blob_store_client: &aws_sdk_s3::Client,
168    blob_store_bucket: &String,
169    app_state: &Arc<AppState>,
170) -> anyhow::Result<()> {
171    log::info!("fetching extensions from blob store");
172
173    let list = blob_store_client
174        .list_objects()
175        .bucket(blob_store_bucket)
176        .prefix("extensions/")
177        .send()
178        .await?;
179
180    let objects = list.contents.unwrap_or_default();
181
182    let mut published_versions = HashMap::<&str, Vec<&str>>::default();
183    for object in &objects {
184        let Some(key) = object.key.as_ref() else {
185            continue;
186        };
187        let mut parts = key.split('/');
188        let Some(_) = parts.next().filter(|part| *part == "extensions") else {
189            continue;
190        };
191        let Some(extension_id) = parts.next() else {
192            continue;
193        };
194        let Some(version) = parts.next() else {
195            continue;
196        };
197        if parts.next() == Some("manifest.json") {
198            published_versions
199                .entry(extension_id)
200                .or_default()
201                .push(version);
202        }
203    }
204
205    let known_versions = app_state.db.get_known_extension_versions().await?;
206
207    let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
208    let empty = Vec::new();
209    for (extension_id, published_versions) in published_versions {
210        let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
211
212        for published_version in published_versions {
213            if known_versions
214                .binary_search_by_key(&published_version, String::as_str)
215                .is_err()
216            {
217                if let Some(extension) = fetch_extension_manifest(
218                    blob_store_client,
219                    blob_store_bucket,
220                    extension_id,
221                    published_version,
222                )
223                .await
224                .log_err()
225                {
226                    new_versions
227                        .entry(extension_id)
228                        .or_default()
229                        .push(extension);
230                }
231            }
232        }
233    }
234
235    app_state
236        .db
237        .insert_extension_versions(&new_versions)
238        .await?;
239
240    log::info!(
241        "fetched {} new extensions from blob store",
242        new_versions.values().map(|v| v.len()).sum::<usize>()
243    );
244
245    Ok(())
246}
247
248async fn fetch_extension_manifest(
249    blob_store_client: &aws_sdk_s3::Client,
250    blob_store_bucket: &String,
251    extension_id: &str,
252    version: &str,
253) -> Result<NewExtensionVersion, anyhow::Error> {
254    let object = blob_store_client
255        .get_object()
256        .bucket(blob_store_bucket)
257        .key(format!("extensions/{extension_id}/{version}/manifest.json"))
258        .send()
259        .await?;
260    let manifest_bytes = object
261        .body
262        .collect()
263        .await
264        .map(|data| data.into_bytes())
265        .with_context(|| {
266            format!("failed to download manifest for extension {extension_id} version {version}")
267        })?
268        .to_vec();
269    let manifest =
270        serde_json::from_slice::<ExtensionManifest>(&manifest_bytes).with_context(|| {
271            format!(
272                "invalid manifest for extension {extension_id} version {version}: {}",
273                String::from_utf8_lossy(&manifest_bytes)
274            )
275        })?;
276    let published_at = object.last_modified.ok_or_else(|| {
277        anyhow!("missing last modified timestamp for extension {extension_id} version {version}")
278    })?;
279    let published_at = time::OffsetDateTime::from_unix_timestamp_nanos(published_at.as_nanos())?;
280    let published_at = PrimitiveDateTime::new(published_at.date(), published_at.time());
281    let version = semver::Version::parse(&manifest.version).with_context(|| {
282        format!("invalid version for extension {extension_id} version {version}")
283    })?;
284    Ok(NewExtensionVersion {
285        name: manifest.name,
286        version,
287        description: manifest.description.unwrap_or_default(),
288        authors: manifest.authors,
289        repository: manifest.repository,
290        published_at,
291    })
292}