extensions.rs

  1use chrono::Utc;
  2
  3use super::*;
  4
  5impl Database {
  6    pub async fn get_extensions(
  7        &self,
  8        filter: Option<&str>,
  9        max_schema_version: i32,
 10        limit: usize,
 11    ) -> Result<Vec<ExtensionMetadata>> {
 12        self.transaction(|tx| async move {
 13            let mut condition = Condition::all().add(
 14                extension::Column::LatestVersion
 15                    .into_expr()
 16                    .eq(extension_version::Column::Version.into_expr()),
 17            );
 18            if let Some(filter) = filter {
 19                let fuzzy_name_filter = Self::fuzzy_like_string(filter);
 20                condition = condition.add(Expr::cust_with_expr("name ILIKE $1", fuzzy_name_filter));
 21            }
 22
 23            let extensions = extension::Entity::find()
 24                .inner_join(extension_version::Entity)
 25                .select_also(extension_version::Entity)
 26                .filter(condition)
 27                .filter(extension_version::Column::SchemaVersion.lte(max_schema_version))
 28                .order_by_desc(extension::Column::TotalDownloadCount)
 29                .order_by_asc(extension::Column::Name)
 30                .limit(Some(limit as u64))
 31                .all(&*tx)
 32                .await?;
 33
 34            Ok(extensions
 35                .into_iter()
 36                .filter_map(|(extension, version)| {
 37                    Some(metadata_from_extension_and_version(extension, version?))
 38                })
 39                .collect())
 40        })
 41        .await
 42    }
 43
 44    pub async fn get_extension(&self, extension_id: &str) -> Result<Option<ExtensionMetadata>> {
 45        self.transaction(|tx| async move {
 46            let extension = extension::Entity::find()
 47                .filter(extension::Column::ExternalId.eq(extension_id))
 48                .filter(
 49                    extension::Column::LatestVersion
 50                        .into_expr()
 51                        .eq(extension_version::Column::Version.into_expr()),
 52                )
 53                .inner_join(extension_version::Entity)
 54                .select_also(extension_version::Entity)
 55                .one(&*tx)
 56                .await?;
 57
 58            Ok(extension.and_then(|(extension, version)| {
 59                Some(metadata_from_extension_and_version(extension, version?))
 60            }))
 61        })
 62        .await
 63    }
 64
 65    pub async fn get_extension_version(
 66        &self,
 67        extension_id: &str,
 68        version: &str,
 69    ) -> Result<Option<ExtensionMetadata>> {
 70        self.transaction(|tx| async move {
 71            let extension = extension::Entity::find()
 72                .filter(extension::Column::ExternalId.eq(extension_id))
 73                .filter(extension_version::Column::Version.eq(version))
 74                .inner_join(extension_version::Entity)
 75                .select_also(extension_version::Entity)
 76                .one(&*tx)
 77                .await?;
 78
 79            Ok(extension.and_then(|(extension, version)| {
 80                Some(metadata_from_extension_and_version(extension, version?))
 81            }))
 82        })
 83        .await
 84    }
 85
 86    pub async fn get_known_extension_versions<'a>(&self) -> Result<HashMap<String, Vec<String>>> {
 87        self.transaction(|tx| async move {
 88            let mut extension_external_ids_by_id = HashMap::default();
 89
 90            let mut rows = extension::Entity::find().stream(&*tx).await?;
 91            while let Some(row) = rows.next().await {
 92                let row = row?;
 93                extension_external_ids_by_id.insert(row.id, row.external_id);
 94            }
 95            drop(rows);
 96
 97            let mut known_versions_by_extension_id: HashMap<String, Vec<String>> =
 98                HashMap::default();
 99            let mut rows = extension_version::Entity::find().stream(&*tx).await?;
100            while let Some(row) = rows.next().await {
101                let row = row?;
102
103                let Some(extension_id) = extension_external_ids_by_id.get(&row.extension_id) else {
104                    continue;
105                };
106
107                let versions = known_versions_by_extension_id
108                    .entry(extension_id.clone())
109                    .or_default();
110                if let Err(ix) = versions.binary_search(&row.version) {
111                    versions.insert(ix, row.version);
112                }
113            }
114            drop(rows);
115
116            Ok(known_versions_by_extension_id)
117        })
118        .await
119    }
120
121    pub async fn insert_extension_versions(
122        &self,
123        versions_by_extension_id: &HashMap<&str, Vec<NewExtensionVersion>>,
124    ) -> Result<()> {
125        self.transaction(|tx| async move {
126            for (external_id, versions) in versions_by_extension_id {
127                if versions.is_empty() {
128                    continue;
129                }
130
131                let latest_version = versions
132                    .iter()
133                    .max_by_key(|version| &version.version)
134                    .unwrap();
135
136                let insert = extension::Entity::insert(extension::ActiveModel {
137                    name: ActiveValue::Set(latest_version.name.clone()),
138                    external_id: ActiveValue::Set(external_id.to_string()),
139                    id: ActiveValue::NotSet,
140                    latest_version: ActiveValue::Set(latest_version.version.to_string()),
141                    total_download_count: ActiveValue::NotSet,
142                })
143                .on_conflict(
144                    OnConflict::columns([extension::Column::ExternalId])
145                        .update_column(extension::Column::ExternalId)
146                        .to_owned(),
147                );
148
149                let extension = if tx.support_returning() {
150                    insert.exec_with_returning(&*tx).await?
151                } else {
152                    // Sqlite
153                    insert.exec_without_returning(&*tx).await?;
154                    extension::Entity::find()
155                        .filter(extension::Column::ExternalId.eq(*external_id))
156                        .one(&*tx)
157                        .await?
158                        .ok_or_else(|| anyhow!("failed to insert extension"))?
159                };
160
161                extension_version::Entity::insert_many(versions.iter().map(|version| {
162                    extension_version::ActiveModel {
163                        extension_id: ActiveValue::Set(extension.id),
164                        published_at: ActiveValue::Set(version.published_at),
165                        version: ActiveValue::Set(version.version.to_string()),
166                        authors: ActiveValue::Set(version.authors.join(", ")),
167                        repository: ActiveValue::Set(version.repository.clone()),
168                        description: ActiveValue::Set(version.description.clone()),
169                        schema_version: ActiveValue::Set(version.schema_version),
170                        wasm_api_version: ActiveValue::Set(version.wasm_api_version.clone()),
171                        download_count: ActiveValue::NotSet,
172                    }
173                }))
174                .on_conflict(OnConflict::new().do_nothing().to_owned())
175                .exec_without_returning(&*tx)
176                .await?;
177
178                if let Ok(db_version) = semver::Version::parse(&extension.latest_version) {
179                    if db_version >= latest_version.version {
180                        continue;
181                    }
182                }
183
184                let mut extension = extension.into_active_model();
185                extension.latest_version = ActiveValue::Set(latest_version.version.to_string());
186                extension.name = ActiveValue::set(latest_version.name.clone());
187                extension::Entity::update(extension).exec(&*tx).await?;
188            }
189
190            Ok(())
191        })
192        .await
193    }
194
195    pub async fn record_extension_download(&self, extension: &str, version: &str) -> Result<bool> {
196        self.transaction(|tx| async move {
197            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
198            enum QueryId {
199                Id,
200            }
201
202            let extension_id: Option<ExtensionId> = extension::Entity::find()
203                .filter(extension::Column::ExternalId.eq(extension))
204                .select_only()
205                .column(extension::Column::Id)
206                .into_values::<_, QueryId>()
207                .one(&*tx)
208                .await?;
209            let Some(extension_id) = extension_id else {
210                return Ok(false);
211            };
212
213            extension_version::Entity::update_many()
214                .col_expr(
215                    extension_version::Column::DownloadCount,
216                    extension_version::Column::DownloadCount.into_expr().add(1),
217                )
218                .filter(
219                    extension_version::Column::ExtensionId
220                        .eq(extension_id)
221                        .and(extension_version::Column::Version.eq(version)),
222                )
223                .exec(&*tx)
224                .await?;
225
226            extension::Entity::update_many()
227                .col_expr(
228                    extension::Column::TotalDownloadCount,
229                    extension::Column::TotalDownloadCount.into_expr().add(1),
230                )
231                .filter(extension::Column::Id.eq(extension_id))
232                .exec(&*tx)
233                .await?;
234
235            Ok(true)
236        })
237        .await
238    }
239}
240
241fn metadata_from_extension_and_version(
242    extension: extension::Model,
243    version: extension_version::Model,
244) -> ExtensionMetadata {
245    ExtensionMetadata {
246        id: extension.external_id.into(),
247        manifest: rpc::ExtensionApiManifest {
248            name: extension.name,
249            version: version.version.into(),
250            authors: version
251                .authors
252                .split(',')
253                .map(|author| author.trim().to_string())
254                .collect::<Vec<_>>(),
255            description: Some(version.description),
256            repository: version.repository,
257            schema_version: Some(version.schema_version),
258            wasm_api_version: version.wasm_api_version,
259        },
260
261        published_at: convert_time_to_chrono(version.published_at),
262        download_count: extension.total_download_count as u64,
263    }
264}
265
266pub fn convert_time_to_chrono(time: time::PrimitiveDateTime) -> chrono::DateTime<Utc> {
267    chrono::DateTime::from_naive_utc_and_offset(
268        chrono::NaiveDateTime::from_timestamp_opt(time.assume_utc().unix_timestamp(), 0).unwrap(),
269        Utc,
270    )
271}