1use super::*;
2use time::Duration;
3use time::OffsetDateTime;
4
5impl Database {
6 pub async fn get_embeddings(
7 &self,
8 model: &str,
9 digests: &[Vec<u8>],
10 ) -> Result<HashMap<Vec<u8>, Vec<f32>>> {
11 self.transaction(|tx| async move {
12 let embeddings = {
13 let mut db_embeddings = embedding::Entity::find()
14 .filter(
15 embedding::Column::Model.eq(model).and(
16 embedding::Column::Digest
17 .is_in(digests.iter().map(|digest| digest.as_slice())),
18 ),
19 )
20 .stream(&*tx)
21 .await?;
22
23 let mut embeddings = HashMap::default();
24 while let Some(db_embedding) = db_embeddings.next().await {
25 let db_embedding = db_embedding?;
26 embeddings.insert(db_embedding.digest, db_embedding.dimensions);
27 }
28 embeddings
29 };
30
31 if !embeddings.is_empty() {
32 let now = OffsetDateTime::now_utc();
33 let retrieved_at = PrimitiveDateTime::new(now.date(), now.time());
34
35 embedding::Entity::update_many()
36 .filter(
37 embedding::Column::Digest
38 .is_in(embeddings.keys().map(|digest| digest.as_slice())),
39 )
40 .col_expr(embedding::Column::RetrievedAt, Expr::value(retrieved_at))
41 .exec(&*tx)
42 .await?;
43 }
44
45 Ok(embeddings)
46 })
47 .await
48 }
49
50 pub async fn save_embeddings(
51 &self,
52 model: &str,
53 embeddings: &HashMap<Vec<u8>, Vec<f32>>,
54 ) -> Result<()> {
55 self.transaction(|tx| async move {
56 embedding::Entity::insert_many(embeddings.iter().map(|(digest, dimensions)| {
57 let now_offset_datetime = OffsetDateTime::now_utc();
58 let retrieved_at =
59 PrimitiveDateTime::new(now_offset_datetime.date(), now_offset_datetime.time());
60
61 embedding::ActiveModel {
62 model: ActiveValue::set(model.to_string()),
63 digest: ActiveValue::set(digest.clone()),
64 dimensions: ActiveValue::set(dimensions.clone()),
65 retrieved_at: ActiveValue::set(retrieved_at),
66 }
67 }))
68 .on_conflict(
69 OnConflict::columns([embedding::Column::Model, embedding::Column::Digest])
70 .do_nothing()
71 .to_owned(),
72 )
73 .exec_without_returning(&*tx)
74 .await?;
75 Ok(())
76 })
77 .await
78 }
79
80 pub async fn purge_old_embeddings(&self) -> Result<()> {
81 self.transaction(|tx| async move {
82 embedding::Entity::delete_many()
83 .filter(
84 embedding::Column::RetrievedAt
85 .lte(OffsetDateTime::now_utc() - Duration::days(60)),
86 )
87 .exec(&*tx)
88 .await?;
89
90 Ok(())
91 })
92 .await
93 }
94}