extensions.rs

  1use crate::{
  2    db::{ExtensionMetadata, NewExtensionVersion},
  3    executor::Executor,
  4    AppState, Error, Result,
  5};
  6use anyhow::{anyhow, Context as _};
  7use aws_sdk_s3::presigning::PresigningConfig;
  8use axum::{
  9    extract::{Path, Query},
 10    response::Redirect,
 11    routing::get,
 12    Extension, Json, Router,
 13};
 14use collections::HashMap;
 15use hyper::StatusCode;
 16use serde::{Deserialize, Serialize};
 17use std::{sync::Arc, time::Duration};
 18use time::PrimitiveDateTime;
 19use util::ResultExt;
 20
 21pub fn router() -> Router {
 22    Router::new()
 23        .route("/extensions", get(get_extensions))
 24        .route(
 25            "/extensions/:extension_id/:version/download",
 26            get(download_extension),
 27        )
 28}
 29
 30#[derive(Debug, Deserialize)]
 31struct GetExtensionsParams {
 32    filter: Option<String>,
 33}
 34
 35#[derive(Debug, Deserialize)]
 36struct DownloadExtensionParams {
 37    extension_id: String,
 38    version: String,
 39}
 40
 41#[derive(Debug, Serialize)]
 42struct GetExtensionsResponse {
 43    pub data: Vec<ExtensionMetadata>,
 44}
 45
 46#[derive(Deserialize)]
 47struct ExtensionManifest {
 48    name: String,
 49    version: String,
 50    description: Option<String>,
 51    authors: Vec<String>,
 52    repository: String,
 53}
 54
 55async fn get_extensions(
 56    Extension(app): Extension<Arc<AppState>>,
 57    Query(params): Query<GetExtensionsParams>,
 58) -> Result<Json<GetExtensionsResponse>> {
 59    let extensions = app.db.get_extensions(params.filter.as_deref(), 500).await?;
 60    Ok(Json(GetExtensionsResponse { data: extensions }))
 61}
 62
 63async fn download_extension(
 64    Extension(app): Extension<Arc<AppState>>,
 65    Path(params): Path<DownloadExtensionParams>,
 66) -> Result<Redirect> {
 67    let Some((blob_store_client, bucket)) = app
 68        .blob_store_client
 69        .clone()
 70        .zip(app.config.blob_store_bucket.clone())
 71    else {
 72        Err(Error::Http(
 73            StatusCode::NOT_IMPLEMENTED,
 74            "not supported".into(),
 75        ))?
 76    };
 77
 78    let DownloadExtensionParams {
 79        extension_id,
 80        version,
 81    } = params;
 82
 83    let version_exists = app
 84        .db
 85        .record_extension_download(&extension_id, &version)
 86        .await?;
 87
 88    if !version_exists {
 89        Err(Error::Http(
 90            StatusCode::NOT_FOUND,
 91            "unknown extension version".into(),
 92        ))?;
 93    }
 94
 95    let url = blob_store_client
 96        .get_object()
 97        .bucket(bucket)
 98        .key(format!(
 99            "extensions/{extension_id}/{version}/archive.tar.gz"
100        ))
101        .presigned(PresigningConfig::expires_in(EXTENSION_DOWNLOAD_URL_LIFETIME).unwrap())
102        .await
103        .map_err(|e| anyhow!("failed to create presigned extension download url {e}"))?;
104
105    Ok(Redirect::temporary(url.uri()))
106}
107
108const EXTENSION_FETCH_INTERVAL: Duration = Duration::from_secs(5 * 60);
109const EXTENSION_DOWNLOAD_URL_LIFETIME: Duration = Duration::from_secs(3 * 60);
110
111pub fn fetch_extensions_from_blob_store_periodically(app_state: Arc<AppState>, executor: Executor) {
112    let Some(blob_store_client) = app_state.blob_store_client.clone() else {
113        log::info!("no blob store client");
114        return;
115    };
116    let Some(blob_store_bucket) = app_state.config.blob_store_bucket.clone() else {
117        log::info!("no blob store bucket");
118        return;
119    };
120
121    executor.spawn_detached({
122        let executor = executor.clone();
123        async move {
124            loop {
125                fetch_extensions_from_blob_store(
126                    &blob_store_client,
127                    &blob_store_bucket,
128                    &app_state,
129                )
130                .await
131                .log_err();
132                executor.sleep(EXTENSION_FETCH_INTERVAL).await;
133            }
134        }
135    });
136}
137
138async fn fetch_extensions_from_blob_store(
139    blob_store_client: &aws_sdk_s3::Client,
140    blob_store_bucket: &String,
141    app_state: &Arc<AppState>,
142) -> anyhow::Result<()> {
143    let list = blob_store_client
144        .list_objects()
145        .bucket(blob_store_bucket)
146        .prefix("extensions/")
147        .send()
148        .await?;
149
150    let objects = list
151        .contents
152        .ok_or_else(|| anyhow!("missing bucket contents"))?;
153
154    let mut published_versions = HashMap::<&str, Vec<&str>>::default();
155    for object in &objects {
156        let Some(key) = object.key.as_ref() else {
157            continue;
158        };
159        let mut parts = key.split('/');
160        let Some(_) = parts.next().filter(|part| *part == "extensions") else {
161            continue;
162        };
163        let Some(extension_id) = parts.next() else {
164            continue;
165        };
166        let Some(version) = parts.next() else {
167            continue;
168        };
169        published_versions
170            .entry(extension_id)
171            .or_default()
172            .push(version);
173    }
174
175    let known_versions = app_state.db.get_known_extension_versions().await?;
176
177    let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
178    let empty = Vec::new();
179    for (extension_id, published_versions) in published_versions {
180        let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
181
182        for published_version in published_versions {
183            if known_versions
184                .binary_search_by_key(&published_version, String::as_str)
185                .is_err()
186            {
187                let object = blob_store_client
188                    .get_object()
189                    .bucket(blob_store_bucket)
190                    .key(format!(
191                        "extensions/{extension_id}/{published_version}/manifest.json"
192                    ))
193                    .send()
194                    .await?;
195                let manifest_bytes = object
196                    .body
197                    .collect()
198                    .await
199                    .map(|data| data.into_bytes())
200                    .with_context(|| format!("failed to download manifest for extension {extension_id} version {published_version}"))?
201                    .to_vec();
202                let manifest = serde_json::from_slice::<ExtensionManifest>(&manifest_bytes)
203                    .with_context(|| format!("invalid manifest for extension {extension_id} version {published_version}: {}", String::from_utf8_lossy(&manifest_bytes)))?;
204
205                let published_at = object.last_modified.ok_or_else(|| anyhow!("missing last modified timestamp for extension {extension_id} version {published_version}"))?;
206                let published_at =
207                    time::OffsetDateTime::from_unix_timestamp_nanos(published_at.as_nanos())?;
208                let published_at = PrimitiveDateTime::new(published_at.date(), published_at.time());
209
210                let version = semver::Version::parse(&manifest.version).with_context(|| {
211                    format!(
212                        "invalid version for extension {extension_id} version {published_version}"
213                    )
214                })?;
215
216                new_versions
217                    .entry(extension_id)
218                    .or_default()
219                    .push(NewExtensionVersion {
220                        name: manifest.name,
221                        version,
222                        description: manifest.description.unwrap_or_default(),
223                        authors: manifest.authors,
224                        repository: manifest.repository,
225                        published_at,
226                    });
227            }
228        }
229    }
230
231    app_state
232        .db
233        .insert_extension_versions(&new_versions)
234        .await?;
235
236    Ok(())
237}