1use super::*;
 2use time::Duration;
 3use time::OffsetDateTime;
 4
 5impl Database {
 6    pub async fn get_embeddings(
 7        &self,
 8        model: &str,
 9        digests: &[Vec<u8>],
10    ) -> Result<HashMap<Vec<u8>, Vec<f32>>> {
11        self.transaction(|tx| async move {
12            let embeddings = {
13                let mut db_embeddings = embedding::Entity::find()
14                    .filter(
15                        embedding::Column::Model.eq(model).and(
16                            embedding::Column::Digest
17                                .is_in(digests.iter().map(|digest| digest.as_slice())),
18                        ),
19                    )
20                    .stream(&*tx)
21                    .await?;
22
23                let mut embeddings = HashMap::default();
24                while let Some(db_embedding) = db_embeddings.next().await {
25                    let db_embedding = db_embedding?;
26                    embeddings.insert(db_embedding.digest, db_embedding.dimensions);
27                }
28                embeddings
29            };
30
31            if !embeddings.is_empty() {
32                let now = OffsetDateTime::now_utc();
33                let retrieved_at = PrimitiveDateTime::new(now.date(), now.time());
34
35                embedding::Entity::update_many()
36                    .filter(
37                        embedding::Column::Digest
38                            .is_in(embeddings.keys().map(|digest| digest.as_slice())),
39                    )
40                    .col_expr(embedding::Column::RetrievedAt, Expr::value(retrieved_at))
41                    .exec(&*tx)
42                    .await?;
43            }
44
45            Ok(embeddings)
46        })
47        .await
48    }
49
50    pub async fn save_embeddings(
51        &self,
52        model: &str,
53        embeddings: &HashMap<Vec<u8>, Vec<f32>>,
54    ) -> Result<()> {
55        self.transaction(|tx| async move {
56            embedding::Entity::insert_many(embeddings.iter().map(|(digest, dimensions)| {
57                let now_offset_datetime = OffsetDateTime::now_utc();
58                let retrieved_at =
59                    PrimitiveDateTime::new(now_offset_datetime.date(), now_offset_datetime.time());
60
61                embedding::ActiveModel {
62                    model: ActiveValue::set(model.to_string()),
63                    digest: ActiveValue::set(digest.clone()),
64                    dimensions: ActiveValue::set(dimensions.clone()),
65                    retrieved_at: ActiveValue::set(retrieved_at),
66                }
67            }))
68            .on_conflict(
69                OnConflict::columns([embedding::Column::Model, embedding::Column::Digest])
70                    .do_nothing()
71                    .to_owned(),
72            )
73            .exec_without_returning(&*tx)
74            .await?;
75            Ok(())
76        })
77        .await
78    }
79
80    pub async fn purge_old_embeddings(&self) -> Result<()> {
81        self.transaction(|tx| async move {
82            embedding::Entity::delete_many()
83                .filter(
84                    embedding::Column::RetrievedAt
85                        .lte(OffsetDateTime::now_utc() - Duration::days(60)),
86                )
87                .exec(&*tx)
88                .await?;
89
90            Ok(())
91        })
92        .await
93    }
94}