1use super::*;
2use rpc::proto::channel_member::Kind;
3use sea_orm::TryGetableMany;
4
5impl Database {
/// Test-only helper: returns the `(id, name)` pair of every channel row.
#[cfg(test)]
pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
    self.transaction(move |tx| async move {
        let mut stream = channel::Entity::find().stream(&*tx).await?;
        let mut id_name_pairs = Vec::new();
        // Drain the row stream, failing on the first row that errors.
        while let Some(next) = stream.next().await {
            let model = next?;
            id_name_pairs.push((model.id, model.name));
        }
        Ok(id_name_pairs)
    })
    .await
}
19
/// Test-only helper: creates a channel without a parent and returns its id.
#[cfg(test)]
pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
    let created = self.create_channel(name, None, creator_id).await?;
    Ok(created.channel.id)
}
28
/// Test-only helper: creates a channel under `parent` and returns its id.
#[cfg(test)]
pub async fn create_sub_channel(
    &self,
    name: &str,
    parent: ChannelId,
    creator_id: UserId,
) -> Result<ChannelId> {
    let created = self.create_channel(name, Some(parent), creator_id).await?;
    Ok(created.channel.id)
}
42
43 /// Creates a new channel.
44 pub async fn create_channel(
45 &self,
46 name: &str,
47 parent_channel_id: Option<ChannelId>,
48 admin_id: UserId,
49 ) -> Result<CreateChannelResult> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53
54 if let Some(parent_channel_id) = parent_channel_id {
55 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
56 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
57 .await?;
58 parent = Some(parent_channel);
59 }
60
61 let channel = channel::ActiveModel {
62 id: ActiveValue::NotSet,
63 name: ActiveValue::Set(name.to_string()),
64 visibility: ActiveValue::Set(ChannelVisibility::Members),
65 parent_path: ActiveValue::Set(
66 parent
67 .as_ref()
68 .map_or(String::new(), |parent| parent.path()),
69 ),
70 }
71 .insert(&*tx)
72 .await?;
73
74 let participants_to_update;
75 if let Some(parent) = &parent {
76 participants_to_update = self
77 .participants_to_notify_for_channel_change(parent, &*tx)
78 .await?;
79 } else {
80 participants_to_update = vec![];
81
82 channel_member::ActiveModel {
83 id: ActiveValue::NotSet,
84 channel_id: ActiveValue::Set(channel.id),
85 user_id: ActiveValue::Set(admin_id),
86 accepted: ActiveValue::Set(true),
87 role: ActiveValue::Set(ChannelRole::Admin),
88 }
89 .insert(&*tx)
90 .await?;
91 };
92
93 Ok(CreateChannelResult {
94 channel: Channel::from_model(channel, ChannelRole::Admin),
95 participants_to_update,
96 })
97 })
98 .await
99 }
100
101 /// Adds a user to the specified channel.
102 pub async fn join_channel(
103 &self,
104 channel_id: ChannelId,
105 user_id: UserId,
106 connection: ConnectionId,
107 environment: &str,
108 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
109 self.transaction(move |tx| async move {
110 let channel = self.get_channel_internal(channel_id, &*tx).await?;
111 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
112
113 let mut accept_invite_result = None;
114
115 if role.is_none() {
116 if let Some(invitation) = self
117 .pending_invite_for_channel(&channel, user_id, &*tx)
118 .await?
119 {
120 // note, this may be a parent channel
121 role = Some(invitation.role);
122 channel_member::Entity::update(channel_member::ActiveModel {
123 accepted: ActiveValue::Set(true),
124 ..invitation.into_active_model()
125 })
126 .exec(&*tx)
127 .await?;
128
129 accept_invite_result = Some(
130 self.calculate_membership_updated(&channel, user_id, &*tx)
131 .await?,
132 );
133
134 debug_assert!(
135 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
136 );
137 } else if channel.visibility == ChannelVisibility::Public {
138 role = Some(ChannelRole::Guest);
139 let channel_to_join = self
140 .public_ancestors_including_self(&channel, &*tx)
141 .await?
142 .first()
143 .cloned()
144 .unwrap_or(channel.clone());
145
146 channel_member::Entity::insert(channel_member::ActiveModel {
147 id: ActiveValue::NotSet,
148 channel_id: ActiveValue::Set(channel_to_join.id),
149 user_id: ActiveValue::Set(user_id),
150 accepted: ActiveValue::Set(true),
151 role: ActiveValue::Set(ChannelRole::Guest),
152 })
153 .exec(&*tx)
154 .await?;
155
156 accept_invite_result = Some(
157 self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
158 .await?,
159 );
160
161 debug_assert!(
162 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
163 );
164 }
165 }
166
167 if role.is_none() || role == Some(ChannelRole::Banned) {
168 Err(anyhow!("not allowed"))?
169 }
170 let role = role.unwrap();
171
172 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
173 let room_id = self
174 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
175 .await?;
176
177 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
178 .await
179 .map(|jr| (jr, accept_invite_result, role))
180 })
181 .await
182 }
183
/// Sets the visibility of the given channel.
///
/// `admin_id` must pass the admin check on the channel. Besides updating the
/// row, this computes what clients need to be told:
///
/// * `participants_to_update`: users (with their channel lists) reported by
///   `participants_to_notify_for_channel_change` for the updated channel, plus,
///   when the channel becomes public and has a public parent, those reported
///   for that parent.
/// * `channels_to_remove` / `participants_to_remove`: only populated when the
///   channel becomes `Members`-only (see the match arm below).
pub async fn set_channel_visibility(
    &self,
    channel_id: ChannelId,
    visibility: ChannelVisibility,
    admin_id: UserId,
) -> Result<SetChannelVisibilityResult> {
    self.transaction(move |tx| async move {
        let channel = self.get_channel_internal(channel_id, &*tx).await?;

        self.check_user_is_channel_admin(&channel, admin_id, &*tx)
            .await?;

        // Captured before the update so the `Members` arm below can tell who
        // could see the channel under its previous visibility.
        let previous_members = self
            .get_channel_participant_details_internal(&channel, &*tx)
            .await?;

        let mut model = channel.into_active_model();
        model.visibility = ActiveValue::Set(visibility);
        // Shadows `channel` with the updated row.
        let channel = model.update(&*tx).await?;

        // Keyed by user so that later inserts replace earlier entries.
        let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
            .participants_to_notify_for_channel_change(&channel, &*tx)
            .await?
            .into_iter()
            .collect();

        let mut channels_to_remove: Vec<ChannelId> = vec![];
        let mut participants_to_remove: HashSet<UserId> = HashSet::default();
        match visibility {
            ChannelVisibility::Members => {
                let all_descendents: Vec<ChannelId> = self
                    .get_channel_descendants_including_self(vec![channel_id], &*tx)
                    .await?
                    .into_iter()
                    .map(|channel| channel.id)
                    .collect();

                // Descendants that are still public ...
                channels_to_remove = channel::Entity::find()
                    .filter(
                        channel::Column::Id
                            .is_in(all_descendents)
                            .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
                    )
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|channel| channel.id)
                    .collect();

                // ... plus the channel itself, which was just made non-public.
                channels_to_remove.push(channel_id);

                // Previous members whose role only lets them see public
                // descendants are the ones to drop.
                for member in previous_members {
                    if member.role.can_only_see_public_descendants() {
                        participants_to_remove.insert(member.user_id);
                    }
                }
            }
            ChannelVisibility::Public => {
                if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
                    let parent_updates = self
                        .participants_to_notify_for_channel_change(&public_parent, &*tx)
                        .await?;

                    // Entries computed for the public parent override those
                    // computed for the channel itself.
                    for (user_id, channels) in parent_updates {
                        participants_to_update.insert(user_id, channels);
                    }
                }
            }
        }

        Ok(SetChannelVisibilityResult {
            participants_to_update,
            participants_to_remove,
            channels_to_remove,
        })
    })
    .await
}
263
264 /// Deletes the channel with the specified ID.
265 pub async fn delete_channel(
266 &self,
267 channel_id: ChannelId,
268 user_id: UserId,
269 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
270 self.transaction(move |tx| async move {
271 let channel = self.get_channel_internal(channel_id, &*tx).await?;
272 self.check_user_is_channel_admin(&channel, user_id, &*tx)
273 .await?;
274
275 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
276 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
277 .select_only()
278 .column(channel_member::Column::UserId)
279 .distinct()
280 .into_values::<_, QueryUserIds>()
281 .all(&*tx)
282 .await?;
283
284 let channels_to_remove = self
285 .get_channel_descendants_including_self(vec![channel.id], &*tx)
286 .await?
287 .into_iter()
288 .map(|channel| channel.id)
289 .collect::<Vec<_>>();
290
291 channel::Entity::delete_many()
292 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
293 .exec(&*tx)
294 .await?;
295
296 Ok((channels_to_remove, members_to_notify))
297 })
298 .await
299 }
300
301 /// Invites a user to a channel as a member.
302 pub async fn invite_channel_member(
303 &self,
304 channel_id: ChannelId,
305 invitee_id: UserId,
306 inviter_id: UserId,
307 role: ChannelRole,
308 ) -> Result<InviteMemberResult> {
309 self.transaction(move |tx| async move {
310 let channel = self.get_channel_internal(channel_id, &*tx).await?;
311 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
312 .await?;
313
314 channel_member::ActiveModel {
315 id: ActiveValue::NotSet,
316 channel_id: ActiveValue::Set(channel_id),
317 user_id: ActiveValue::Set(invitee_id),
318 accepted: ActiveValue::Set(false),
319 role: ActiveValue::Set(role),
320 }
321 .insert(&*tx)
322 .await?;
323
324 let channel = Channel::from_model(channel, role);
325
326 let notifications = self
327 .create_notification(
328 invitee_id,
329 rpc::Notification::ChannelInvitation {
330 channel_id: channel_id.to_proto(),
331 channel_name: channel.name.clone(),
332 inviter_id: inviter_id.to_proto(),
333 },
334 true,
335 &*tx,
336 )
337 .await?
338 .into_iter()
339 .collect();
340
341 Ok(InviteMemberResult {
342 channel,
343 notifications,
344 })
345 })
346 .await
347 }
348
349 fn sanitize_channel_name(name: &str) -> Result<&str> {
350 let new_name = name.trim().trim_start_matches('#');
351 if new_name == "" {
352 Err(anyhow!("channel name can't be blank"))?;
353 }
354 Ok(new_name)
355 }
356
357 /// Renames the specified channel.
358 pub async fn rename_channel(
359 &self,
360 channel_id: ChannelId,
361 admin_id: UserId,
362 new_name: &str,
363 ) -> Result<RenameChannelResult> {
364 self.transaction(move |tx| async move {
365 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
366
367 let channel = self.get_channel_internal(channel_id, &*tx).await?;
368 let role = self
369 .check_user_is_channel_admin(&channel, admin_id, &*tx)
370 .await?;
371
372 let mut model = channel.into_active_model();
373 model.name = ActiveValue::Set(new_name.clone());
374 let channel = model.update(&*tx).await?;
375
376 let participants = self
377 .get_channel_participant_details_internal(&channel, &*tx)
378 .await?;
379
380 Ok(RenameChannelResult {
381 channel: Channel::from_model(channel.clone(), role),
382 participants_to_update: participants
383 .iter()
384 .map(|participant| {
385 (
386 participant.user_id,
387 Channel::from_model(channel.clone(), participant.role),
388 )
389 })
390 .collect(),
391 })
392 })
393 .await
394 }
395
396 /// accept or decline an invite to join a channel
397 pub async fn respond_to_channel_invite(
398 &self,
399 channel_id: ChannelId,
400 user_id: UserId,
401 accept: bool,
402 ) -> Result<RespondToChannelInvite> {
403 self.transaction(move |tx| async move {
404 let channel = self.get_channel_internal(channel_id, &*tx).await?;
405
406 let membership_update = if accept {
407 let rows_affected = channel_member::Entity::update_many()
408 .set(channel_member::ActiveModel {
409 accepted: ActiveValue::Set(accept),
410 ..Default::default()
411 })
412 .filter(
413 channel_member::Column::ChannelId
414 .eq(channel_id)
415 .and(channel_member::Column::UserId.eq(user_id))
416 .and(channel_member::Column::Accepted.eq(false)),
417 )
418 .exec(&*tx)
419 .await?
420 .rows_affected;
421
422 if rows_affected == 0 {
423 Err(anyhow!("no such invitation"))?;
424 }
425
426 Some(
427 self.calculate_membership_updated(&channel, user_id, &*tx)
428 .await?,
429 )
430 } else {
431 let rows_affected = channel_member::Entity::delete_many()
432 .filter(
433 channel_member::Column::ChannelId
434 .eq(channel_id)
435 .and(channel_member::Column::UserId.eq(user_id))
436 .and(channel_member::Column::Accepted.eq(false)),
437 )
438 .exec(&*tx)
439 .await?
440 .rows_affected;
441 if rows_affected == 0 {
442 Err(anyhow!("no such invitation"))?;
443 }
444
445 None
446 };
447
448 Ok(RespondToChannelInvite {
449 membership_update,
450 notifications: self
451 .mark_notification_as_read_with_response(
452 user_id,
453 &rpc::Notification::ChannelInvitation {
454 channel_id: channel_id.to_proto(),
455 channel_name: Default::default(),
456 inviter_id: Default::default(),
457 },
458 accept,
459 &*tx,
460 )
461 .await?
462 .into_iter()
463 .collect(),
464 })
465 })
466 .await
467 }
468
469 async fn calculate_membership_updated(
470 &self,
471 channel: &channel::Model,
472 user_id: UserId,
473 tx: &DatabaseTransaction,
474 ) -> Result<MembershipUpdated> {
475 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
476 let removed_channels = self
477 .get_channel_descendants_including_self(vec![channel.id], &*tx)
478 .await?
479 .into_iter()
480 .filter_map(|channel| {
481 if !new_channels.channels.iter().any(|c| c.id == channel.id) {
482 Some(channel.id)
483 } else {
484 None
485 }
486 })
487 .collect::<Vec<_>>();
488
489 Ok(MembershipUpdated {
490 channel_id: channel.id,
491 new_channels,
492 removed_channels,
493 })
494 }
495
496 /// Removes a channel member.
497 pub async fn remove_channel_member(
498 &self,
499 channel_id: ChannelId,
500 member_id: UserId,
501 admin_id: UserId,
502 ) -> Result<RemoveChannelMemberResult> {
503 self.transaction(|tx| async move {
504 let channel = self.get_channel_internal(channel_id, &*tx).await?;
505 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
506 .await?;
507
508 let result = channel_member::Entity::delete_many()
509 .filter(
510 channel_member::Column::ChannelId
511 .eq(channel_id)
512 .and(channel_member::Column::UserId.eq(member_id)),
513 )
514 .exec(&*tx)
515 .await?;
516
517 if result.rows_affected == 0 {
518 Err(anyhow!("no such member"))?;
519 }
520
521 Ok(RemoveChannelMemberResult {
522 membership_update: self
523 .calculate_membership_updated(&channel, member_id, &*tx)
524 .await?,
525 notification_id: self
526 .remove_notification(
527 member_id,
528 rpc::Notification::ChannelInvitation {
529 channel_id: channel_id.to_proto(),
530 channel_name: Default::default(),
531 inviter_id: Default::default(),
532 },
533 &*tx,
534 )
535 .await?,
536 })
537 })
538 .await
539 }
540
541 /// Returns all channel invites for the user with the given ID.
542 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
543 self.transaction(|tx| async move {
544 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
545
546 let channel_invites = channel_member::Entity::find()
547 .filter(
548 channel_member::Column::UserId
549 .eq(user_id)
550 .and(channel_member::Column::Accepted.eq(false)),
551 )
552 .all(&*tx)
553 .await?;
554
555 for invite in channel_invites {
556 role_for_channel.insert(invite.channel_id, invite.role);
557 }
558
559 let channels = channel::Entity::find()
560 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
561 .all(&*tx)
562 .await?;
563
564 let channels = channels
565 .into_iter()
566 .filter_map(|channel| {
567 let role = *role_for_channel.get(&channel.id)?;
568 Some(Channel::from_model(channel, role))
569 })
570 .collect();
571
572 Ok(channels)
573 })
574 .await
575 }
576
577 /// Returns all channels for the user with the given ID.
578 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
579 self.transaction(|tx| async move {
580 let tx = tx;
581
582 self.get_user_channels(user_id, None, &tx).await
583 })
584 .await
585 }
586
/// Returns all channels for the user with the given ID that are descendants
/// of the specified ancestor channel.
///
/// When `ancestor_channel` is `None`, no ancestor restriction is applied.
/// The result also includes, for the returned channels, the users currently
/// in each channel's room and the user's unseen buffer changes and messages.
pub async fn get_user_channels(
    &self,
    user_id: UserId,
    ancestor_channel: Option<&channel::Model>,
    tx: &DatabaseTransaction,
) -> Result<ChannelsForUser> {
    // Only accepted memberships grant access.
    let channel_memberships = channel_member::Entity::find()
        .filter(
            channel_member::Column::UserId
                .eq(user_id)
                .and(channel_member::Column::Accepted.eq(true)),
        )
        .all(&*tx)
        .await?;

    // Every channel the user is a member of, plus everything beneath them.
    let descendants = self
        .get_channel_descendants_including_self(
            channel_memberships.iter().map(|m| m.channel_id),
            &*tx,
        )
        .await?;

    // Seeded with direct memberships; inherited roles are added while
    // walking `descendants` below.
    let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
    for membership in channel_memberships.iter() {
        roles_by_channel_id.insert(membership.channel_id, membership.role);
    }

    let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();

    let channels: Vec<Channel> = descendants
        .into_iter()
        .filter_map(|channel| {
            // NOTE(review): role inheritance relies on a parent having been
            // processed before its children; the descendants helper orders
            // its rows by path.
            let parent_role = channel
                .parent_id()
                .and_then(|parent_id| roles_by_channel_id.get(&parent_id));

            let role = if let Some(parent_role) = parent_role {
                // Combine a direct role with the inherited one via `max`.
                let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
                    existing_role.max(*parent_role)
                } else {
                    *parent_role
                };
                roles_by_channel_id.insert(channel.id, role);
                role
            } else {
                // No role on the parent: use the direct role, or skip the
                // channel entirely when there is none.
                *roles_by_channel_id.get(&channel.id)?
            };

            // `&&` binds tighter than `||`: visible if the role sees all
            // descendants, or if it sees only public ones and this channel
            // is public.
            let can_see_parent_paths = role.can_see_all_descendants()
                || role.can_only_see_public_descendants()
                    && channel.visibility == ChannelVisibility::Public;
            if !can_see_parent_paths {
                return None;
            }

            // Recorded before the ancestor filter, so channels outside the
            // requested subtree still count as visible path components.
            visible_channel_ids.insert(channel.id);

            if let Some(ancestor) = ancestor_channel {
                if !channel
                    .ancestors_including_self()
                    .any(|id| id == ancestor.id)
                {
                    return None;
                }
            }

            // Strip path components the user cannot see.
            let mut channel = Channel::from_model(channel, role);
            channel
                .parent_path
                .retain(|id| visible_channel_ids.contains(&id));

            Some(channel)
        })
        .collect();

    #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
    enum QueryUserIdsAndChannelIds {
        ChannelId,
        UserId,
    }

    // Users currently in the room of each returned channel.
    let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
    {
        let mut rows = room_participant::Entity::find()
            .inner_join(room::Entity)
            .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
            .select_only()
            .column(room::Column::ChannelId)
            .column(room_participant::Column::UserId)
            .into_values::<_, QueryUserIdsAndChannelIds>()
            .stream(&*tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row: (ChannelId, UserId) = row?;
            channel_participants.entry(row.0).or_default().push(row.1)
        }
    }

    let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
    let channel_buffer_changes = self
        .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
        .await?;

    let unseen_messages = self
        .unseen_channel_messages(user_id, &channel_ids, &*tx)
        .await?;

    Ok(ChannelsForUser {
        channels,
        channel_participants,
        unseen_buffer_changes: channel_buffer_changes,
        channel_messages: unseen_messages,
    })
}
703
704 async fn participants_to_notify_for_channel_change(
705 &self,
706 new_parent: &channel::Model,
707 tx: &DatabaseTransaction,
708 ) -> Result<Vec<(UserId, ChannelsForUser)>> {
709 let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
710
711 let members = self
712 .get_channel_participant_details_internal(new_parent, &*tx)
713 .await?;
714
715 for member in members.iter() {
716 if !member.role.can_see_all_descendants() {
717 continue;
718 }
719 results.push((
720 member.user_id,
721 self.get_user_channels(member.user_id, Some(new_parent), &*tx)
722 .await?,
723 ))
724 }
725
726 let public_parents = self
727 .public_ancestors_including_self(new_parent, &*tx)
728 .await?;
729 let public_parent = public_parents.last();
730
731 let Some(public_parent) = public_parent else {
732 return Ok(results);
733 };
734
735 // could save some time in the common case by skipping this if the
736 // new channel is not public and has no public descendants.
737 let public_members = if public_parent == new_parent {
738 members
739 } else {
740 self.get_channel_participant_details_internal(public_parent, &*tx)
741 .await?
742 };
743
744 for member in public_members {
745 if !member.role.can_only_see_public_descendants() {
746 continue;
747 };
748 results.push((
749 member.user_id,
750 self.get_user_channels(member.user_id, Some(public_parent), &*tx)
751 .await?,
752 ))
753 }
754
755 Ok(results)
756 }
757
758 /// Sets the role for the specified channel member.
759 pub async fn set_channel_member_role(
760 &self,
761 channel_id: ChannelId,
762 admin_id: UserId,
763 for_user: UserId,
764 role: ChannelRole,
765 ) -> Result<SetMemberRoleResult> {
766 self.transaction(|tx| async move {
767 let channel = self.get_channel_internal(channel_id, &*tx).await?;
768 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
769 .await?;
770
771 let membership = channel_member::Entity::find()
772 .filter(
773 channel_member::Column::ChannelId
774 .eq(channel_id)
775 .and(channel_member::Column::UserId.eq(for_user)),
776 )
777 .one(&*tx)
778 .await?;
779
780 let Some(membership) = membership else {
781 Err(anyhow!("no such member"))?
782 };
783
784 let mut update = membership.into_active_model();
785 update.role = ActiveValue::Set(role);
786 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
787
788 if updated.accepted {
789 Ok(SetMemberRoleResult::MembershipUpdated(
790 self.calculate_membership_updated(&channel, for_user, &*tx)
791 .await?,
792 ))
793 } else {
794 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
795 channel, role,
796 )))
797 }
798 })
799 .await
800 }
801
802 /// Returns the details for the specified channel member.
803 pub async fn get_channel_participant_details(
804 &self,
805 channel_id: ChannelId,
806 user_id: UserId,
807 ) -> Result<Vec<proto::ChannelMember>> {
808 let (role, members) = self
809 .transaction(move |tx| async move {
810 let channel = self.get_channel_internal(channel_id, &*tx).await?;
811 let role = self
812 .check_user_is_channel_participant(&channel, user_id, &*tx)
813 .await?;
814 Ok((
815 role,
816 self.get_channel_participant_details_internal(&channel, &*tx)
817 .await?,
818 ))
819 })
820 .await?;
821
822 if role == ChannelRole::Admin {
823 Ok(members
824 .into_iter()
825 .map(|channel_member| channel_member.to_proto())
826 .collect())
827 } else {
828 return Ok(members
829 .into_iter()
830 .filter_map(|member| {
831 if member.kind == proto::channel_member::Kind::Invitee {
832 return None;
833 }
834 Some(ChannelMember {
835 role: member.role,
836 user_id: member.user_id,
837 kind: proto::channel_member::Kind::Member,
838 })
839 })
840 .map(|channel_member| channel_member.to_proto())
841 .collect());
842 }
843 }
844
/// Collects one `ChannelMember` entry per user who has a membership row on
/// `channel` or on any of its ancestors.
///
/// Rows are merged per user: the role is replaced whenever the new row's role
/// `should_override` the stored one, and the kind is upgraded as described in
/// the loop below. Pending invites on ancestor channels are ignored.
/// The order of the returned vector is that of the map iteration.
async fn get_channel_participant_details_internal(
    &self,
    channel: &channel::Model,
    tx: &DatabaseTransaction,
) -> Result<Vec<ChannelMember>> {
    // Column layout of the projection selected below.
    #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
    enum QueryMemberDetails {
        UserId,
        Role,
        IsDirectMember,
        Accepted,
        Visibility,
    }

    let mut stream = channel_member::Entity::find()
        .left_join(channel::Entity)
        .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
        .select_only()
        .column(channel_member::Column::UserId)
        .column(channel_member::Column::Role)
        // True when the membership row is on `channel` itself rather than on
        // an ancestor.
        .column_as(
            channel_member::Column::ChannelId.eq(channel.id),
            QueryMemberDetails::IsDirectMember,
        )
        .column(channel_member::Column::Accepted)
        // Visibility of the channel the membership row belongs to.
        .column(channel::Column::Visibility)
        .into_values::<_, QueryMemberDetails>()
        .stream(&*tx)
        .await?;

    let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();

    while let Some(user_membership) = stream.next().await {
        let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
            UserId,
            ChannelRole,
            bool,
            bool,
            ChannelVisibility,
        ) = user_membership?;
        let kind = match (is_direct_member, is_invite_accepted) {
            (true, true) => proto::channel_member::Kind::Member,
            (true, false) => proto::channel_member::Kind::Invitee,
            (false, true) => proto::channel_member::Kind::AncestorMember,
            // An unaccepted invite on an ancestor grants nothing here.
            (false, false) => continue,
        };

        // A guest row is skipped when neither the channel it belongs to nor
        // the target channel is public.
        if channel_role == ChannelRole::Guest
            && visibility != ChannelVisibility::Public
            && channel.visibility != ChannelVisibility::Public
        {
            continue;
        }

        if let Some(details_mut) = user_details.get_mut(&user_id) {
            if channel_role.should_override(details_mut.role) {
                details_mut.role = channel_role;
            }
            // Kind precedence: Member always wins; Invitee only replaces
            // AncestorMember.
            if kind == Kind::Member {
                details_mut.kind = kind;
            // the UI is going to be a bit confusing if you already have permissions
            // that are greater than or equal to the ones you're being invited to.
            } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
                details_mut.kind = kind;
            }
        } else {
            user_details.insert(
                user_id,
                ChannelMember {
                    user_id,
                    kind,
                    role: channel_role,
                },
            );
        }
    }

    Ok(user_details
        .into_iter()
        .map(|(_, details)| details)
        .collect())
}
927
928 /// Returns the participants in the given channel.
929 pub async fn get_channel_participants(
930 &self,
931 channel: &channel::Model,
932 tx: &DatabaseTransaction,
933 ) -> Result<Vec<UserId>> {
934 let participants = self
935 .get_channel_participant_details_internal(channel, &*tx)
936 .await?;
937 Ok(participants
938 .into_iter()
939 .map(|member| member.user_id)
940 .collect())
941 }
942
943 /// Returns whether the given user is an admin in the specified channel.
944 pub async fn check_user_is_channel_admin(
945 &self,
946 channel: &channel::Model,
947 user_id: UserId,
948 tx: &DatabaseTransaction,
949 ) -> Result<ChannelRole> {
950 let role = self.channel_role_for_user(channel, user_id, tx).await?;
951 match role {
952 Some(ChannelRole::Admin) => Ok(role.unwrap()),
953 Some(ChannelRole::Member)
954 | Some(ChannelRole::Banned)
955 | Some(ChannelRole::Guest)
956 | None => Err(anyhow!(
957 "user is not a channel admin or channel does not exist"
958 ))?,
959 }
960 }
961
962 /// Returns whether the given user is a member of the specified channel.
963 pub async fn check_user_is_channel_member(
964 &self,
965 channel: &channel::Model,
966 user_id: UserId,
967 tx: &DatabaseTransaction,
968 ) -> Result<ChannelRole> {
969 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
970 match channel_role {
971 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
972 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
973 "user is not a channel member or channel does not exist"
974 ))?,
975 }
976 }
977
978 /// Returns whether the given user is a participant in the specified channel.
979 pub async fn check_user_is_channel_participant(
980 &self,
981 channel: &channel::Model,
982 user_id: UserId,
983 tx: &DatabaseTransaction,
984 ) -> Result<ChannelRole> {
985 let role = self.channel_role_for_user(channel, user_id, tx).await?;
986 match role {
987 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
988 Ok(role.unwrap())
989 }
990 Some(ChannelRole::Banned) | None => Err(anyhow!(
991 "user is not a channel participant or channel does not exist"
992 ))?,
993 }
994 }
995
996 /// Returns a user's pending invite for the given channel, if one exists.
997 pub async fn pending_invite_for_channel(
998 &self,
999 channel: &channel::Model,
1000 user_id: UserId,
1001 tx: &DatabaseTransaction,
1002 ) -> Result<Option<channel_member::Model>> {
1003 let row = channel_member::Entity::find()
1004 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
1005 .filter(channel_member::Column::UserId.eq(user_id))
1006 .filter(channel_member::Column::Accepted.eq(false))
1007 .one(&*tx)
1008 .await?;
1009
1010 Ok(row)
1011 }
1012
1013 async fn public_parent_channel(
1014 &self,
1015 channel: &channel::Model,
1016 tx: &DatabaseTransaction,
1017 ) -> Result<Option<channel::Model>> {
1018 let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1019 if path.last().unwrap().id == channel.id {
1020 path.pop();
1021 }
1022 Ok(path.pop())
1023 }
1024
1025 pub(crate) async fn public_ancestors_including_self(
1026 &self,
1027 channel: &channel::Model,
1028 tx: &DatabaseTransaction,
1029 ) -> Result<Vec<channel::Model>> {
1030 let visible_channels = channel::Entity::find()
1031 .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1032 .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1033 .order_by_asc(channel::Column::ParentPath)
1034 .all(&*tx)
1035 .await?;
1036
1037 Ok(visible_channels)
1038 }
1039
/// Returns the role for a user in the given channel.
///
/// Accepted memberships on the channel and all of its ancestors are
/// considered. `Admin`, `Member` and `Banned` rows are combined with `max`.
/// `Guest` rows only count when they are on a public channel, and only yield
/// `Guest` when no other role was found and `channel` itself is public.
/// Returns `None` when nothing grants the user a role.
pub async fn channel_role_for_user(
    &self,
    channel: &channel::Model,
    user_id: UserId,
    tx: &DatabaseTransaction,
) -> Result<Option<ChannelRole>> {
    // Column layout of the projection selected below.
    #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
    enum QueryChannelMembership {
        ChannelId,
        Role,
        Visibility,
    }

    let mut rows = channel_member::Entity::find()
        .left_join(channel::Entity)
        .filter(
            channel_member::Column::ChannelId
                .is_in(channel.ancestors_including_self())
                .and(channel_member::Column::UserId.eq(user_id))
                .and(channel_member::Column::Accepted.eq(true)),
        )
        .select_only()
        .column(channel_member::Column::ChannelId)
        .column(channel_member::Column::Role)
        .column(channel::Column::Visibility)
        .into_values::<_, QueryChannelMembership>()
        .stream(&*tx)
        .await?;

    let mut user_role: Option<ChannelRole> = None;

    // Set when the user holds a guest membership on a public channel.
    let mut is_participant = false;
    // Visibility of `channel` itself, if one of the rows belongs to it.
    let mut current_channel_visibility = None;

    // note these channels are not iterated in any particular order,
    // our current logic takes the highest permission available.
    while let Some(row) = rows.next().await {
        let (membership_channel, role, visibility): (
            ChannelId,
            ChannelRole,
            ChannelVisibility,
        ) = row?;

        match role {
            ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
                if let Some(users_role) = user_role {
                    user_role = Some(users_role.max(role));
                } else {
                    user_role = Some(role)
                }
            }
            ChannelRole::Guest if visibility == ChannelVisibility::Public => {
                is_participant = true
            }
            // Guest membership on a non-public channel grants nothing.
            ChannelRole::Guest => {}
        }
        if channel.id == membership_channel {
            current_channel_visibility = Some(visibility);
        }
    }
    // free up database connection
    drop(rows);

    if is_participant && user_role.is_none() {
        // No row belonged to `channel` itself, so look up its visibility.
        if current_channel_visibility.is_none() {
            current_channel_visibility = channel::Entity::find()
                .filter(channel::Column::Id.eq(channel.id))
                .one(&*tx)
                .await?
                .map(|channel| channel.visibility);
        }
        // Guests only get access when the target channel is public.
        if current_channel_visibility == Some(ChannelVisibility::Public) {
            user_role = Some(ChannelRole::Guest);
        }
    }

    Ok(user_role)
}
1119
1120 // Get the descendants of the given set if channels, ordered by their
1121 // path.
1122 async fn get_channel_descendants_including_self(
1123 &self,
1124 channel_ids: impl IntoIterator<Item = ChannelId>,
1125 tx: &DatabaseTransaction,
1126 ) -> Result<Vec<channel::Model>> {
1127 let mut values = String::new();
1128 for id in channel_ids {
1129 if !values.is_empty() {
1130 values.push_str(", ");
1131 }
1132 write!(&mut values, "({})", id).unwrap();
1133 }
1134
1135 if values.is_empty() {
1136 return Ok(vec![]);
1137 }
1138
1139 let sql = format!(
1140 r#"
1141 SELECT DISTINCT
1142 descendant_channels.*,
1143 descendant_channels.parent_path || descendant_channels.id as full_path
1144 FROM
1145 channels parent_channels, channels descendant_channels
1146 WHERE
1147 descendant_channels.id IN ({values}) OR
1148 (
1149 parent_channels.id IN ({values}) AND
1150 descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1151 )
1152 ORDER BY
1153 full_path ASC
1154 "#
1155 );
1156
1157 Ok(channel::Entity::find()
1158 .from_raw_sql(Statement::from_string(
1159 self.pool.get_database_backend(),
1160 sql,
1161 ))
1162 .all(tx)
1163 .await?)
1164 }
1165
1166 /// Returns the channel with the given ID.
1167 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1168 self.transaction(|tx| async move {
1169 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1170 let role = self
1171 .check_user_is_channel_participant(&channel, user_id, &*tx)
1172 .await?;
1173
1174 Ok(Channel::from_model(channel, role))
1175 })
1176 .await
1177 }
1178
1179 pub(crate) async fn get_channel_internal(
1180 &self,
1181 channel_id: ChannelId,
1182 tx: &DatabaseTransaction,
1183 ) -> Result<channel::Model> {
1184 Ok(channel::Entity::find_by_id(channel_id)
1185 .one(&*tx)
1186 .await?
1187 .ok_or_else(|| anyhow!("no such channel"))?)
1188 }
1189
1190 pub(crate) async fn get_or_create_channel_room(
1191 &self,
1192 channel_id: ChannelId,
1193 live_kit_room: &str,
1194 environment: &str,
1195 tx: &DatabaseTransaction,
1196 ) -> Result<RoomId> {
1197 let room = room::Entity::find()
1198 .filter(room::Column::ChannelId.eq(channel_id))
1199 .one(&*tx)
1200 .await?;
1201
1202 let room_id = if let Some(room) = room {
1203 if let Some(env) = room.environment {
1204 if &env != environment {
1205 Err(anyhow!("must join using the {} release", env))?;
1206 }
1207 }
1208 room.id
1209 } else {
1210 let result = room::Entity::insert(room::ActiveModel {
1211 channel_id: ActiveValue::Set(Some(channel_id)),
1212 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1213 environment: ActiveValue::Set(Some(environment.to_string())),
1214 ..Default::default()
1215 })
1216 .exec(&*tx)
1217 .await?;
1218
1219 result.last_insert_id
1220 };
1221
1222 Ok(room_id)
1223 }
1224
1225 /// Move a channel from one parent to another
1226 pub async fn move_channel(
1227 &self,
1228 channel_id: ChannelId,
1229 new_parent_id: Option<ChannelId>,
1230 admin_id: UserId,
1231 ) -> Result<Option<MoveChannelResult>> {
1232 self.transaction(|tx| async move {
1233 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1234 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1235 .await?;
1236
1237 let new_parent_path;
1238 let new_parent_channel;
1239 if let Some(new_parent_id) = new_parent_id {
1240 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1241 self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1242 .await?;
1243
1244 if new_parent
1245 .ancestors_including_self()
1246 .any(|id| id == channel.id)
1247 {
1248 Err(anyhow!("cannot move a channel into one of its descendants"))?;
1249 }
1250
1251 new_parent_path = new_parent.path();
1252 new_parent_channel = Some(new_parent);
1253 } else {
1254 new_parent_path = String::new();
1255 new_parent_channel = None;
1256 };
1257
1258 let previous_participants = self
1259 .get_channel_participant_details_internal(&channel, &*tx)
1260 .await?;
1261
1262 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1263 let new_path = format!("{}{}/", new_parent_path, channel.id);
1264
1265 if old_path == new_path {
1266 return Ok(None);
1267 }
1268
1269 let mut model = channel.into_active_model();
1270 model.parent_path = ActiveValue::Set(new_parent_path);
1271 let channel = model.update(&*tx).await?;
1272
1273 if new_parent_channel.is_none() {
1274 channel_member::ActiveModel {
1275 id: ActiveValue::NotSet,
1276 channel_id: ActiveValue::Set(channel_id),
1277 user_id: ActiveValue::Set(admin_id),
1278 accepted: ActiveValue::Set(true),
1279 role: ActiveValue::Set(ChannelRole::Admin),
1280 }
1281 .insert(&*tx)
1282 .await?;
1283 }
1284
1285 let descendent_ids =
1286 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1287 self.pool.get_database_backend(),
1288 "
1289 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1290 WHERE parent_path LIKE $3 || '%'
1291 RETURNING id
1292 ",
1293 [old_path.clone().into(), new_path.into(), old_path.into()],
1294 ))
1295 .all(&*tx)
1296 .await?;
1297
1298 let participants_to_update: HashMap<_, _> = self
1299 .participants_to_notify_for_channel_change(
1300 new_parent_channel.as_ref().unwrap_or(&channel),
1301 &*tx,
1302 )
1303 .await?
1304 .into_iter()
1305 .collect();
1306
1307 let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1308 for id in descendent_ids {
1309 moved_channels.insert(id);
1310 }
1311 moved_channels.insert(channel_id);
1312
1313 let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1314 for participant in previous_participants {
1315 if participant.kind == proto::channel_member::Kind::AncestorMember {
1316 if !participants_to_update.contains_key(&participant.user_id) {
1317 participants_to_remove.insert(participant.user_id);
1318 }
1319 }
1320 }
1321
1322 Ok(Some(MoveChannelResult {
1323 participants_to_remove,
1324 participants_to_update,
1325 moved_channels,
1326 }))
1327 })
1328 .await
1329 }
1330}
1331
// Column selector for raw queries that return a single `id` column, such as
// the `UPDATE ... RETURNING id` statement that `move_channel` decodes with
// `ChannelId::find_by_statement::<QueryIds>`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1336
// Column selector with a single `UserId` variant.
// NOTE(review): presumably used to decode queries that return only a
// `user_id` column; its usage is not visible in this part of the file — confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}