notifications.rs

  1use super::*;
  2use rpc::{Notification, NotificationEntityKind, NotificationKind};
  3
  4impl Database {
  5    pub async fn ensure_notification_kinds(&self) -> Result<()> {
  6        self.transaction(|tx| async move {
  7            notification_kind::Entity::insert_many(NotificationKind::all().map(|kind| {
  8                notification_kind::ActiveModel {
  9                    id: ActiveValue::Set(kind as i32),
 10                    name: ActiveValue::Set(kind.to_string()),
 11                }
 12            }))
 13            .on_conflict(OnConflict::new().do_nothing().to_owned())
 14            .exec(&*tx)
 15            .await?;
 16            Ok(())
 17        })
 18        .await
 19    }
 20
 21    pub async fn get_notifications(
 22        &self,
 23        recipient_id: UserId,
 24        limit: usize,
 25    ) -> Result<proto::AddNotifications> {
 26        self.transaction(|tx| async move {
 27            let mut result = proto::AddNotifications::default();
 28
 29            let mut rows = notification::Entity::find()
 30                .filter(notification::Column::RecipientId.eq(recipient_id))
 31                .order_by_desc(notification::Column::Id)
 32                .limit(limit as u64)
 33                .stream(&*tx)
 34                .await?;
 35
 36            let mut user_ids = Vec::new();
 37            let mut channel_ids = Vec::new();
 38            let mut message_ids = Vec::new();
 39            while let Some(row) = rows.next().await {
 40                let row = row?;
 41
 42                let Some(kind) = NotificationKind::from_i32(row.kind) else {
 43                    continue;
 44                };
 45                let Some(notification) = Notification::from_parts(
 46                    kind,
 47                    [
 48                        row.entity_id_1.map(|id| id as u64),
 49                        row.entity_id_2.map(|id| id as u64),
 50                        row.entity_id_3.map(|id| id as u64),
 51                    ],
 52                ) else {
 53                    continue;
 54                };
 55
 56                // Gather the ids of all associated entities.
 57                let (_, associated_entities) = notification.to_parts();
 58                for entity in associated_entities {
 59                    let Some((id, kind)) = entity else {
 60                        break;
 61                    };
 62                    match kind {
 63                        NotificationEntityKind::User => &mut user_ids,
 64                        NotificationEntityKind::Channel => &mut channel_ids,
 65                        NotificationEntityKind::ChannelMessage => &mut message_ids,
 66                    }
 67                    .push(id);
 68                }
 69
 70                result.notifications.push(proto::Notification {
 71                    kind: row.kind as u32,
 72                    timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
 73                    is_read: row.is_read,
 74                    entity_id_1: row.entity_id_1.map(|id| id as u64),
 75                    entity_id_2: row.entity_id_2.map(|id| id as u64),
 76                    entity_id_3: row.entity_id_3.map(|id| id as u64),
 77                });
 78            }
 79
 80            let users = user::Entity::find()
 81                .filter(user::Column::Id.is_in(user_ids))
 82                .all(&*tx)
 83                .await?;
 84            let channels = channel::Entity::find()
 85                .filter(user::Column::Id.is_in(channel_ids))
 86                .all(&*tx)
 87                .await?;
 88            let messages = channel_message::Entity::find()
 89                .filter(user::Column::Id.is_in(message_ids))
 90                .all(&*tx)
 91                .await?;
 92
 93            for user in users {
 94                result.users.push(proto::User {
 95                    id: user.id.to_proto(),
 96                    github_login: user.github_login,
 97                    avatar_url: String::new(),
 98                });
 99            }
100            for channel in channels {
101                result.channels.push(proto::Channel {
102                    id: channel.id.to_proto(),
103                    name: channel.name,
104                });
105            }
106            for message in messages {
107                result.messages.push(proto::ChannelMessage {
108                    id: message.id.to_proto(),
109                    body: message.body,
110                    timestamp: message.sent_at.assume_utc().unix_timestamp() as u64,
111                    sender_id: message.sender_id.to_proto(),
112                    nonce: None,
113                });
114            }
115
116            Ok(result)
117        })
118        .await
119    }
120
121    pub async fn create_notification(
122        &self,
123        recipient_id: UserId,
124        notification: Notification,
125        tx: &DatabaseTransaction,
126    ) -> Result<()> {
127        let (kind, associated_entities) = notification.to_parts();
128        notification::ActiveModel {
129            recipient_id: ActiveValue::Set(recipient_id),
130            kind: ActiveValue::Set(kind as i32),
131            entity_id_1: ActiveValue::Set(associated_entities[0].map(|(id, _)| id as i32)),
132            entity_id_2: ActiveValue::Set(associated_entities[1].map(|(id, _)| id as i32)),
133            entity_id_3: ActiveValue::Set(associated_entities[2].map(|(id, _)| id as i32)),
134            ..Default::default()
135        }
136        .save(&*tx)
137        .await?;
138        Ok(())
139    }
140}