messages.rs

use super::*;
use sea_orm::QueryOrder;
use time::OffsetDateTime;
  3
  4impl Database {
  5    pub async fn join_channel_chat(
  6        &self,
  7        channel_id: ChannelId,
  8        connection_id: ConnectionId,
  9        user_id: UserId,
 10    ) -> Result<()> {
 11        self.transaction(|tx| async move {
 12            self.check_user_is_channel_member(channel_id, user_id, &*tx)
 13                .await?;
 14            channel_chat_participant::ActiveModel {
 15                id: ActiveValue::NotSet,
 16                channel_id: ActiveValue::Set(channel_id),
 17                user_id: ActiveValue::Set(user_id),
 18                connection_id: ActiveValue::Set(connection_id.id as i32),
 19                connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
 20            }
 21            .insert(&*tx)
 22            .await?;
 23            Ok(())
 24        })
 25        .await
 26    }
 27
 28    pub async fn channel_chat_connection_lost(
 29        &self,
 30        connection_id: ConnectionId,
 31        tx: &DatabaseTransaction,
 32    ) -> Result<()> {
 33        channel_chat_participant::Entity::delete_many()
 34            .filter(
 35                Condition::all()
 36                    .add(
 37                        channel_chat_participant::Column::ConnectionServerId
 38                            .eq(connection_id.owner_id),
 39                    )
 40                    .add(channel_chat_participant::Column::ConnectionId.eq(connection_id.id)),
 41            )
 42            .exec(tx)
 43            .await?;
 44        Ok(())
 45    }
 46
 47    pub async fn leave_channel_chat(
 48        &self,
 49        channel_id: ChannelId,
 50        connection_id: ConnectionId,
 51        _user_id: UserId,
 52    ) -> Result<()> {
 53        self.transaction(|tx| async move {
 54            channel_chat_participant::Entity::delete_many()
 55                .filter(
 56                    Condition::all()
 57                        .add(
 58                            channel_chat_participant::Column::ConnectionServerId
 59                                .eq(connection_id.owner_id),
 60                        )
 61                        .add(channel_chat_participant::Column::ConnectionId.eq(connection_id.id))
 62                        .add(channel_chat_participant::Column::ChannelId.eq(channel_id)),
 63                )
 64                .exec(&*tx)
 65                .await?;
 66
 67            Ok(())
 68        })
 69        .await
 70    }
 71
 72    pub async fn get_channel_messages(
 73        &self,
 74        channel_id: ChannelId,
 75        user_id: UserId,
 76        count: usize,
 77        before_message_id: Option<MessageId>,
 78    ) -> Result<Vec<proto::ChannelMessage>> {
 79        self.transaction(|tx| async move {
 80            self.check_user_is_channel_member(channel_id, user_id, &*tx)
 81                .await?;
 82
 83            let mut condition =
 84                Condition::all().add(channel_message::Column::ChannelId.eq(channel_id));
 85
 86            if let Some(before_message_id) = before_message_id {
 87                condition = condition.add(channel_message::Column::Id.lt(before_message_id));
 88            }
 89
 90            let mut rows = channel_message::Entity::find()
 91                .filter(condition)
 92                .limit(count as u64)
 93                .stream(&*tx)
 94                .await?;
 95
 96            let mut messages = Vec::new();
 97            while let Some(row) = rows.next().await {
 98                let row = row?;
 99                let nonce = row.nonce.as_u64_pair();
100                messages.push(proto::ChannelMessage {
101                    id: row.id.to_proto(),
102                    sender_id: row.sender_id.to_proto(),
103                    body: row.body,
104                    timestamp: row.sent_at.assume_utc().unix_timestamp() as u64,
105                    nonce: Some(proto::Nonce {
106                        upper_half: nonce.0,
107                        lower_half: nonce.1,
108                    }),
109                });
110            }
111
112            Ok(messages)
113        })
114        .await
115    }
116
117    pub async fn create_channel_message(
118        &self,
119        channel_id: ChannelId,
120        user_id: UserId,
121        body: &str,
122        timestamp: OffsetDateTime,
123        nonce: u128,
124    ) -> Result<(MessageId, Vec<ConnectionId>)> {
125        self.transaction(|tx| async move {
126            let mut rows = channel_chat_participant::Entity::find()
127                .filter(channel_chat_participant::Column::ChannelId.eq(channel_id))
128                .stream(&*tx)
129                .await?;
130
131            let mut is_participant = false;
132            let mut participant_connection_ids = Vec::new();
133            while let Some(row) = rows.next().await {
134                let row = row?;
135                if row.user_id == user_id {
136                    is_participant = true;
137                }
138                participant_connection_ids.push(row.connection());
139            }
140            drop(rows);
141
142            if !is_participant {
143                Err(anyhow!("not a chat participant"))?;
144            }
145
146            let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
147            let timestamp = time::PrimitiveDateTime::new(timestamp.date(), timestamp.time());
148
149            let message = channel_message::Entity::insert(channel_message::ActiveModel {
150                channel_id: ActiveValue::Set(channel_id),
151                sender_id: ActiveValue::Set(user_id),
152                body: ActiveValue::Set(body.to_string()),
153                sent_at: ActiveValue::Set(timestamp),
154                nonce: ActiveValue::Set(Uuid::from_u128(nonce)),
155                id: ActiveValue::NotSet,
156            })
157            .on_conflict(
158                OnConflict::column(channel_message::Column::Nonce)
159                    .update_column(channel_message::Column::Nonce)
160                    .to_owned(),
161            )
162            .exec(&*tx)
163            .await?;
164
165            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
166            enum QueryConnectionId {
167                ConnectionId,
168            }
169
170            Ok((message.last_insert_id, participant_connection_ids))
171        })
172        .await
173    }
174
175    pub async fn remove_channel_message(
176        &self,
177        channel_id: ChannelId,
178        message_id: MessageId,
179        user_id: UserId,
180    ) -> Result<Vec<ConnectionId>> {
181        self.transaction(|tx| async move {
182            let mut rows = channel_chat_participant::Entity::find()
183                .filter(channel_chat_participant::Column::ChannelId.eq(channel_id))
184                .stream(&*tx)
185                .await?;
186
187            let mut is_participant = false;
188            let mut participant_connection_ids = Vec::new();
189            while let Some(row) = rows.next().await {
190                let row = row?;
191                if row.user_id == user_id {
192                    is_participant = true;
193                }
194                participant_connection_ids.push(row.connection());
195            }
196            drop(rows);
197
198            if !is_participant {
199                Err(anyhow!("not a chat participant"))?;
200            }
201
202            let result = channel_message::Entity::delete_by_id(message_id)
203                .filter(channel_message::Column::SenderId.eq(user_id))
204                .exec(&*tx)
205                .await?;
206            if result.rows_affected == 0 {
207                Err(anyhow!("no such message"))?;
208            }
209
210            Ok(participant_connection_ids)
211        })
212        .await
213    }
214}