use super::*;
use sea_orm::QueryOrder;
use time::OffsetDateTime;
3
4impl Database {
5 pub async fn join_channel_chat(
6 &self,
7 channel_id: ChannelId,
8 connection_id: ConnectionId,
9 user_id: UserId,
10 ) -> Result<()> {
11 self.transaction(|tx| async move {
12 self.check_user_is_channel_member(channel_id, user_id, &*tx)
13 .await?;
14 channel_chat_participant::ActiveModel {
15 id: ActiveValue::NotSet,
16 channel_id: ActiveValue::Set(channel_id),
17 user_id: ActiveValue::Set(user_id),
18 connection_id: ActiveValue::Set(connection_id.id as i32),
19 connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
20 }
21 .insert(&*tx)
22 .await?;
23 Ok(())
24 })
25 .await
26 }
27
28 pub async fn channel_chat_connection_lost(
29 &self,
30 connection_id: ConnectionId,
31 tx: &DatabaseTransaction,
32 ) -> Result<()> {
33 channel_chat_participant::Entity::delete_many()
34 .filter(
35 Condition::all()
36 .add(
37 channel_chat_participant::Column::ConnectionServerId
38 .eq(connection_id.owner_id),
39 )
40 .add(channel_chat_participant::Column::ConnectionId.eq(connection_id.id)),
41 )
42 .exec(tx)
43 .await?;
44 Ok(())
45 }
46
47 pub async fn leave_channel_chat(
48 &self,
49 channel_id: ChannelId,
50 connection_id: ConnectionId,
51 _user_id: UserId,
52 ) -> Result<()> {
53 self.transaction(|tx| async move {
54 channel_chat_participant::Entity::delete_many()
55 .filter(
56 Condition::all()
57 .add(
58 channel_chat_participant::Column::ConnectionServerId
59 .eq(connection_id.owner_id),
60 )
61 .add(channel_chat_participant::Column::ConnectionId.eq(connection_id.id))
62 .add(channel_chat_participant::Column::ChannelId.eq(channel_id)),
63 )
64 .exec(&*tx)
65 .await?;
66
67 Ok(())
68 })
69 .await
70 }
71
72 pub async fn get_channel_messages(
73 &self,
74 channel_id: ChannelId,
75 user_id: UserId,
76 count: usize,
77 before_message_id: Option<MessageId>,
78 ) -> Result<Vec<proto::ChannelMessage>> {
79 self.transaction(|tx| async move {
80 self.check_user_is_channel_member(channel_id, user_id, &*tx)
81 .await?;
82
83 let mut condition =
84 Condition::all().add(channel_message::Column::ChannelId.eq(channel_id));
85
86 if let Some(before_message_id) = before_message_id {
87 condition = condition.add(channel_message::Column::Id.lt(before_message_id));
88 }
89
90 let mut rows = channel_message::Entity::find()
91 .filter(condition)
92 .limit(count as u64)
93 .stream(&*tx)
94 .await?;
95
96 let mut messages = Vec::new();
97 while let Some(row) = rows.next().await {
98 let row = row?;
99 let nonce = row.nonce.as_u64_pair();
100 messages.push(proto::ChannelMessage {
101 id: row.id.to_proto(),
102 sender_id: row.sender_id.to_proto(),
103 body: row.body,
104 timestamp: row.sent_at.assume_utc().unix_timestamp() as u64,
105 nonce: Some(proto::Nonce {
106 upper_half: nonce.0,
107 lower_half: nonce.1,
108 }),
109 });
110 }
111
112 Ok(messages)
113 })
114 .await
115 }
116
117 pub async fn create_channel_message(
118 &self,
119 channel_id: ChannelId,
120 user_id: UserId,
121 body: &str,
122 timestamp: OffsetDateTime,
123 nonce: u128,
124 ) -> Result<(MessageId, Vec<ConnectionId>)> {
125 self.transaction(|tx| async move {
126 let mut rows = channel_chat_participant::Entity::find()
127 .filter(channel_chat_participant::Column::ChannelId.eq(channel_id))
128 .stream(&*tx)
129 .await?;
130
131 let mut is_participant = false;
132 let mut participant_connection_ids = Vec::new();
133 while let Some(row) = rows.next().await {
134 let row = row?;
135 if row.user_id == user_id {
136 is_participant = true;
137 }
138 participant_connection_ids.push(row.connection());
139 }
140 drop(rows);
141
142 if !is_participant {
143 Err(anyhow!("not a chat participant"))?;
144 }
145
146 let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
147 let timestamp = time::PrimitiveDateTime::new(timestamp.date(), timestamp.time());
148
149 let message = channel_message::Entity::insert(channel_message::ActiveModel {
150 channel_id: ActiveValue::Set(channel_id),
151 sender_id: ActiveValue::Set(user_id),
152 body: ActiveValue::Set(body.to_string()),
153 sent_at: ActiveValue::Set(timestamp),
154 nonce: ActiveValue::Set(Uuid::from_u128(nonce)),
155 id: ActiveValue::NotSet,
156 })
157 .on_conflict(
158 OnConflict::column(channel_message::Column::Nonce)
159 .update_column(channel_message::Column::Nonce)
160 .to_owned(),
161 )
162 .exec(&*tx)
163 .await?;
164
165 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
166 enum QueryConnectionId {
167 ConnectionId,
168 }
169
170 Ok((message.last_insert_id, participant_connection_ids))
171 })
172 .await
173 }
174
175 pub async fn remove_channel_message(
176 &self,
177 channel_id: ChannelId,
178 message_id: MessageId,
179 user_id: UserId,
180 ) -> Result<Vec<ConnectionId>> {
181 self.transaction(|tx| async move {
182 let mut rows = channel_chat_participant::Entity::find()
183 .filter(channel_chat_participant::Column::ChannelId.eq(channel_id))
184 .stream(&*tx)
185 .await?;
186
187 let mut is_participant = false;
188 let mut participant_connection_ids = Vec::new();
189 while let Some(row) = rows.next().await {
190 let row = row?;
191 if row.user_id == user_id {
192 is_participant = true;
193 }
194 participant_connection_ids.push(row.connection());
195 }
196 drop(rows);
197
198 if !is_participant {
199 Err(anyhow!("not a chat participant"))?;
200 }
201
202 let result = channel_message::Entity::delete_by_id(message_id)
203 .filter(channel_message::Column::SenderId.eq(user_id))
204 .exec(&*tx)
205 .await?;
206 if result.rows_affected == 0 {
207 Err(anyhow!("no such message"))?;
208 }
209
210 Ok(participant_connection_ids)
211 })
212 .await
213 }
214}