1use super::*;
2use rpc::proto::channel_member::Kind;
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self
23 .create_channel(name, None, creator_id)
24 .await?
25 .channel
26 .id)
27 }
28
29 #[cfg(test)]
30 pub async fn create_sub_channel(
31 &self,
32 name: &str,
33 parent: ChannelId,
34 creator_id: UserId,
35 ) -> Result<ChannelId> {
36 Ok(self
37 .create_channel(name, Some(parent), creator_id)
38 .await?
39 .channel
40 .id)
41 }
42
43 pub async fn create_channel(
44 &self,
45 name: &str,
46 parent_channel_id: Option<ChannelId>,
47 admin_id: UserId,
48 ) -> Result<CreateChannelResult> {
49 let name = Self::sanitize_channel_name(name)?;
50 self.transaction(move |tx| async move {
51 let mut parent = None;
52
53 if let Some(parent_channel_id) = parent_channel_id {
54 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
55 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
56 .await?;
57 parent = Some(parent_channel);
58 }
59
60 let channel = channel::ActiveModel {
61 id: ActiveValue::NotSet,
62 name: ActiveValue::Set(name.to_string()),
63 visibility: ActiveValue::Set(ChannelVisibility::Members),
64 parent_path: ActiveValue::Set(
65 parent
66 .as_ref()
67 .map_or(String::new(), |parent| parent.path()),
68 ),
69 }
70 .insert(&*tx)
71 .await?;
72
73 let participants_to_update;
74 if let Some(parent) = &parent {
75 participants_to_update = self
76 .participants_to_notify_for_channel_change(parent, &*tx)
77 .await?;
78 } else {
79 participants_to_update = vec![];
80
81 channel_member::ActiveModel {
82 id: ActiveValue::NotSet,
83 channel_id: ActiveValue::Set(channel.id),
84 user_id: ActiveValue::Set(admin_id),
85 accepted: ActiveValue::Set(true),
86 role: ActiveValue::Set(ChannelRole::Admin),
87 }
88 .insert(&*tx)
89 .await?;
90 };
91
92 Ok(CreateChannelResult {
93 channel: Channel::from_model(channel, ChannelRole::Admin),
94 participants_to_update,
95 })
96 })
97 .await
98 }
99
100 pub async fn join_channel(
101 &self,
102 channel_id: ChannelId,
103 user_id: UserId,
104 connection: ConnectionId,
105 environment: &str,
106 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
107 self.transaction(move |tx| async move {
108 let channel = self.get_channel_internal(channel_id, &*tx).await?;
109 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
110
111 let mut accept_invite_result = None;
112
113 if role.is_none() {
114 if let Some(invitation) = self
115 .pending_invite_for_channel(&channel, user_id, &*tx)
116 .await?
117 {
118 // note, this may be a parent channel
119 role = Some(invitation.role);
120 channel_member::Entity::update(channel_member::ActiveModel {
121 accepted: ActiveValue::Set(true),
122 ..invitation.into_active_model()
123 })
124 .exec(&*tx)
125 .await?;
126
127 accept_invite_result = Some(
128 self.calculate_membership_updated(&channel, user_id, &*tx)
129 .await?,
130 );
131
132 debug_assert!(
133 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
134 );
135 } else if channel.visibility == ChannelVisibility::Public {
136 role = Some(ChannelRole::Guest);
137 let channel_to_join = self
138 .public_ancestors_including_self(&channel, &*tx)
139 .await?
140 .first()
141 .cloned()
142 .unwrap_or(channel.clone());
143
144 channel_member::Entity::insert(channel_member::ActiveModel {
145 id: ActiveValue::NotSet,
146 channel_id: ActiveValue::Set(channel_to_join.id),
147 user_id: ActiveValue::Set(user_id),
148 accepted: ActiveValue::Set(true),
149 role: ActiveValue::Set(ChannelRole::Guest),
150 })
151 .exec(&*tx)
152 .await?;
153
154 accept_invite_result = Some(
155 self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
156 .await?,
157 );
158
159 debug_assert!(
160 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
161 );
162 }
163 }
164
165 if role.is_none() || role == Some(ChannelRole::Banned) {
166 Err(anyhow!("not allowed"))?
167 }
168 let role = role.unwrap();
169
170 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
171 let room_id = self
172 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
173 .await?;
174
175 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
176 .await
177 .map(|jr| (jr, accept_invite_result, role))
178 })
179 .await
180 }
181
182 pub async fn set_channel_visibility(
183 &self,
184 channel_id: ChannelId,
185 visibility: ChannelVisibility,
186 admin_id: UserId,
187 ) -> Result<SetChannelVisibilityResult> {
188 self.transaction(move |tx| async move {
189 let channel = self.get_channel_internal(channel_id, &*tx).await?;
190
191 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
192 .await?;
193
194 let previous_members = self
195 .get_channel_participant_details_internal(&channel, &*tx)
196 .await?;
197
198 let mut model = channel.into_active_model();
199 model.visibility = ActiveValue::Set(visibility);
200 let channel = model.update(&*tx).await?;
201
202 let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
203 .participants_to_notify_for_channel_change(&channel, &*tx)
204 .await?
205 .into_iter()
206 .collect();
207
208 let mut channels_to_remove: Vec<ChannelId> = vec![];
209 let mut participants_to_remove: HashSet<UserId> = HashSet::default();
210 match visibility {
211 ChannelVisibility::Members => {
212 let all_descendents: Vec<ChannelId> = self
213 .get_channel_descendants_including_self(vec![channel_id], &*tx)
214 .await?
215 .into_iter()
216 .map(|channel| channel.id)
217 .collect();
218
219 channels_to_remove = channel::Entity::find()
220 .filter(
221 channel::Column::Id
222 .is_in(all_descendents)
223 .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
224 )
225 .all(&*tx)
226 .await?
227 .into_iter()
228 .map(|channel| channel.id)
229 .collect();
230
231 channels_to_remove.push(channel_id);
232
233 for member in previous_members {
234 if member.role.can_only_see_public_descendants() {
235 participants_to_remove.insert(member.user_id);
236 }
237 }
238 }
239 ChannelVisibility::Public => {
240 if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
241 let parent_updates = self
242 .participants_to_notify_for_channel_change(&public_parent, &*tx)
243 .await?;
244
245 for (user_id, channels) in parent_updates {
246 participants_to_update.insert(user_id, channels);
247 }
248 }
249 }
250 }
251
252 Ok(SetChannelVisibilityResult {
253 participants_to_update,
254 participants_to_remove,
255 channels_to_remove,
256 })
257 })
258 .await
259 }
260
261 pub async fn delete_channel(
262 &self,
263 channel_id: ChannelId,
264 user_id: UserId,
265 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
266 self.transaction(move |tx| async move {
267 let channel = self.get_channel_internal(channel_id, &*tx).await?;
268 self.check_user_is_channel_admin(&channel, user_id, &*tx)
269 .await?;
270
271 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
272 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
273 .select_only()
274 .column(channel_member::Column::UserId)
275 .distinct()
276 .into_values::<_, QueryUserIds>()
277 .all(&*tx)
278 .await?;
279
280 let channels_to_remove = self
281 .get_channel_descendants_including_self(vec![channel.id], &*tx)
282 .await?
283 .into_iter()
284 .map(|channel| channel.id)
285 .collect::<Vec<_>>();
286
287 channel::Entity::delete_many()
288 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
289 .exec(&*tx)
290 .await?;
291
292 Ok((channels_to_remove, members_to_notify))
293 })
294 .await
295 }
296
297 pub async fn invite_channel_member(
298 &self,
299 channel_id: ChannelId,
300 invitee_id: UserId,
301 inviter_id: UserId,
302 role: ChannelRole,
303 ) -> Result<InviteMemberResult> {
304 self.transaction(move |tx| async move {
305 let channel = self.get_channel_internal(channel_id, &*tx).await?;
306 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
307 .await?;
308
309 channel_member::ActiveModel {
310 id: ActiveValue::NotSet,
311 channel_id: ActiveValue::Set(channel_id),
312 user_id: ActiveValue::Set(invitee_id),
313 accepted: ActiveValue::Set(false),
314 role: ActiveValue::Set(role),
315 }
316 .insert(&*tx)
317 .await?;
318
319 let channel = Channel::from_model(channel, role);
320
321 let notifications = self
322 .create_notification(
323 invitee_id,
324 rpc::Notification::ChannelInvitation {
325 channel_id: channel_id.to_proto(),
326 channel_name: channel.name.clone(),
327 inviter_id: inviter_id.to_proto(),
328 },
329 true,
330 &*tx,
331 )
332 .await?
333 .into_iter()
334 .collect();
335
336 Ok(InviteMemberResult {
337 channel,
338 notifications,
339 })
340 })
341 .await
342 }
343
344 fn sanitize_channel_name(name: &str) -> Result<&str> {
345 let new_name = name.trim().trim_start_matches('#');
346 if new_name == "" {
347 Err(anyhow!("channel name can't be blank"))?;
348 }
349 Ok(new_name)
350 }
351
352 pub async fn rename_channel(
353 &self,
354 channel_id: ChannelId,
355 admin_id: UserId,
356 new_name: &str,
357 ) -> Result<RenameChannelResult> {
358 self.transaction(move |tx| async move {
359 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
360
361 let channel = self.get_channel_internal(channel_id, &*tx).await?;
362 let role = self
363 .check_user_is_channel_admin(&channel, admin_id, &*tx)
364 .await?;
365
366 let mut model = channel.into_active_model();
367 model.name = ActiveValue::Set(new_name.clone());
368 let channel = model.update(&*tx).await?;
369
370 let participants = self
371 .get_channel_participant_details_internal(&channel, &*tx)
372 .await?;
373
374 Ok(RenameChannelResult {
375 channel: Channel::from_model(channel.clone(), role),
376 participants_to_update: participants
377 .iter()
378 .map(|participant| {
379 (
380 participant.user_id,
381 Channel::from_model(channel.clone(), participant.role),
382 )
383 })
384 .collect(),
385 })
386 })
387 .await
388 }
389
390 pub async fn respond_to_channel_invite(
391 &self,
392 channel_id: ChannelId,
393 user_id: UserId,
394 accept: bool,
395 ) -> Result<RespondToChannelInvite> {
396 self.transaction(move |tx| async move {
397 let channel = self.get_channel_internal(channel_id, &*tx).await?;
398
399 let membership_update = if accept {
400 let rows_affected = channel_member::Entity::update_many()
401 .set(channel_member::ActiveModel {
402 accepted: ActiveValue::Set(accept),
403 ..Default::default()
404 })
405 .filter(
406 channel_member::Column::ChannelId
407 .eq(channel_id)
408 .and(channel_member::Column::UserId.eq(user_id))
409 .and(channel_member::Column::Accepted.eq(false)),
410 )
411 .exec(&*tx)
412 .await?
413 .rows_affected;
414
415 if rows_affected == 0 {
416 Err(anyhow!("no such invitation"))?;
417 }
418
419 Some(
420 self.calculate_membership_updated(&channel, user_id, &*tx)
421 .await?,
422 )
423 } else {
424 let rows_affected = channel_member::Entity::delete_many()
425 .filter(
426 channel_member::Column::ChannelId
427 .eq(channel_id)
428 .and(channel_member::Column::UserId.eq(user_id))
429 .and(channel_member::Column::Accepted.eq(false)),
430 )
431 .exec(&*tx)
432 .await?
433 .rows_affected;
434 if rows_affected == 0 {
435 Err(anyhow!("no such invitation"))?;
436 }
437
438 None
439 };
440
441 Ok(RespondToChannelInvite {
442 membership_update,
443 notifications: self
444 .mark_notification_as_read_with_response(
445 user_id,
446 &rpc::Notification::ChannelInvitation {
447 channel_id: channel_id.to_proto(),
448 channel_name: Default::default(),
449 inviter_id: Default::default(),
450 },
451 accept,
452 &*tx,
453 )
454 .await?
455 .into_iter()
456 .collect(),
457 })
458 })
459 .await
460 }
461
462 async fn calculate_membership_updated(
463 &self,
464 channel: &channel::Model,
465 user_id: UserId,
466 tx: &DatabaseTransaction,
467 ) -> Result<MembershipUpdated> {
468 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
469 let removed_channels = self
470 .get_channel_descendants_including_self(vec![channel.id], &*tx)
471 .await?
472 .into_iter()
473 .filter_map(|channel| {
474 if !new_channels.channels.iter().any(|c| c.id == channel.id) {
475 Some(channel.id)
476 } else {
477 None
478 }
479 })
480 .collect::<Vec<_>>();
481
482 Ok(MembershipUpdated {
483 channel_id: channel.id,
484 new_channels,
485 removed_channels,
486 })
487 }
488
489 pub async fn remove_channel_member(
490 &self,
491 channel_id: ChannelId,
492 member_id: UserId,
493 admin_id: UserId,
494 ) -> Result<RemoveChannelMemberResult> {
495 self.transaction(|tx| async move {
496 let channel = self.get_channel_internal(channel_id, &*tx).await?;
497 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
498 .await?;
499
500 let result = channel_member::Entity::delete_many()
501 .filter(
502 channel_member::Column::ChannelId
503 .eq(channel_id)
504 .and(channel_member::Column::UserId.eq(member_id)),
505 )
506 .exec(&*tx)
507 .await?;
508
509 if result.rows_affected == 0 {
510 Err(anyhow!("no such member"))?;
511 }
512
513 Ok(RemoveChannelMemberResult {
514 membership_update: self
515 .calculate_membership_updated(&channel, member_id, &*tx)
516 .await?,
517 notification_id: self
518 .remove_notification(
519 member_id,
520 rpc::Notification::ChannelInvitation {
521 channel_id: channel_id.to_proto(),
522 channel_name: Default::default(),
523 inviter_id: Default::default(),
524 },
525 &*tx,
526 )
527 .await?,
528 })
529 })
530 .await
531 }
532
533 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
534 self.transaction(|tx| async move {
535 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
536
537 let channel_invites = channel_member::Entity::find()
538 .filter(
539 channel_member::Column::UserId
540 .eq(user_id)
541 .and(channel_member::Column::Accepted.eq(false)),
542 )
543 .all(&*tx)
544 .await?;
545
546 for invite in channel_invites {
547 role_for_channel.insert(invite.channel_id, invite.role);
548 }
549
550 let channels = channel::Entity::find()
551 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
552 .all(&*tx)
553 .await?;
554
555 let channels = channels
556 .into_iter()
557 .filter_map(|channel| {
558 let role = *role_for_channel.get(&channel.id)?;
559 Some(Channel::from_model(channel, role))
560 })
561 .collect();
562
563 Ok(channels)
564 })
565 .await
566 }
567
568 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
569 self.transaction(|tx| async move {
570 let tx = tx;
571
572 self.get_user_channels(user_id, None, &tx).await
573 })
574 .await
575 }
576
    /// Builds the [`ChannelsForUser`] payload for `user_id` within the given transaction.
    ///
    /// Starting from the user's accepted memberships, this expands to all descendant
    /// channels, derives a role for each one, and keeps only the channels that role is
    /// allowed to see. When `ancestor_channel` is provided, the returned channel list is
    /// additionally restricted to channels that have it among their ancestors (or are it).
    ///
    /// The result also carries the current room participants per returned channel and
    /// the unseen buffer changes / unseen messages reported for those channels.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Only accepted memberships count; pending invitations are ignored here.
        let channel_memberships = channel_member::Entity::find()
            .filter(
                channel_member::Column::UserId
                    .eq(user_id)
                    .and(channel_member::Column::Accepted.eq(true)),
            )
            .all(&*tx)
            .await?;

        // Every channel the user is a member of, plus everything below those channels.
        // NOTE(review): the role propagation below presumably relies on this list being
        // ordered so that parents come before their children — confirm against
        // `get_channel_descendants_including_self`.
        let descendants = self
            .get_channel_descendants_including_self(
                channel_memberships.iter().map(|m| m.channel_id),
                &*tx,
            )
            .await?;

        // Seed the role map with the user's direct memberships.
        let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
        for membership in channel_memberships.iter() {
            roles_by_channel_id.insert(membership.channel_id, membership.role);
        }

        let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();

        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = channel
                    .parent_id()
                    .and_then(|parent_id| roles_by_channel_id.get(&parent_id));

                let role = if let Some(parent_role) = parent_role {
                    // Combine the role recorded for the parent with any direct role on
                    // this channel, keeping whichever `max` selects, and record it so
                    // that this channel's own children can pick it up in turn.
                    let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
                        existing_role.max(*parent_role)
                    } else {
                        *parent_role
                    };
                    roles_by_channel_id.insert(channel.id, role);
                    role
                } else {
                    // No role known for the parent: use the direct membership role, and
                    // drop the channel entirely (`?` yields `None`) if there is none.
                    *roles_by_channel_id.get(&channel.id)?
                };

                // `&&` binds tighter than `||`: visible if the role sees all descendants,
                // or if it sees only public descendants and this channel is public.
                let can_see_parent_paths = role.can_see_all_descendants()
                    || role.can_only_see_public_descendants()
                        && channel.visibility == ChannelVisibility::Public;
                if !can_see_parent_paths {
                    return None;
                }

                // Recorded before the ancestor filter, so channels outside the requested
                // subtree still count as visible when pruning `parent_path` below.
                visible_channel_ids.insert(channel.id);

                if let Some(ancestor) = ancestor_channel {
                    if !channel
                        .ancestors_including_self()
                        .any(|id| id == ancestor.id)
                    {
                        return None;
                    }
                }

                // Strip ancestors the user cannot see from the path that is sent back.
                let mut channel = Channel::from_model(channel, role);
                channel
                    .parent_path
                    .retain(|id| visible_channel_ids.contains(&id));

                Some(channel)
            })
            .collect();

        // Column layout of the participant query below: (channel id, user id).
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Users currently in the room of each returned channel.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            // The stream lives only inside this block, so it is dropped before the
            // queries that follow.
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(&*tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
        let channel_buffer_changes = self
            .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
            .await?;

        let unseen_messages = self
            .unseen_channel_messages(user_id, &channel_ids, &*tx)
            .await?;

        Ok(ChannelsForUser {
            channels,
            channel_participants,
            unseen_buffer_changes: channel_buffer_changes,
            channel_messages: unseen_messages,
        })
    }
691
692 async fn participants_to_notify_for_channel_change(
693 &self,
694 new_parent: &channel::Model,
695 tx: &DatabaseTransaction,
696 ) -> Result<Vec<(UserId, ChannelsForUser)>> {
697 let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
698
699 let members = self
700 .get_channel_participant_details_internal(new_parent, &*tx)
701 .await?;
702
703 for member in members.iter() {
704 if !member.role.can_see_all_descendants() {
705 continue;
706 }
707 results.push((
708 member.user_id,
709 self.get_user_channels(member.user_id, Some(new_parent), &*tx)
710 .await?,
711 ))
712 }
713
714 let public_parents = self
715 .public_ancestors_including_self(new_parent, &*tx)
716 .await?;
717 let public_parent = public_parents.last();
718
719 let Some(public_parent) = public_parent else {
720 return Ok(results);
721 };
722
723 // could save some time in the common case by skipping this if the
724 // new channel is not public and has no public descendants.
725 let public_members = if public_parent == new_parent {
726 members
727 } else {
728 self.get_channel_participant_details_internal(public_parent, &*tx)
729 .await?
730 };
731
732 for member in public_members {
733 if !member.role.can_only_see_public_descendants() {
734 continue;
735 };
736 results.push((
737 member.user_id,
738 self.get_user_channels(member.user_id, Some(public_parent), &*tx)
739 .await?,
740 ))
741 }
742
743 Ok(results)
744 }
745
746 pub async fn set_channel_member_role(
747 &self,
748 channel_id: ChannelId,
749 admin_id: UserId,
750 for_user: UserId,
751 role: ChannelRole,
752 ) -> Result<SetMemberRoleResult> {
753 self.transaction(|tx| async move {
754 let channel = self.get_channel_internal(channel_id, &*tx).await?;
755 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
756 .await?;
757
758 let membership = channel_member::Entity::find()
759 .filter(
760 channel_member::Column::ChannelId
761 .eq(channel_id)
762 .and(channel_member::Column::UserId.eq(for_user)),
763 )
764 .one(&*tx)
765 .await?;
766
767 let Some(membership) = membership else {
768 Err(anyhow!("no such member"))?
769 };
770
771 let mut update = membership.into_active_model();
772 update.role = ActiveValue::Set(role);
773 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
774
775 if updated.accepted {
776 Ok(SetMemberRoleResult::MembershipUpdated(
777 self.calculate_membership_updated(&channel, for_user, &*tx)
778 .await?,
779 ))
780 } else {
781 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
782 channel, role,
783 )))
784 }
785 })
786 .await
787 }
788
789 pub async fn get_channel_participant_details(
790 &self,
791 channel_id: ChannelId,
792 user_id: UserId,
793 ) -> Result<Vec<proto::ChannelMember>> {
794 let (role, members) = self
795 .transaction(move |tx| async move {
796 let channel = self.get_channel_internal(channel_id, &*tx).await?;
797 let role = self
798 .check_user_is_channel_participant(&channel, user_id, &*tx)
799 .await?;
800 Ok((
801 role,
802 self.get_channel_participant_details_internal(&channel, &*tx)
803 .await?,
804 ))
805 })
806 .await?;
807
808 if role == ChannelRole::Admin {
809 Ok(members
810 .into_iter()
811 .map(|channel_member| channel_member.to_proto())
812 .collect())
813 } else {
814 return Ok(members
815 .into_iter()
816 .filter_map(|member| {
817 if member.kind == proto::channel_member::Kind::Invitee {
818 return None;
819 }
820 Some(ChannelMember {
821 role: member.role,
822 user_id: member.user_id,
823 kind: proto::channel_member::Kind::Member,
824 })
825 })
826 .map(|channel_member| channel_member.to_proto())
827 .collect());
828 }
829 }
830
    /// Collects one [`ChannelMember`] entry per user that has a membership row on
    /// `channel` or on any of its ancestors.
    ///
    /// Rows for the same user are merged: the role is replaced whenever
    /// `should_override` says the new row's role wins, and the kind prefers a direct
    /// `Member` over an `Invitee`, and an `Invitee` over an `AncestorMember`.
    /// Pending invitations on ancestor channels are ignored, and guest rows are ignored
    /// unless either the row's channel or `channel` itself is public.
    async fn get_channel_participant_details_internal(
        &self,
        channel: &channel::Model,
        tx: &DatabaseTransaction,
    ) -> Result<Vec<ChannelMember>> {
        // Column layout of the query below; the tuple destructured in the loop must
        // follow the same order.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryMemberDetails {
            UserId,
            Role,
            IsDirectMember,
            Accepted,
            Visibility,
        }

        let mut stream = channel_member::Entity::find()
            .left_join(channel::Entity)
            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
            .select_only()
            .column(channel_member::Column::UserId)
            .column(channel_member::Column::Role)
            // Computed column: true when the row belongs to `channel` itself rather
            // than to one of its ancestors.
            .column_as(
                channel_member::Column::ChannelId.eq(channel.id),
                QueryMemberDetails::IsDirectMember,
            )
            .column(channel_member::Column::Accepted)
            // Visibility of the channel the membership row is attached to.
            .column(channel::Column::Visibility)
            .into_values::<_, QueryMemberDetails>()
            .stream(&*tx)
            .await?;

        let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();

        while let Some(user_membership) = stream.next().await {
            let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
                UserId,
                ChannelRole,
                bool,
                bool,
                ChannelVisibility,
            ) = user_membership?;
            let kind = match (is_direct_member, is_invite_accepted) {
                (true, true) => proto::channel_member::Kind::Member,
                (true, false) => proto::channel_member::Kind::Invitee,
                (false, true) => proto::channel_member::Kind::AncestorMember,
                // A pending invitation on an ancestor grants nothing here.
                (false, false) => continue,
            };

            // Skip guest rows when neither the row's channel nor the target channel
            // is public.
            if channel_role == ChannelRole::Guest
                && visibility != ChannelVisibility::Public
                && channel.visibility != ChannelVisibility::Public
            {
                continue;
            }

            if let Some(details_mut) = user_details.get_mut(&user_id) {
                // The user was already seen via another row: merge role and kind.
                if channel_role.should_override(details_mut.role) {
                    details_mut.role = channel_role;
                }
                if kind == Kind::Member {
                    details_mut.kind = kind;
                // the UI is going to be a bit confusing if you already have permissions
                // that are greater than or equal to the ones you're being invited to.
                } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
                    details_mut.kind = kind;
                }
            } else {
                user_details.insert(
                    user_id,
                    ChannelMember {
                        user_id,
                        kind,
                        role: channel_role,
                    },
                );
            }
        }

        // The result order follows the map's iteration order.
        Ok(user_details
            .into_iter()
            .map(|(_, details)| details)
            .collect())
    }
913
914 pub async fn get_channel_participants(
915 &self,
916 channel: &channel::Model,
917 tx: &DatabaseTransaction,
918 ) -> Result<Vec<UserId>> {
919 let participants = self
920 .get_channel_participant_details_internal(channel, &*tx)
921 .await?;
922 Ok(participants
923 .into_iter()
924 .map(|member| member.user_id)
925 .collect())
926 }
927
928 pub async fn check_user_is_channel_admin(
929 &self,
930 channel: &channel::Model,
931 user_id: UserId,
932 tx: &DatabaseTransaction,
933 ) -> Result<ChannelRole> {
934 let role = self.channel_role_for_user(channel, user_id, tx).await?;
935 match role {
936 Some(ChannelRole::Admin) => Ok(role.unwrap()),
937 Some(ChannelRole::Member)
938 | Some(ChannelRole::Banned)
939 | Some(ChannelRole::Guest)
940 | None => Err(anyhow!(
941 "user is not a channel admin or channel does not exist"
942 ))?,
943 }
944 }
945
946 pub async fn check_user_is_channel_member(
947 &self,
948 channel: &channel::Model,
949 user_id: UserId,
950 tx: &DatabaseTransaction,
951 ) -> Result<ChannelRole> {
952 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
953 match channel_role {
954 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
955 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
956 "user is not a channel member or channel does not exist"
957 ))?,
958 }
959 }
960
961 pub async fn check_user_is_channel_participant(
962 &self,
963 channel: &channel::Model,
964 user_id: UserId,
965 tx: &DatabaseTransaction,
966 ) -> Result<ChannelRole> {
967 let role = self.channel_role_for_user(channel, user_id, tx).await?;
968 match role {
969 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
970 Ok(role.unwrap())
971 }
972 Some(ChannelRole::Banned) | None => Err(anyhow!(
973 "user is not a channel participant or channel does not exist"
974 ))?,
975 }
976 }
977
978 pub async fn pending_invite_for_channel(
979 &self,
980 channel: &channel::Model,
981 user_id: UserId,
982 tx: &DatabaseTransaction,
983 ) -> Result<Option<channel_member::Model>> {
984 let row = channel_member::Entity::find()
985 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
986 .filter(channel_member::Column::UserId.eq(user_id))
987 .filter(channel_member::Column::Accepted.eq(false))
988 .one(&*tx)
989 .await?;
990
991 Ok(row)
992 }
993
994 pub async fn public_parent_channel(
995 &self,
996 channel: &channel::Model,
997 tx: &DatabaseTransaction,
998 ) -> Result<Option<channel::Model>> {
999 let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1000 if path.last().unwrap().id == channel.id {
1001 path.pop();
1002 }
1003 Ok(path.pop())
1004 }
1005
1006 pub async fn public_ancestors_including_self(
1007 &self,
1008 channel: &channel::Model,
1009 tx: &DatabaseTransaction,
1010 ) -> Result<Vec<channel::Model>> {
1011 let visible_channels = channel::Entity::find()
1012 .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1013 .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1014 .order_by_asc(channel::Column::ParentPath)
1015 .all(&*tx)
1016 .await?;
1017
1018 Ok(visible_channels)
1019 }
1020
    /// Resolves the effective role of `user_id` on `channel`, or `None` if the user has
    /// no applicable accepted membership.
    ///
    /// All accepted memberships on `channel` and its ancestors are considered.
    /// `Admin`, `Member` and `Banned` rows are combined with `max`. `Guest` rows only
    /// count when attached to a public channel, and only result in `Guest` when no
    /// other role was found and `channel` itself is public.
    pub async fn channel_role_for_user(
        &self,
        channel: &channel::Model,
        user_id: UserId,
        tx: &DatabaseTransaction,
    ) -> Result<Option<ChannelRole>> {
        // Column layout of the query below: (membership channel id, role, visibility of
        // the membership's channel).
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryChannelMembership {
            ChannelId,
            Role,
            Visibility,
        }

        let mut rows = channel_member::Entity::find()
            .left_join(channel::Entity)
            .filter(
                channel_member::Column::ChannelId
                    .is_in(channel.ancestors_including_self())
                    .and(channel_member::Column::UserId.eq(user_id))
                    .and(channel_member::Column::Accepted.eq(true)),
            )
            .select_only()
            .column(channel_member::Column::ChannelId)
            .column(channel_member::Column::Role)
            .column(channel::Column::Visibility)
            .into_values::<_, QueryChannelMembership>()
            .stream(&*tx)
            .await?;

        let mut user_role: Option<ChannelRole> = None;

        // Set when the user holds a guest membership on some public channel in the chain.
        let mut is_participant = false;
        // Visibility of `channel` itself, if one of the rows happens to be attached to it.
        let mut current_channel_visibility = None;

        // note these channels are not iterated in any particular order,
        // our current logic takes the highest permission available.
        while let Some(row) = rows.next().await {
            let (membership_channel, role, visibility): (
                ChannelId,
                ChannelRole,
                ChannelVisibility,
            ) = row?;

            match role {
                ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
                    if let Some(users_role) = user_role {
                        user_role = Some(users_role.max(role));
                    } else {
                        user_role = Some(role)
                    }
                }
                ChannelRole::Guest if visibility == ChannelVisibility::Public => {
                    is_participant = true
                }
                // Guest membership on a non-public channel contributes nothing.
                ChannelRole::Guest => {}
            }
            if channel.id == membership_channel {
                current_channel_visibility = Some(visibility);
            }
        }
        // free up database connection
        drop(rows);

        // Only guest memberships were found: grant `Guest` solely when the target
        // channel itself is public.
        if is_participant && user_role.is_none() {
            if current_channel_visibility.is_none() {
                // No membership row was attached to `channel` itself, so its visibility
                // has to be looked up separately.
                current_channel_visibility = channel::Entity::find()
                    .filter(channel::Column::Id.eq(channel.id))
                    .one(&*tx)
                    .await?
                    .map(|channel| channel.visibility);
            }
            if current_channel_visibility == Some(ChannelVisibility::Public) {
                user_role = Some(ChannelRole::Guest);
            }
        }

        Ok(user_role)
    }
1099
1100 // Get the descendants of the given set if channels, ordered by their
1101 // path.
1102 async fn get_channel_descendants_including_self(
1103 &self,
1104 channel_ids: impl IntoIterator<Item = ChannelId>,
1105 tx: &DatabaseTransaction,
1106 ) -> Result<Vec<channel::Model>> {
1107 let mut values = String::new();
1108 for id in channel_ids {
1109 if !values.is_empty() {
1110 values.push_str(", ");
1111 }
1112 write!(&mut values, "({})", id).unwrap();
1113 }
1114
1115 if values.is_empty() {
1116 return Ok(vec![]);
1117 }
1118
1119 let sql = format!(
1120 r#"
1121 SELECT DISTINCT
1122 descendant_channels.*,
1123 descendant_channels.parent_path || descendant_channels.id as full_path
1124 FROM
1125 channels parent_channels, channels descendant_channels
1126 WHERE
1127 descendant_channels.id IN ({values}) OR
1128 (
1129 parent_channels.id IN ({values}) AND
1130 descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1131 )
1132 ORDER BY
1133 full_path ASC
1134 "#
1135 );
1136
1137 Ok(channel::Entity::find()
1138 .from_raw_sql(Statement::from_string(
1139 self.pool.get_database_backend(),
1140 sql,
1141 ))
1142 .all(tx)
1143 .await?)
1144 }
1145
1146 /// Returns the channel with the given ID
1147 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1148 self.transaction(|tx| async move {
1149 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1150 let role = self
1151 .check_user_is_channel_participant(&channel, user_id, &*tx)
1152 .await?;
1153
1154 Ok(Channel::from_model(channel, role))
1155 })
1156 .await
1157 }
1158
1159 pub async fn get_channel_internal(
1160 &self,
1161 channel_id: ChannelId,
1162 tx: &DatabaseTransaction,
1163 ) -> Result<channel::Model> {
1164 Ok(channel::Entity::find_by_id(channel_id)
1165 .one(&*tx)
1166 .await?
1167 .ok_or_else(|| anyhow!("no such channel"))?)
1168 }
1169
1170 pub(crate) async fn get_or_create_channel_room(
1171 &self,
1172 channel_id: ChannelId,
1173 live_kit_room: &str,
1174 environment: &str,
1175 tx: &DatabaseTransaction,
1176 ) -> Result<RoomId> {
1177 let room = room::Entity::find()
1178 .filter(room::Column::ChannelId.eq(channel_id))
1179 .one(&*tx)
1180 .await?;
1181
1182 let room_id = if let Some(room) = room {
1183 if let Some(env) = room.environment {
1184 if &env != environment {
1185 Err(anyhow!("must join using the {} release", env))?;
1186 }
1187 }
1188 room.id
1189 } else {
1190 let result = room::Entity::insert(room::ActiveModel {
1191 channel_id: ActiveValue::Set(Some(channel_id)),
1192 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1193 environment: ActiveValue::Set(Some(environment.to_string())),
1194 ..Default::default()
1195 })
1196 .exec(&*tx)
1197 .await?;
1198
1199 result.last_insert_id
1200 };
1201
1202 Ok(room_id)
1203 }
1204
1205 /// Move a channel from one parent to another
1206 pub async fn move_channel(
1207 &self,
1208 channel_id: ChannelId,
1209 new_parent_id: Option<ChannelId>,
1210 admin_id: UserId,
1211 ) -> Result<Option<MoveChannelResult>> {
1212 self.transaction(|tx| async move {
1213 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1214 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1215 .await?;
1216
1217 let new_parent_path;
1218 let new_parent_channel;
1219 if let Some(new_parent_id) = new_parent_id {
1220 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1221 self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1222 .await?;
1223
1224 if new_parent
1225 .ancestors_including_self()
1226 .any(|id| id == channel.id)
1227 {
1228 Err(anyhow!("cannot move a channel into one of its descendants"))?;
1229 }
1230
1231 new_parent_path = new_parent.path();
1232 new_parent_channel = Some(new_parent);
1233 } else {
1234 new_parent_path = String::new();
1235 new_parent_channel = None;
1236 };
1237
1238 let previous_participants = self
1239 .get_channel_participant_details_internal(&channel, &*tx)
1240 .await?;
1241
1242 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1243 let new_path = format!("{}{}/", new_parent_path, channel.id);
1244
1245 if old_path == new_path {
1246 return Ok(None);
1247 }
1248
1249 let mut model = channel.into_active_model();
1250 model.parent_path = ActiveValue::Set(new_parent_path);
1251 let channel = model.update(&*tx).await?;
1252
1253 if new_parent_channel.is_none() {
1254 channel_member::ActiveModel {
1255 id: ActiveValue::NotSet,
1256 channel_id: ActiveValue::Set(channel_id),
1257 user_id: ActiveValue::Set(admin_id),
1258 accepted: ActiveValue::Set(true),
1259 role: ActiveValue::Set(ChannelRole::Admin),
1260 }
1261 .insert(&*tx)
1262 .await?;
1263 }
1264
1265 let descendent_ids =
1266 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1267 self.pool.get_database_backend(),
1268 "
1269 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1270 WHERE parent_path LIKE $3 || '%'
1271 RETURNING id
1272 ",
1273 [old_path.clone().into(), new_path.into(), old_path.into()],
1274 ))
1275 .all(&*tx)
1276 .await?;
1277
1278 let participants_to_update: HashMap<_, _> = self
1279 .participants_to_notify_for_channel_change(
1280 new_parent_channel.as_ref().unwrap_or(&channel),
1281 &*tx,
1282 )
1283 .await?
1284 .into_iter()
1285 .collect();
1286
1287 let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1288 for id in descendent_ids {
1289 moved_channels.insert(id);
1290 }
1291 moved_channels.insert(channel_id);
1292
1293 let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1294 for participant in previous_participants {
1295 if participant.kind == proto::channel_member::Kind::AncestorMember {
1296 if !participants_to_update.contains_key(&participant.user_id) {
1297 participants_to_remove.insert(participant.user_id);
1298 }
1299 }
1300 }
1301
1302 Ok(Some(MoveChannelResult {
1303 participants_to_remove,
1304 participants_to_update,
1305 moved_channels,
1306 }))
1307 })
1308 .await
1309 }
1310}
1311
/// Column selector for raw statements that yield a single `id` column.
///
/// `move_channel` passes it to `ChannelId::find_by_statement` to read the ids
/// produced by its `UPDATE ... RETURNING id` statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1316
/// Column selector with a single `UserId` variant.
///
/// NOTE(review): presumably the `user_id` counterpart of `QueryIds`, for
/// queries that return only user ids. No use is visible in this part of the
/// file — confirm against callers.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}