extensions.rs

  1use crate::{
  2    db::{ExtensionMetadata, NewExtensionVersion},
  3    executor::Executor,
  4    AppState, Error, Result,
  5};
  6use anyhow::{anyhow, Context as _};
  7use aws_sdk_s3::presigning::PresigningConfig;
  8use axum::{
  9    extract::{Path, Query},
 10    http::StatusCode,
 11    response::Redirect,
 12    routing::get,
 13    Extension, Json, Router,
 14};
 15use collections::HashMap;
 16use serde::{Deserialize, Serialize};
 17use std::{sync::Arc, time::Duration};
 18use time::PrimitiveDateTime;
 19use util::ResultExt;
 20
 21pub fn router() -> Router {
 22    Router::new()
 23        .route("/extensions", get(get_extensions))
 24        .route(
 25            "/extensions/:extension_id/:version/download",
 26            get(download_extension),
 27        )
 28}
 29
 30#[derive(Debug, Deserialize)]
 31struct GetExtensionsParams {
 32    filter: Option<String>,
 33}
 34
 35#[derive(Debug, Deserialize)]
 36struct DownloadExtensionParams {
 37    extension_id: String,
 38    version: String,
 39}
 40
 41#[derive(Debug, Serialize)]
 42struct GetExtensionsResponse {
 43    pub data: Vec<ExtensionMetadata>,
 44}
 45
 46#[derive(Deserialize)]
 47struct ExtensionManifest {
 48    name: String,
 49    version: String,
 50    description: Option<String>,
 51    authors: Vec<String>,
 52    repository: String,
 53}
 54
 55async fn get_extensions(
 56    Extension(app): Extension<Arc<AppState>>,
 57    Query(params): Query<GetExtensionsParams>,
 58) -> Result<Json<GetExtensionsResponse>> {
 59    let extensions = app.db.get_extensions(params.filter.as_deref(), 500).await?;
 60    Ok(Json(GetExtensionsResponse { data: extensions }))
 61}
 62
 63async fn download_extension(
 64    Extension(app): Extension<Arc<AppState>>,
 65    Path(params): Path<DownloadExtensionParams>,
 66) -> Result<Redirect> {
 67    let Some((blob_store_client, bucket)) = app
 68        .blob_store_client
 69        .clone()
 70        .zip(app.config.blob_store_bucket.clone())
 71    else {
 72        Err(Error::Http(
 73            StatusCode::NOT_IMPLEMENTED,
 74            "not supported".into(),
 75        ))?
 76    };
 77
 78    let DownloadExtensionParams {
 79        extension_id,
 80        version,
 81    } = params;
 82
 83    let version_exists = app
 84        .db
 85        .record_extension_download(&extension_id, &version)
 86        .await?;
 87
 88    if !version_exists {
 89        Err(Error::Http(
 90            StatusCode::NOT_FOUND,
 91            "unknown extension version".into(),
 92        ))?;
 93    }
 94
 95    let url = blob_store_client
 96        .get_object()
 97        .bucket(bucket)
 98        .key(format!(
 99            "extensions/{extension_id}/{version}/archive.tar.gz"
100        ))
101        .presigned(PresigningConfig::expires_in(EXTENSION_DOWNLOAD_URL_LIFETIME).unwrap())
102        .await
103        .map_err(|e| anyhow!("failed to create presigned extension download url {e}"))?;
104
105    Ok(Redirect::temporary(url.uri()))
106}
107
/// Pause between two consecutive synchronizations with the blob store
/// (five minutes).
const EXTENSION_FETCH_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Validity period of the presigned URLs handed out for extension downloads
/// (three minutes).
const EXTENSION_DOWNLOAD_URL_LIFETIME: Duration = Duration::from_secs(3 * 60);
110
111pub fn fetch_extensions_from_blob_store_periodically(app_state: Arc<AppState>, executor: Executor) {
112    let Some(blob_store_client) = app_state.blob_store_client.clone() else {
113        log::info!("no blob store client");
114        return;
115    };
116    let Some(blob_store_bucket) = app_state.config.blob_store_bucket.clone() else {
117        log::info!("no blob store bucket");
118        return;
119    };
120
121    executor.spawn_detached({
122        let executor = executor.clone();
123        async move {
124            loop {
125                fetch_extensions_from_blob_store(
126                    &blob_store_client,
127                    &blob_store_bucket,
128                    &app_state,
129                )
130                .await
131                .log_err();
132                executor.sleep(EXTENSION_FETCH_INTERVAL).await;
133            }
134        }
135    });
136}
137
138async fn fetch_extensions_from_blob_store(
139    blob_store_client: &aws_sdk_s3::Client,
140    blob_store_bucket: &String,
141    app_state: &Arc<AppState>,
142) -> anyhow::Result<()> {
143    let list = blob_store_client
144        .list_objects()
145        .bucket(blob_store_bucket)
146        .prefix("extensions/")
147        .send()
148        .await?;
149
150    let objects = list.contents.unwrap_or_default();
151
152    let mut published_versions = HashMap::<&str, Vec<&str>>::default();
153    for object in &objects {
154        let Some(key) = object.key.as_ref() else {
155            continue;
156        };
157        let mut parts = key.split('/');
158        let Some(_) = parts.next().filter(|part| *part == "extensions") else {
159            continue;
160        };
161        let Some(extension_id) = parts.next() else {
162            continue;
163        };
164        let Some(version) = parts.next() else {
165            continue;
166        };
167        published_versions
168            .entry(extension_id)
169            .or_default()
170            .push(version);
171    }
172
173    let known_versions = app_state.db.get_known_extension_versions().await?;
174
175    let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
176    let empty = Vec::new();
177    for (extension_id, published_versions) in published_versions {
178        let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
179
180        for published_version in published_versions {
181            if known_versions
182                .binary_search_by_key(&published_version, String::as_str)
183                .is_err()
184            {
185                let object = blob_store_client
186                    .get_object()
187                    .bucket(blob_store_bucket)
188                    .key(format!(
189                        "extensions/{extension_id}/{published_version}/manifest.json"
190                    ))
191                    .send()
192                    .await?;
193                let manifest_bytes = object
194                    .body
195                    .collect()
196                    .await
197                    .map(|data| data.into_bytes())
198                    .with_context(|| format!("failed to download manifest for extension {extension_id} version {published_version}"))?
199                    .to_vec();
200                let manifest = serde_json::from_slice::<ExtensionManifest>(&manifest_bytes)
201                    .with_context(|| format!("invalid manifest for extension {extension_id} version {published_version}: {}", String::from_utf8_lossy(&manifest_bytes)))?;
202
203                let published_at = object.last_modified.ok_or_else(|| anyhow!("missing last modified timestamp for extension {extension_id} version {published_version}"))?;
204                let published_at =
205                    time::OffsetDateTime::from_unix_timestamp_nanos(published_at.as_nanos())?;
206                let published_at = PrimitiveDateTime::new(published_at.date(), published_at.time());
207
208                let version = semver::Version::parse(&manifest.version).with_context(|| {
209                    format!(
210                        "invalid version for extension {extension_id} version {published_version}"
211                    )
212                })?;
213
214                new_versions
215                    .entry(extension_id)
216                    .or_default()
217                    .push(NewExtensionVersion {
218                        name: manifest.name,
219                        version,
220                        description: manifest.description.unwrap_or_default(),
221                        authors: manifest.authors,
222                        repository: manifest.repository,
223                        published_at,
224                    });
225            }
226        }
227    }
228
229    app_state
230        .db
231        .insert_extension_versions(&new_versions)
232        .await?;
233
234    Ok(())
235}