1use super::*;
2use rpc::{
3 ErrorCode, ErrorCodeExt,
4 proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
5};
6use sea_orm::{DbBackend, TryGetableMany};
7
8impl Database {
9 #[cfg(test)]
10 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
11 self.transaction(move |tx| async move {
12 let mut channels = Vec::new();
13 let mut rows = channel::Entity::find().stream(&*tx).await?;
14 while let Some(row) = rows.next().await {
15 let row = row?;
16 channels.push((row.id, row.name));
17 }
18 Ok(channels)
19 })
20 .await
21 }
22
23 #[cfg(test)]
24 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
25 Ok(self.create_channel(name, None, creator_id).await?.0.id)
26 }
27
28 #[cfg(test)]
29 pub async fn create_sub_channel(
30 &self,
31 name: &str,
32 parent: ChannelId,
33 creator_id: UserId,
34 ) -> Result<ChannelId> {
35 Ok(self
36 .create_channel(name, Some(parent), creator_id)
37 .await?
38 .0
39 .id)
40 }
41
42 /// Creates a new channel.
43 pub async fn create_channel(
44 &self,
45 name: &str,
46 parent_channel_id: Option<ChannelId>,
47 admin_id: UserId,
48 ) -> Result<(channel::Model, Option<channel_member::Model>)> {
49 let name = Self::sanitize_channel_name(name)?;
50 self.transaction(move |tx| async move {
51 let mut parent = None;
52 let mut membership = None;
53
54 if let Some(parent_channel_id) = parent_channel_id {
55 let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
56 self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
57 .await?;
58 parent = Some(parent_channel);
59 }
60
61 let channel = channel::ActiveModel {
62 id: ActiveValue::NotSet,
63 name: ActiveValue::Set(name.to_string()),
64 visibility: ActiveValue::Set(ChannelVisibility::Members),
65 parent_path: ActiveValue::Set(
66 parent
67 .as_ref()
68 .map_or(String::new(), |parent| parent.path()),
69 ),
70 requires_zed_cla: ActiveValue::NotSet,
71 }
72 .insert(&*tx)
73 .await?;
74
75 if parent.is_none() {
76 membership = Some(
77 channel_member::ActiveModel {
78 id: ActiveValue::NotSet,
79 channel_id: ActiveValue::Set(channel.id),
80 user_id: ActiveValue::Set(admin_id),
81 accepted: ActiveValue::Set(true),
82 role: ActiveValue::Set(ChannelRole::Admin),
83 }
84 .insert(&*tx)
85 .await?,
86 );
87 }
88
89 Ok((channel, membership))
90 })
91 .await
92 }
93
94 /// Adds a user to the specified channel.
95 pub async fn join_channel(
96 &self,
97 channel_id: ChannelId,
98 user_id: UserId,
99 connection: ConnectionId,
100 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
101 self.transaction(move |tx| async move {
102 let channel = self.get_channel_internal(channel_id, &tx).await?;
103 let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
104
105 let mut accept_invite_result = None;
106
107 if role.is_none() {
108 if let Some(invitation) = self
109 .pending_invite_for_channel(&channel, user_id, &tx)
110 .await?
111 {
112 // note, this may be a parent channel
113 role = Some(invitation.role);
114 channel_member::Entity::update(channel_member::ActiveModel {
115 accepted: ActiveValue::Set(true),
116 ..invitation.into_active_model()
117 })
118 .exec(&*tx)
119 .await?;
120
121 accept_invite_result = Some(
122 self.calculate_membership_updated(&channel, user_id, &tx)
123 .await?,
124 );
125
126 debug_assert!(
127 self.channel_role_for_user(&channel, user_id, &tx).await? == role
128 );
129 } else if channel.visibility == ChannelVisibility::Public {
130 role = Some(ChannelRole::Guest);
131 channel_member::Entity::insert(channel_member::ActiveModel {
132 id: ActiveValue::NotSet,
133 channel_id: ActiveValue::Set(channel.root_id()),
134 user_id: ActiveValue::Set(user_id),
135 accepted: ActiveValue::Set(true),
136 role: ActiveValue::Set(ChannelRole::Guest),
137 })
138 .exec(&*tx)
139 .await?;
140
141 accept_invite_result = Some(
142 self.calculate_membership_updated(&channel, user_id, &tx)
143 .await?,
144 );
145
146 debug_assert!(
147 self.channel_role_for_user(&channel, user_id, &tx).await? == role
148 );
149 }
150 }
151
152 if role.is_none() || role == Some(ChannelRole::Banned) {
153 Err(ErrorCode::Forbidden.anyhow())?
154 }
155 let role = role.unwrap();
156
157 let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
158 let room_id = self
159 .get_or_create_channel_room(channel_id, &livekit_room, &tx)
160 .await?;
161
162 self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
163 .await
164 .map(|jr| (jr, accept_invite_result, role))
165 })
166 .await
167 }
168
169 /// Sets the visibility of the given channel.
170 pub async fn set_channel_visibility(
171 &self,
172 channel_id: ChannelId,
173 visibility: ChannelVisibility,
174 admin_id: UserId,
175 ) -> Result<channel::Model> {
176 self.transaction(move |tx| async move {
177 let channel = self.get_channel_internal(channel_id, &tx).await?;
178 self.check_user_is_channel_admin(&channel, admin_id, &tx)
179 .await?;
180
181 if visibility == ChannelVisibility::Public {
182 if let Some(parent_id) = channel.parent_id() {
183 let parent = self.get_channel_internal(parent_id, &tx).await?;
184
185 if parent.visibility != ChannelVisibility::Public {
186 Err(ErrorCode::BadPublicNesting
187 .with_tag("direction", "parent")
188 .anyhow())?;
189 }
190 }
191 } else if visibility == ChannelVisibility::Members
192 && self
193 .get_channel_descendants_excluding_self([&channel], &tx)
194 .await?
195 .into_iter()
196 .any(|channel| channel.visibility == ChannelVisibility::Public)
197 {
198 Err(ErrorCode::BadPublicNesting
199 .with_tag("direction", "children")
200 .anyhow())?;
201 }
202
203 let mut model = channel.into_active_model();
204 model.visibility = ActiveValue::Set(visibility);
205 let channel = model.update(&*tx).await?;
206
207 Ok(channel)
208 })
209 .await
210 }
211
212 #[cfg(test)]
213 pub async fn set_channel_requires_zed_cla(
214 &self,
215 channel_id: ChannelId,
216 requires_zed_cla: bool,
217 ) -> Result<()> {
218 self.transaction(move |tx| async move {
219 let channel = self.get_channel_internal(channel_id, &tx).await?;
220 let mut model = channel.into_active_model();
221 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
222 model.update(&*tx).await?;
223 Ok(())
224 })
225 .await
226 }
227
228 /// Deletes the channel with the specified ID.
229 pub async fn delete_channel(
230 &self,
231 channel_id: ChannelId,
232 user_id: UserId,
233 ) -> Result<(ChannelId, Vec<ChannelId>)> {
234 self.transaction(move |tx| async move {
235 let channel = self.get_channel_internal(channel_id, &tx).await?;
236 self.check_user_is_channel_admin(&channel, user_id, &tx)
237 .await?;
238
239 let channels_to_remove = self
240 .get_channel_descendants_excluding_self([&channel], &tx)
241 .await?
242 .into_iter()
243 .map(|channel| channel.id)
244 .chain(Some(channel_id))
245 .collect::<Vec<_>>();
246
247 channel::Entity::delete_many()
248 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
249 .exec(&*tx)
250 .await?;
251
252 Ok((channel.root_id(), channels_to_remove))
253 })
254 .await
255 }
256
257 /// Invites a user to a channel as a member.
258 pub async fn invite_channel_member(
259 &self,
260 channel_id: ChannelId,
261 invitee_id: UserId,
262 inviter_id: UserId,
263 role: ChannelRole,
264 ) -> Result<InviteMemberResult> {
265 self.transaction(move |tx| async move {
266 let channel = self.get_channel_internal(channel_id, &tx).await?;
267 self.check_user_is_channel_admin(&channel, inviter_id, &tx)
268 .await?;
269 if !channel.is_root() {
270 Err(ErrorCode::NotARootChannel.anyhow())?
271 }
272
273 channel_member::ActiveModel {
274 id: ActiveValue::NotSet,
275 channel_id: ActiveValue::Set(channel_id),
276 user_id: ActiveValue::Set(invitee_id),
277 accepted: ActiveValue::Set(false),
278 role: ActiveValue::Set(role),
279 }
280 .insert(&*tx)
281 .await?;
282
283 let channel = Channel::from_model(channel);
284
285 let notifications = self
286 .create_notification(
287 invitee_id,
288 rpc::Notification::ChannelInvitation {
289 channel_id: channel_id.to_proto(),
290 channel_name: channel.name.clone(),
291 inviter_id: inviter_id.to_proto(),
292 },
293 true,
294 &tx,
295 )
296 .await?
297 .into_iter()
298 .collect();
299
300 Ok(InviteMemberResult {
301 channel,
302 notifications,
303 })
304 })
305 .await
306 }
307
308 fn sanitize_channel_name(name: &str) -> Result<&str> {
309 let new_name = name.trim().trim_start_matches('#');
310 if new_name.is_empty() {
311 Err(anyhow!("channel name can't be blank"))?;
312 }
313 Ok(new_name)
314 }
315
316 /// Renames the specified channel.
317 pub async fn rename_channel(
318 &self,
319 channel_id: ChannelId,
320 admin_id: UserId,
321 new_name: &str,
322 ) -> Result<channel::Model> {
323 self.transaction(move |tx| async move {
324 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
325
326 let channel = self.get_channel_internal(channel_id, &tx).await?;
327 self.check_user_is_channel_admin(&channel, admin_id, &tx)
328 .await?;
329
330 let mut model = channel.into_active_model();
331 model.name = ActiveValue::Set(new_name.clone());
332 let channel = model.update(&*tx).await?;
333
334 Ok(channel)
335 })
336 .await
337 }
338
339 /// accept or decline an invite to join a channel
340 pub async fn respond_to_channel_invite(
341 &self,
342 channel_id: ChannelId,
343 user_id: UserId,
344 accept: bool,
345 ) -> Result<RespondToChannelInvite> {
346 self.transaction(move |tx| async move {
347 let channel = self.get_channel_internal(channel_id, &tx).await?;
348
349 let membership_update = if accept {
350 let rows_affected = channel_member::Entity::update_many()
351 .set(channel_member::ActiveModel {
352 accepted: ActiveValue::Set(accept),
353 ..Default::default()
354 })
355 .filter(
356 channel_member::Column::ChannelId
357 .eq(channel_id)
358 .and(channel_member::Column::UserId.eq(user_id))
359 .and(channel_member::Column::Accepted.eq(false)),
360 )
361 .exec(&*tx)
362 .await?
363 .rows_affected;
364
365 if rows_affected == 0 {
366 Err(anyhow!("no such invitation"))?;
367 }
368
369 Some(
370 self.calculate_membership_updated(&channel, user_id, &tx)
371 .await?,
372 )
373 } else {
374 let rows_affected = channel_member::Entity::delete_many()
375 .filter(
376 channel_member::Column::ChannelId
377 .eq(channel_id)
378 .and(channel_member::Column::UserId.eq(user_id))
379 .and(channel_member::Column::Accepted.eq(false)),
380 )
381 .exec(&*tx)
382 .await?
383 .rows_affected;
384 if rows_affected == 0 {
385 Err(anyhow!("no such invitation"))?;
386 }
387
388 None
389 };
390
391 Ok(RespondToChannelInvite {
392 membership_update,
393 notifications: self
394 .mark_notification_as_read_with_response(
395 user_id,
396 &rpc::Notification::ChannelInvitation {
397 channel_id: channel_id.to_proto(),
398 channel_name: Default::default(),
399 inviter_id: Default::default(),
400 },
401 accept,
402 &tx,
403 )
404 .await?
405 .into_iter()
406 .collect(),
407 })
408 })
409 .await
410 }
411
412 async fn calculate_membership_updated(
413 &self,
414 channel: &channel::Model,
415 user_id: UserId,
416 tx: &DatabaseTransaction,
417 ) -> Result<MembershipUpdated> {
418 let new_channels = self
419 .get_user_channels(user_id, Some(channel), false, tx)
420 .await?;
421 let removed_channels = self
422 .get_channel_descendants_excluding_self([channel], tx)
423 .await?
424 .into_iter()
425 .map(|channel| channel.id)
426 .chain([channel.id])
427 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
428 .collect::<Vec<_>>();
429
430 Ok(MembershipUpdated {
431 channel_id: channel.id,
432 new_channels,
433 removed_channels,
434 })
435 }
436
437 /// Removes a channel member.
438 pub async fn remove_channel_member(
439 &self,
440 channel_id: ChannelId,
441 member_id: UserId,
442 admin_id: UserId,
443 ) -> Result<RemoveChannelMemberResult> {
444 self.transaction(|tx| async move {
445 let channel = self.get_channel_internal(channel_id, &tx).await?;
446
447 if member_id != admin_id {
448 self.check_user_is_channel_admin(&channel, admin_id, &tx)
449 .await?;
450 }
451
452 let result = channel_member::Entity::delete_many()
453 .filter(
454 channel_member::Column::ChannelId
455 .eq(channel_id)
456 .and(channel_member::Column::UserId.eq(member_id)),
457 )
458 .exec(&*tx)
459 .await?;
460
461 if result.rows_affected == 0 {
462 Err(anyhow!("no such member"))?;
463 }
464
465 Ok(RemoveChannelMemberResult {
466 membership_update: self
467 .calculate_membership_updated(&channel, member_id, &tx)
468 .await?,
469 notification_id: self
470 .remove_notification(
471 member_id,
472 rpc::Notification::ChannelInvitation {
473 channel_id: channel_id.to_proto(),
474 channel_name: Default::default(),
475 inviter_id: Default::default(),
476 },
477 &tx,
478 )
479 .await?,
480 })
481 })
482 .await
483 }
484
485 /// Returns all channels for the user with the given ID.
486 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
487 self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
488 .await
489 }
490
    /// Returns the channels visible to the user with the given ID, together
    /// with their pending invitations, room participants, and buffer/message
    /// state.
    ///
    /// * `ancestor_channel` - when set, only the user's membership on that
    ///   channel's root (`ancestor.root_id()`) is considered, so the result
    ///   covers the tree of that root rather than just the ancestor's own
    ///   descendants.
    /// * `include_invites` - when `false`, unaccepted memberships are filtered
    ///   out and `invited_channels` stays empty.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        include_invites: bool,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Build the membership filter from the optional restrictions.
        let mut filter = channel_member::Column::UserId.eq(user_id);
        if !include_invites {
            filter = filter.and(channel_member::Column::Accepted.eq(true))
        }
        if let Some(ancestor) = ancestor_channel {
            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
        }

        // Split the user's memberships into accepted ones (kept with their
        // channel rows) and pending invitations.
        let mut channels = Vec::<channel::Model>::new();
        let mut invited_channels = Vec::<Channel>::new();
        let mut channel_memberships = Vec::<channel_member::Model>::new();
        let mut rows = channel_member::Entity::find()
            .filter(filter)
            .inner_join(channel::Entity)
            .select_also(channel::Entity)
            .stream(tx)
            .await?;
        while let Some(row) = rows.next().await {
            if let (membership, Some(channel)) = row? {
                if membership.accepted {
                    channel_memberships.push(membership);
                    channels.push(channel);
                } else {
                    invited_channels.push(Channel::from_model(channel));
                }
            }
        }
        // Release the stream before issuing further queries on `tx`.
        drop(rows);

        let mut descendants = self
            .get_channel_descendants_excluding_self(channels.iter(), tx)
            .await?;

        // Merge the membership channels themselves into the path-ordered
        // descendant list, skipping any that are already present.
        for channel in channels {
            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
                descendants.insert(ix, channel);
            }
        }

        let roles_by_channel_id = channel_memberships
            .iter()
            .map(|membership| (membership.channel_id, membership.role))
            .collect::<HashMap<_, _>>();

        // Keep only channels whose root the user has a role on, and which that
        // role is allowed to see given the channel's visibility.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
                if parent_role.can_see_channel(channel.visibility) {
                    Some(Channel::from_model(channel))
                } else {
                    None
                }
            })
            .collect();

        // Column selector for the (channel id, user id) participant query below.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Collect the users currently in each visible channel's room.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();

        // Record the latest operation of every buffer attached to a visible
        // channel, and remember which channel each buffer belongs to.
        let mut channel_ids_by_buffer_id = HashMap::default();
        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
        let mut rows = buffer::Entity::find()
            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
            .stream(tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row = row?;
            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
            latest_buffer_versions.push(ChannelBufferVersion {
                channel_id: row.channel_id.0 as u64,
                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
                // A version entry is emitted only when both the lamport
                // timestamp and the replica id of the latest operation are set.
                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
                    .latest_operation_lamport_timestamp
                    .zip(row.latest_operation_replica_id)
                {
                    vec![VectorClockEntry {
                        timestamp: latest_lamport_timestamp as u32,
                        replica_id: latest_replica_id as u32,
                    }]
                } else {
                    vec![]
                },
            });
        }
        drop(rows);

        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;

        let observed_buffer_versions = self
            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
            .await?;

        let observed_channel_messages = self
            .observed_channel_messages(&channel_ids, user_id, tx)
            .await?;

        Ok(ChannelsForUser {
            channel_memberships,
            channels,
            invited_channels,
            channel_participants,
            latest_buffer_versions,
            latest_channel_messages,
            observed_buffer_versions,
            observed_channel_messages,
        })
    }
629
630 /// Sets the role for the specified channel member.
631 pub async fn set_channel_member_role(
632 &self,
633 channel_id: ChannelId,
634 admin_id: UserId,
635 for_user: UserId,
636 role: ChannelRole,
637 ) -> Result<SetMemberRoleResult> {
638 self.transaction(|tx| async move {
639 let channel = self.get_channel_internal(channel_id, &tx).await?;
640 self.check_user_is_channel_admin(&channel, admin_id, &tx)
641 .await?;
642
643 let membership = channel_member::Entity::find()
644 .filter(
645 channel_member::Column::ChannelId
646 .eq(channel_id)
647 .and(channel_member::Column::UserId.eq(for_user)),
648 )
649 .one(&*tx)
650 .await?;
651
652 let Some(membership) = membership else {
653 Err(anyhow!("no such member"))?
654 };
655
656 let mut update = membership.into_active_model();
657 update.role = ActiveValue::Set(role);
658 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
659
660 if updated.accepted {
661 Ok(SetMemberRoleResult::MembershipUpdated(
662 self.calculate_membership_updated(&channel, for_user, &tx)
663 .await?,
664 ))
665 } else {
666 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
667 channel,
668 )))
669 }
670 })
671 .await
672 }
673
    /// Returns the members (and pending invitees) of the given channel's root
    /// whose GitHub login fuzzily matches `filter`, along with their user
    /// records.
    ///
    /// `user_id` must be a participant of the channel. At most `limit`
    /// membership rows are returned. The first vector holds one entry per
    /// membership row; the second holds a `proto::User` for each row whose
    /// related user was found.
    pub async fn get_channel_participant_details(
        &self,
        channel_id: ChannelId,
        filter: &str,
        limit: u64,
        user_id: UserId,
    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
        let members = self
            .transaction(move |tx| async move {
                let channel = self.get_channel_internal(channel_id, &tx).await?;
                self.check_user_is_channel_participant(&channel, user_id, &tx)
                    .await?;
                // Memberships are stored on the root channel, so query by root id.
                let mut query = channel_member::Entity::find()
                    .find_also_related(user::Entity)
                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));

                // The SQLite branch emulates a case-insensitive match by
                // upper-casing both sides; the other branch uses `ILIKE`.
                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
                    query = query.filter(Expr::cust_with_values(
                        "UPPER(github_login) LIKE ?",
                        [Self::fuzzy_like_string(&filter.to_uppercase())],
                    ))
                } else {
                    query = query.filter(Expr::cust_with_values(
                        "github_login ILIKE $1",
                        [Self::fuzzy_like_string(filter)],
                    ))
                }
                // Ascending order over the negated predicates: presumably this
                // sorts admins first, then members, then guests, accepted
                // before invited, then by login (assumes false sorts before
                // true on the backend; confirm).
                let members = query.order_by(
                        Expr::cust(
                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
                        ),
                        sea_orm::Order::Asc,
                    )
                    .limit(limit)
                    .all(&*tx)
                    .await?;

                Ok(members)
            })
            .await?;

        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());

        // Convert each (membership, user) pair to its proto form; the user
        // records are collected into `users` as a side effect of the map.
        let members = members
            .into_iter()
            .map(|(member, user)| {
                if let Some(user) = user {
                    users.push(proto::User {
                        id: user.id.to_proto(),
                        avatar_url: format!(
                            "https://github.com/{}.png?size=128",
                            user.github_login
                        ),
                        github_login: user.github_login,
                        name: user.name,
                        email: user.email_address,
                    })
                }
                proto::ChannelMember {
                    role: member.role.into(),
                    user_id: member.user_id.to_proto(),
                    // Unaccepted memberships are reported as invitees.
                    kind: if member.accepted {
                        Kind::Member
                    } else {
                        Kind::Invitee
                    }
                    .into(),
                }
            })
            .collect();

        Ok((members, users))
    }
748
749 /// Returns whether the given user is an admin in the specified channel.
750 pub async fn check_user_is_channel_admin(
751 &self,
752 channel: &channel::Model,
753 user_id: UserId,
754 tx: &DatabaseTransaction,
755 ) -> Result<ChannelRole> {
756 let role = self.channel_role_for_user(channel, user_id, tx).await?;
757 match role {
758 Some(ChannelRole::Admin) => Ok(role.unwrap()),
759 Some(ChannelRole::Member)
760 | Some(ChannelRole::Talker)
761 | Some(ChannelRole::Banned)
762 | Some(ChannelRole::Guest)
763 | None => Err(anyhow!(
764 "user is not a channel admin or channel does not exist"
765 ))?,
766 }
767 }
768
769 /// Returns whether the given user is a member of the specified channel.
770 pub async fn check_user_is_channel_member(
771 &self,
772 channel: &channel::Model,
773 user_id: UserId,
774 tx: &DatabaseTransaction,
775 ) -> Result<ChannelRole> {
776 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
777 match channel_role {
778 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
779 Some(ChannelRole::Banned)
780 | Some(ChannelRole::Guest)
781 | Some(ChannelRole::Talker)
782 | None => Err(anyhow!(
783 "user is not a channel member or channel does not exist"
784 ))?,
785 }
786 }
787
788 /// Returns whether the given user is a participant in the specified channel.
789 pub async fn check_user_is_channel_participant(
790 &self,
791 channel: &channel::Model,
792 user_id: UserId,
793 tx: &DatabaseTransaction,
794 ) -> Result<ChannelRole> {
795 let role = self.channel_role_for_user(channel, user_id, tx).await?;
796 match role {
797 Some(ChannelRole::Admin)
798 | Some(ChannelRole::Member)
799 | Some(ChannelRole::Guest)
800 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
801 Some(ChannelRole::Banned) | None => Err(anyhow!(
802 "user is not a channel participant or channel does not exist"
803 ))?,
804 }
805 }
806
807 /// Returns a user's pending invite for the given channel, if one exists.
808 pub async fn pending_invite_for_channel(
809 &self,
810 channel: &channel::Model,
811 user_id: UserId,
812 tx: &DatabaseTransaction,
813 ) -> Result<Option<channel_member::Model>> {
814 let row = channel_member::Entity::find()
815 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
816 .filter(channel_member::Column::UserId.eq(user_id))
817 .filter(channel_member::Column::Accepted.eq(false))
818 .one(tx)
819 .await?;
820
821 Ok(row)
822 }
823
824 /// Returns the role for a user in the given channel.
825 pub async fn channel_role_for_user(
826 &self,
827 channel: &channel::Model,
828 user_id: UserId,
829 tx: &DatabaseTransaction,
830 ) -> Result<Option<ChannelRole>> {
831 let membership = channel_member::Entity::find()
832 .filter(
833 channel_member::Column::ChannelId
834 .eq(channel.root_id())
835 .and(channel_member::Column::UserId.eq(user_id))
836 .and(channel_member::Column::Accepted.eq(true)),
837 )
838 .one(tx)
839 .await?;
840
841 let Some(membership) = membership else {
842 return Ok(None);
843 };
844
845 if !membership.role.can_see_channel(channel.visibility) {
846 return Ok(None);
847 }
848
849 Ok(Some(membership.role))
850 }
851
852 // Get the descendants of the given set if channels, ordered by their
853 // path.
854 pub(crate) async fn get_channel_descendants_excluding_self(
855 &self,
856 channels: impl IntoIterator<Item = &channel::Model>,
857 tx: &DatabaseTransaction,
858 ) -> Result<Vec<channel::Model>> {
859 let mut filter = Condition::any();
860 for channel in channels.into_iter() {
861 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
862 }
863
864 if filter.is_empty() {
865 return Ok(vec![]);
866 }
867
868 Ok(channel::Entity::find()
869 .filter(filter)
870 .order_by_asc(Expr::cust("parent_path || id || '/'"))
871 .all(tx)
872 .await?)
873 }
874
875 /// Returns the channel with the given ID.
876 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
877 self.transaction(|tx| async move {
878 let channel = self.get_channel_internal(channel_id, &tx).await?;
879 self.check_user_is_channel_participant(&channel, user_id, &tx)
880 .await?;
881
882 Ok(Channel::from_model(channel))
883 })
884 .await
885 }
886
887 pub(crate) async fn get_channel_internal(
888 &self,
889 channel_id: ChannelId,
890 tx: &DatabaseTransaction,
891 ) -> Result<channel::Model> {
892 Ok(channel::Entity::find_by_id(channel_id)
893 .one(tx)
894 .await?
895 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
896 }
897
898 pub(crate) async fn get_or_create_channel_room(
899 &self,
900 channel_id: ChannelId,
901 livekit_room: &str,
902 tx: &DatabaseTransaction,
903 ) -> Result<RoomId> {
904 let room = room::Entity::find()
905 .filter(room::Column::ChannelId.eq(channel_id))
906 .one(tx)
907 .await?;
908
909 let room_id = if let Some(room) = room {
910 room.id
911 } else {
912 let result = room::Entity::insert(room::ActiveModel {
913 channel_id: ActiveValue::Set(Some(channel_id)),
914 live_kit_room: ActiveValue::Set(livekit_room.to_string()),
915 ..Default::default()
916 })
917 .exec(tx)
918 .await?;
919
920 result.last_insert_id
921 };
922
923 Ok(room_id)
924 }
925
    /// Move a channel from one parent to another.
    ///
    /// `admin_id` must be an admin of the channel being moved. The move is
    /// rejected when:
    /// * the new parent has a different root (`ErrorCode::WrongMoveTarget`);
    /// * the new parent is the channel itself or one of its descendants
    ///   (`ErrorCode::CircularNesting`);
    /// * a public channel would end up under a non-public parent
    ///   (`ErrorCode::BadPublicNesting`).
    ///
    /// Returns the root id together with the moved channel and every
    /// descendant whose path was rewritten.
    pub async fn move_channel(
        &self,
        channel_id: ChannelId,
        new_parent_id: ChannelId,
        admin_id: UserId,
    ) -> Result<(ChannelId, Vec<Channel>)> {
        self.transaction(|tx| async move {
            let channel = self.get_channel_internal(channel_id, &tx).await?;
            self.check_user_is_channel_admin(&channel, admin_id, &tx)
                .await?;
            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;

            if new_parent.root_id() != channel.root_id() {
                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
            }

            // The target must not be the channel itself or lie beneath it.
            if new_parent
                .ancestors_including_self()
                .any(|id| id == channel.id)
            {
                Err(anyhow!(ErrorCode::CircularNesting))?;
            }

            if channel.visibility == ChannelVisibility::Public
                && new_parent.visibility != ChannelVisibility::Public
            {
                Err(anyhow!(ErrorCode::BadPublicNesting))?;
            }

            // Capture everything derived from the old location before the
            // channel is consumed by `into_active_model` below.
            let root_id = channel.root_id();
            let old_path = format!("{}{}/", channel.parent_path, channel.id);
            let new_path = format!("{}{}/", new_parent.path(), channel.id);

            let mut model = channel.into_active_model();
            model.parent_path = ActiveValue::Set(new_parent.path());
            let channel = model.update(&*tx).await?;

            // Rewrite the path prefix of every descendant in a single
            // statement and collect the ids of the rows that were touched.
            let descendent_ids =
                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
                    self.pool.get_database_backend(),
                    "
 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 WHERE parent_path LIKE $3 || '%'
 RETURNING id
 ",
                    [old_path.clone().into(), new_path.into(), old_path.into()],
                ))
                .all(&*tx)
                .await?;

            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);

            // Reload the moved rows so the returned channels carry their new paths.
            let channels = channel::Entity::find()
                .filter(channel::Column::Id.is_in(all_moved_ids))
                .all(&*tx)
                .await?
                .into_iter()
                .map(Channel::from_model)
                .collect::<Vec<_>>();

            Ok((root_id, channels))
        })
        .await
    }
991}
992
/// Column selector for raw statements that return a single `id` column;
/// `move_channel` uses it to read the ids produced by its `RETURNING id` clause.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
997
/// Column selector for queries that return a single `user_id` column.
// NOTE(review): no use of this enum is visible in this part of the file;
// confirm it is still referenced before relying on it.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}