1use super::*;
2use rpc::proto::channel_member::Kind;
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self
23 .create_channel(name, None, creator_id)
24 .await?
25 .channel
26 .id)
27 }
28
29 #[cfg(test)]
30 pub async fn create_sub_channel(
31 &self,
32 name: &str,
33 parent: ChannelId,
34 creator_id: UserId,
35 ) -> Result<ChannelId> {
36 Ok(self
37 .create_channel(name, Some(parent), creator_id)
38 .await?
39 .channel
40 .id)
41 }
42
43 pub async fn create_channel(
44 &self,
45 name: &str,
46 parent_channel_id: Option<ChannelId>,
47 admin_id: UserId,
48 ) -> Result<CreateChannelResult> {
49 let name = Self::sanitize_channel_name(name)?;
50 self.transaction(move |tx| async move {
51 let mut parent = None;
52
53 if let Some(parent_channel_id) = parent_channel_id {
54 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
55 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
56 .await?;
57 parent = Some(parent_channel);
58 }
59
60 let channel = channel::ActiveModel {
61 id: ActiveValue::NotSet,
62 name: ActiveValue::Set(name.to_string()),
63 visibility: ActiveValue::Set(ChannelVisibility::Members),
64 parent_path: ActiveValue::Set(
65 parent
66 .as_ref()
67 .map_or(String::new(), |parent| parent.path()),
68 ),
69 }
70 .insert(&*tx)
71 .await?;
72
73 let participants_to_update;
74 if let Some(parent) = &parent {
75 participants_to_update = self
76 .participants_to_notify_for_channel_change(parent, &*tx)
77 .await?;
78 } else {
79 participants_to_update = vec![];
80
81 channel_member::ActiveModel {
82 id: ActiveValue::NotSet,
83 channel_id: ActiveValue::Set(channel.id),
84 user_id: ActiveValue::Set(admin_id),
85 accepted: ActiveValue::Set(true),
86 role: ActiveValue::Set(ChannelRole::Admin),
87 }
88 .insert(&*tx)
89 .await?;
90 };
91
92 Ok(CreateChannelResult {
93 channel: Channel::from_model(channel, ChannelRole::Admin),
94 participants_to_update,
95 })
96 })
97 .await
98 }
99
100 pub async fn join_channel(
101 &self,
102 channel_id: ChannelId,
103 user_id: UserId,
104 connection: ConnectionId,
105 environment: &str,
106 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
107 self.transaction(move |tx| async move {
108 let channel = self.get_channel_internal(channel_id, &*tx).await?;
109 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
110
111 let mut accept_invite_result = None;
112
113 if role.is_none() {
114 if let Some(invitation) = self
115 .pending_invite_for_channel(&channel, user_id, &*tx)
116 .await?
117 {
118 // note, this may be a parent channel
119 role = Some(invitation.role);
120 channel_member::Entity::update(channel_member::ActiveModel {
121 accepted: ActiveValue::Set(true),
122 ..invitation.into_active_model()
123 })
124 .exec(&*tx)
125 .await?;
126
127 accept_invite_result = Some(
128 self.calculate_membership_updated(&channel, user_id, &*tx)
129 .await?,
130 );
131
132 debug_assert!(
133 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
134 );
135 }
136 }
137
138 if channel.visibility == ChannelVisibility::Public {
139 role = Some(ChannelRole::Guest);
140 let channel_to_join = self
141 .public_ancestors_including_self(&channel, &*tx)
142 .await?
143 .first()
144 .cloned()
145 .unwrap_or(channel.clone());
146
147 channel_member::Entity::insert(channel_member::ActiveModel {
148 id: ActiveValue::NotSet,
149 channel_id: ActiveValue::Set(channel_to_join.id),
150 user_id: ActiveValue::Set(user_id),
151 accepted: ActiveValue::Set(true),
152 role: ActiveValue::Set(ChannelRole::Guest),
153 })
154 .exec(&*tx)
155 .await?;
156
157 accept_invite_result = Some(
158 self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
159 .await?,
160 );
161
162 debug_assert!(self.channel_role_for_user(&channel, user_id, &*tx).await? == role);
163 }
164
165 if role.is_none() || role == Some(ChannelRole::Banned) {
166 Err(anyhow!("not allowed"))?
167 }
168
169 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
170 let room_id = self
171 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
172 .await?;
173
174 self.join_channel_room_internal(room_id, user_id, connection, &*tx)
175 .await
176 .map(|jr| (jr, accept_invite_result, role.unwrap()))
177 })
178 .await
179 }
180
181 pub async fn set_channel_visibility(
182 &self,
183 channel_id: ChannelId,
184 visibility: ChannelVisibility,
185 admin_id: UserId,
186 ) -> Result<SetChannelVisibilityResult> {
187 self.transaction(move |tx| async move {
188 let channel = self.get_channel_internal(channel_id, &*tx).await?;
189
190 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
191 .await?;
192
193 let previous_members = self
194 .get_channel_participant_details_internal(&channel, &*tx)
195 .await?;
196
197 let mut model = channel.into_active_model();
198 model.visibility = ActiveValue::Set(visibility);
199 let channel = model.update(&*tx).await?;
200
201 let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
202 .participants_to_notify_for_channel_change(&channel, &*tx)
203 .await?
204 .into_iter()
205 .collect();
206
207 let mut channels_to_remove: Vec<ChannelId> = vec![];
208 let mut participants_to_remove: HashSet<UserId> = HashSet::default();
209 match visibility {
210 ChannelVisibility::Members => {
211 let all_descendents: Vec<ChannelId> = self
212 .get_channel_descendants_including_self(vec![channel_id], &*tx)
213 .await?
214 .into_iter()
215 .map(|channel| channel.id)
216 .collect();
217
218 channels_to_remove = channel::Entity::find()
219 .filter(
220 channel::Column::Id
221 .is_in(all_descendents)
222 .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
223 )
224 .all(&*tx)
225 .await?
226 .into_iter()
227 .map(|channel| channel.id)
228 .collect();
229
230 channels_to_remove.push(channel_id);
231
232 for member in previous_members {
233 if member.role.can_only_see_public_descendants() {
234 participants_to_remove.insert(member.user_id);
235 }
236 }
237 }
238 ChannelVisibility::Public => {
239 if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
240 let parent_updates = self
241 .participants_to_notify_for_channel_change(&public_parent, &*tx)
242 .await?;
243
244 for (user_id, channels) in parent_updates {
245 participants_to_update.insert(user_id, channels);
246 }
247 }
248 }
249 }
250
251 Ok(SetChannelVisibilityResult {
252 participants_to_update,
253 participants_to_remove,
254 channels_to_remove,
255 })
256 })
257 .await
258 }
259
260 pub async fn delete_channel(
261 &self,
262 channel_id: ChannelId,
263 user_id: UserId,
264 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
265 self.transaction(move |tx| async move {
266 let channel = self.get_channel_internal(channel_id, &*tx).await?;
267 self.check_user_is_channel_admin(&channel, user_id, &*tx)
268 .await?;
269
270 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
271 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
272 .select_only()
273 .column(channel_member::Column::UserId)
274 .distinct()
275 .into_values::<_, QueryUserIds>()
276 .all(&*tx)
277 .await?;
278
279 let channels_to_remove = self
280 .get_channel_descendants_including_self(vec![channel.id], &*tx)
281 .await?
282 .into_iter()
283 .map(|channel| channel.id)
284 .collect::<Vec<_>>();
285
286 channel::Entity::delete_many()
287 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
288 .exec(&*tx)
289 .await?;
290
291 Ok((channels_to_remove, members_to_notify))
292 })
293 .await
294 }
295
296 pub async fn invite_channel_member(
297 &self,
298 channel_id: ChannelId,
299 invitee_id: UserId,
300 inviter_id: UserId,
301 role: ChannelRole,
302 ) -> Result<InviteMemberResult> {
303 self.transaction(move |tx| async move {
304 let channel = self.get_channel_internal(channel_id, &*tx).await?;
305 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
306 .await?;
307
308 channel_member::ActiveModel {
309 id: ActiveValue::NotSet,
310 channel_id: ActiveValue::Set(channel_id),
311 user_id: ActiveValue::Set(invitee_id),
312 accepted: ActiveValue::Set(false),
313 role: ActiveValue::Set(role),
314 }
315 .insert(&*tx)
316 .await?;
317
318 let channel = Channel::from_model(channel, role);
319
320 let notifications = self
321 .create_notification(
322 invitee_id,
323 rpc::Notification::ChannelInvitation {
324 channel_id: channel_id.to_proto(),
325 channel_name: channel.name.clone(),
326 inviter_id: inviter_id.to_proto(),
327 },
328 true,
329 &*tx,
330 )
331 .await?
332 .into_iter()
333 .collect();
334
335 Ok(InviteMemberResult {
336 channel,
337 notifications,
338 })
339 })
340 .await
341 }
342
343 fn sanitize_channel_name(name: &str) -> Result<&str> {
344 let new_name = name.trim().trim_start_matches('#');
345 if new_name == "" {
346 Err(anyhow!("channel name can't be blank"))?;
347 }
348 Ok(new_name)
349 }
350
351 pub async fn rename_channel(
352 &self,
353 channel_id: ChannelId,
354 admin_id: UserId,
355 new_name: &str,
356 ) -> Result<RenameChannelResult> {
357 self.transaction(move |tx| async move {
358 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
359
360 let channel = self.get_channel_internal(channel_id, &*tx).await?;
361 let role = self
362 .check_user_is_channel_admin(&channel, admin_id, &*tx)
363 .await?;
364
365 let mut model = channel.into_active_model();
366 model.name = ActiveValue::Set(new_name.clone());
367 let channel = model.update(&*tx).await?;
368
369 let participants = self
370 .get_channel_participant_details_internal(&channel, &*tx)
371 .await?;
372
373 Ok(RenameChannelResult {
374 channel: Channel::from_model(channel.clone(), role),
375 participants_to_update: participants
376 .iter()
377 .map(|participant| {
378 (
379 participant.user_id,
380 Channel::from_model(channel.clone(), participant.role),
381 )
382 })
383 .collect(),
384 })
385 })
386 .await
387 }
388
389 pub async fn respond_to_channel_invite(
390 &self,
391 channel_id: ChannelId,
392 user_id: UserId,
393 accept: bool,
394 ) -> Result<RespondToChannelInvite> {
395 self.transaction(move |tx| async move {
396 let channel = self.get_channel_internal(channel_id, &*tx).await?;
397
398 let membership_update = if accept {
399 let rows_affected = channel_member::Entity::update_many()
400 .set(channel_member::ActiveModel {
401 accepted: ActiveValue::Set(accept),
402 ..Default::default()
403 })
404 .filter(
405 channel_member::Column::ChannelId
406 .eq(channel_id)
407 .and(channel_member::Column::UserId.eq(user_id))
408 .and(channel_member::Column::Accepted.eq(false)),
409 )
410 .exec(&*tx)
411 .await?
412 .rows_affected;
413
414 if rows_affected == 0 {
415 Err(anyhow!("no such invitation"))?;
416 }
417
418 Some(
419 self.calculate_membership_updated(&channel, user_id, &*tx)
420 .await?,
421 )
422 } else {
423 let rows_affected = channel_member::Entity::delete_many()
424 .filter(
425 channel_member::Column::ChannelId
426 .eq(channel_id)
427 .and(channel_member::Column::UserId.eq(user_id))
428 .and(channel_member::Column::Accepted.eq(false)),
429 )
430 .exec(&*tx)
431 .await?
432 .rows_affected;
433 if rows_affected == 0 {
434 Err(anyhow!("no such invitation"))?;
435 }
436
437 None
438 };
439
440 Ok(RespondToChannelInvite {
441 membership_update,
442 notifications: self
443 .mark_notification_as_read_with_response(
444 user_id,
445 &rpc::Notification::ChannelInvitation {
446 channel_id: channel_id.to_proto(),
447 channel_name: Default::default(),
448 inviter_id: Default::default(),
449 },
450 accept,
451 &*tx,
452 )
453 .await?
454 .into_iter()
455 .collect(),
456 })
457 })
458 .await
459 }
460
461 async fn calculate_membership_updated(
462 &self,
463 channel: &channel::Model,
464 user_id: UserId,
465 tx: &DatabaseTransaction,
466 ) -> Result<MembershipUpdated> {
467 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
468 let removed_channels = self
469 .get_channel_descendants_including_self(vec![channel.id], &*tx)
470 .await?
471 .into_iter()
472 .filter_map(|channel| {
473 if !new_channels.channels.iter().any(|c| c.id == channel.id) {
474 Some(channel.id)
475 } else {
476 None
477 }
478 })
479 .collect::<Vec<_>>();
480
481 Ok(MembershipUpdated {
482 channel_id: channel.id,
483 new_channels,
484 removed_channels,
485 })
486 }
487
488 pub async fn remove_channel_member(
489 &self,
490 channel_id: ChannelId,
491 member_id: UserId,
492 admin_id: UserId,
493 ) -> Result<RemoveChannelMemberResult> {
494 self.transaction(|tx| async move {
495 let channel = self.get_channel_internal(channel_id, &*tx).await?;
496 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
497 .await?;
498
499 let result = channel_member::Entity::delete_many()
500 .filter(
501 channel_member::Column::ChannelId
502 .eq(channel_id)
503 .and(channel_member::Column::UserId.eq(member_id)),
504 )
505 .exec(&*tx)
506 .await?;
507
508 if result.rows_affected == 0 {
509 Err(anyhow!("no such member"))?;
510 }
511
512 Ok(RemoveChannelMemberResult {
513 membership_update: self
514 .calculate_membership_updated(&channel, member_id, &*tx)
515 .await?,
516 notification_id: self
517 .remove_notification(
518 member_id,
519 rpc::Notification::ChannelInvitation {
520 channel_id: channel_id.to_proto(),
521 channel_name: Default::default(),
522 inviter_id: Default::default(),
523 },
524 &*tx,
525 )
526 .await?,
527 })
528 })
529 .await
530 }
531
532 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
533 self.transaction(|tx| async move {
534 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
535
536 let channel_invites = channel_member::Entity::find()
537 .filter(
538 channel_member::Column::UserId
539 .eq(user_id)
540 .and(channel_member::Column::Accepted.eq(false)),
541 )
542 .all(&*tx)
543 .await?;
544
545 for invite in channel_invites {
546 role_for_channel.insert(invite.channel_id, invite.role);
547 }
548
549 let channels = channel::Entity::find()
550 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
551 .all(&*tx)
552 .await?;
553
554 let channels = channels
555 .into_iter()
556 .filter_map(|channel| {
557 let role = *role_for_channel.get(&channel.id)?;
558 Some(Channel::from_model(channel, role))
559 })
560 .collect();
561
562 Ok(channels)
563 })
564 .await
565 }
566
567 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
568 self.transaction(|tx| async move {
569 let tx = tx;
570
571 self.get_user_channels(user_id, None, &tx).await
572 })
573 .await
574 }
575
576 pub async fn get_user_channels(
577 &self,
578 user_id: UserId,
579 ancestor_channel: Option<&channel::Model>,
580 tx: &DatabaseTransaction,
581 ) -> Result<ChannelsForUser> {
582 let channel_memberships = channel_member::Entity::find()
583 .filter(
584 channel_member::Column::UserId
585 .eq(user_id)
586 .and(channel_member::Column::Accepted.eq(true)),
587 )
588 .all(&*tx)
589 .await?;
590
591 let descendants = self
592 .get_channel_descendants_including_self(
593 channel_memberships.iter().map(|m| m.channel_id),
594 &*tx,
595 )
596 .await?;
597
598 let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
599 for membership in channel_memberships.iter() {
600 roles_by_channel_id.insert(membership.channel_id, membership.role);
601 }
602
603 let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();
604
605 let channels: Vec<Channel> = descendants
606 .into_iter()
607 .filter_map(|channel| {
608 let parent_role = channel
609 .parent_id()
610 .and_then(|parent_id| roles_by_channel_id.get(&parent_id));
611
612 let role = if let Some(parent_role) = parent_role {
613 let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
614 existing_role.max(*parent_role)
615 } else {
616 *parent_role
617 };
618 roles_by_channel_id.insert(channel.id, role);
619 role
620 } else {
621 *roles_by_channel_id.get(&channel.id)?
622 };
623
624 let can_see_parent_paths = role.can_see_all_descendants()
625 || role.can_only_see_public_descendants()
626 && channel.visibility == ChannelVisibility::Public;
627 if !can_see_parent_paths {
628 return None;
629 }
630
631 visible_channel_ids.insert(channel.id);
632
633 if let Some(ancestor) = ancestor_channel {
634 if !channel
635 .ancestors_including_self()
636 .any(|id| id == ancestor.id)
637 {
638 return None;
639 }
640 }
641
642 let mut channel = Channel::from_model(channel, role);
643 channel
644 .parent_path
645 .retain(|id| visible_channel_ids.contains(&id));
646
647 Some(channel)
648 })
649 .collect();
650
651 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
652 enum QueryUserIdsAndChannelIds {
653 ChannelId,
654 UserId,
655 }
656
657 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
658 {
659 let mut rows = room_participant::Entity::find()
660 .inner_join(room::Entity)
661 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
662 .select_only()
663 .column(room::Column::ChannelId)
664 .column(room_participant::Column::UserId)
665 .into_values::<_, QueryUserIdsAndChannelIds>()
666 .stream(&*tx)
667 .await?;
668 while let Some(row) = rows.next().await {
669 let row: (ChannelId, UserId) = row?;
670 channel_participants.entry(row.0).or_default().push(row.1)
671 }
672 }
673
674 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
675 let channel_buffer_changes = self
676 .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
677 .await?;
678
679 let unseen_messages = self
680 .unseen_channel_messages(user_id, &channel_ids, &*tx)
681 .await?;
682
683 Ok(ChannelsForUser {
684 channels,
685 channel_participants,
686 unseen_buffer_changes: channel_buffer_changes,
687 channel_messages: unseen_messages,
688 })
689 }
690
691 async fn participants_to_notify_for_channel_change(
692 &self,
693 new_parent: &channel::Model,
694 tx: &DatabaseTransaction,
695 ) -> Result<Vec<(UserId, ChannelsForUser)>> {
696 let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
697
698 let members = self
699 .get_channel_participant_details_internal(new_parent, &*tx)
700 .await?;
701
702 for member in members.iter() {
703 if !member.role.can_see_all_descendants() {
704 continue;
705 }
706 results.push((
707 member.user_id,
708 self.get_user_channels(member.user_id, Some(new_parent), &*tx)
709 .await?,
710 ))
711 }
712
713 let public_parents = self
714 .public_ancestors_including_self(new_parent, &*tx)
715 .await?;
716 let public_parent = public_parents.last();
717
718 let Some(public_parent) = public_parent else {
719 return Ok(results);
720 };
721
722 // could save some time in the common case by skipping this if the
723 // new channel is not public and has no public descendants.
724 let public_members = if public_parent == new_parent {
725 members
726 } else {
727 self.get_channel_participant_details_internal(public_parent, &*tx)
728 .await?
729 };
730
731 for member in public_members {
732 if !member.role.can_only_see_public_descendants() {
733 continue;
734 };
735 results.push((
736 member.user_id,
737 self.get_user_channels(member.user_id, Some(public_parent), &*tx)
738 .await?,
739 ))
740 }
741
742 Ok(results)
743 }
744
745 pub async fn set_channel_member_role(
746 &self,
747 channel_id: ChannelId,
748 admin_id: UserId,
749 for_user: UserId,
750 role: ChannelRole,
751 ) -> Result<SetMemberRoleResult> {
752 self.transaction(|tx| async move {
753 let channel = self.get_channel_internal(channel_id, &*tx).await?;
754 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
755 .await?;
756
757 let membership = channel_member::Entity::find()
758 .filter(
759 channel_member::Column::ChannelId
760 .eq(channel_id)
761 .and(channel_member::Column::UserId.eq(for_user)),
762 )
763 .one(&*tx)
764 .await?;
765
766 let Some(membership) = membership else {
767 Err(anyhow!("no such member"))?
768 };
769
770 let mut update = membership.into_active_model();
771 update.role = ActiveValue::Set(role);
772 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
773
774 if updated.accepted {
775 Ok(SetMemberRoleResult::MembershipUpdated(
776 self.calculate_membership_updated(&channel, for_user, &*tx)
777 .await?,
778 ))
779 } else {
780 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
781 channel, role,
782 )))
783 }
784 })
785 .await
786 }
787
788 pub async fn get_channel_participant_details(
789 &self,
790 channel_id: ChannelId,
791 user_id: UserId,
792 ) -> Result<Vec<proto::ChannelMember>> {
793 let (role, members) = self
794 .transaction(move |tx| async move {
795 let channel = self.get_channel_internal(channel_id, &*tx).await?;
796 let role = self
797 .check_user_is_channel_participant(&channel, user_id, &*tx)
798 .await?;
799 Ok((
800 role,
801 self.get_channel_participant_details_internal(&channel, &*tx)
802 .await?,
803 ))
804 })
805 .await?;
806
807 if role == ChannelRole::Admin {
808 Ok(members
809 .into_iter()
810 .map(|channel_member| channel_member.to_proto())
811 .collect())
812 } else {
813 return Ok(members
814 .into_iter()
815 .filter_map(|member| {
816 if member.kind == proto::channel_member::Kind::Invitee {
817 return None;
818 }
819 Some(ChannelMember {
820 role: member.role,
821 user_id: member.user_id,
822 kind: proto::channel_member::Kind::Member,
823 })
824 })
825 .map(|channel_member| channel_member.to_proto())
826 .collect());
827 }
828 }
829
830 async fn get_channel_participant_details_internal(
831 &self,
832 channel: &channel::Model,
833 tx: &DatabaseTransaction,
834 ) -> Result<Vec<ChannelMember>> {
835 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
836 enum QueryMemberDetails {
837 UserId,
838 Role,
839 IsDirectMember,
840 Accepted,
841 Visibility,
842 }
843
844 let mut stream = channel_member::Entity::find()
845 .left_join(channel::Entity)
846 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
847 .select_only()
848 .column(channel_member::Column::UserId)
849 .column(channel_member::Column::Role)
850 .column_as(
851 channel_member::Column::ChannelId.eq(channel.id),
852 QueryMemberDetails::IsDirectMember,
853 )
854 .column(channel_member::Column::Accepted)
855 .column(channel::Column::Visibility)
856 .into_values::<_, QueryMemberDetails>()
857 .stream(&*tx)
858 .await?;
859
860 let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();
861
862 while let Some(user_membership) = stream.next().await {
863 let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
864 UserId,
865 ChannelRole,
866 bool,
867 bool,
868 ChannelVisibility,
869 ) = user_membership?;
870 let kind = match (is_direct_member, is_invite_accepted) {
871 (true, true) => proto::channel_member::Kind::Member,
872 (true, false) => proto::channel_member::Kind::Invitee,
873 (false, true) => proto::channel_member::Kind::AncestorMember,
874 (false, false) => continue,
875 };
876
877 if channel_role == ChannelRole::Guest
878 && visibility != ChannelVisibility::Public
879 && channel.visibility != ChannelVisibility::Public
880 {
881 continue;
882 }
883
884 if let Some(details_mut) = user_details.get_mut(&user_id) {
885 if channel_role.should_override(details_mut.role) {
886 details_mut.role = channel_role;
887 }
888 if kind == Kind::Member {
889 details_mut.kind = kind;
890 // the UI is going to be a bit confusing if you already have permissions
891 // that are greater than or equal to the ones you're being invited to.
892 } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
893 details_mut.kind = kind;
894 }
895 } else {
896 user_details.insert(
897 user_id,
898 ChannelMember {
899 user_id,
900 kind,
901 role: channel_role,
902 },
903 );
904 }
905 }
906
907 Ok(user_details
908 .into_iter()
909 .map(|(_, details)| details)
910 .collect())
911 }
912
913 pub async fn get_channel_participants(
914 &self,
915 channel: &channel::Model,
916 tx: &DatabaseTransaction,
917 ) -> Result<Vec<UserId>> {
918 let participants = self
919 .get_channel_participant_details_internal(channel, &*tx)
920 .await?;
921 Ok(participants
922 .into_iter()
923 .map(|member| member.user_id)
924 .collect())
925 }
926
927 pub async fn check_user_is_channel_admin(
928 &self,
929 channel: &channel::Model,
930 user_id: UserId,
931 tx: &DatabaseTransaction,
932 ) -> Result<ChannelRole> {
933 let role = self.channel_role_for_user(channel, user_id, tx).await?;
934 match role {
935 Some(ChannelRole::Admin) => Ok(role.unwrap()),
936 Some(ChannelRole::Member)
937 | Some(ChannelRole::Banned)
938 | Some(ChannelRole::Guest)
939 | None => Err(anyhow!(
940 "user is not a channel admin or channel does not exist"
941 ))?,
942 }
943 }
944
945 pub async fn check_user_is_channel_member(
946 &self,
947 channel: &channel::Model,
948 user_id: UserId,
949 tx: &DatabaseTransaction,
950 ) -> Result<ChannelRole> {
951 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
952 match channel_role {
953 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
954 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
955 "user is not a channel member or channel does not exist"
956 ))?,
957 }
958 }
959
960 pub async fn check_user_is_channel_participant(
961 &self,
962 channel: &channel::Model,
963 user_id: UserId,
964 tx: &DatabaseTransaction,
965 ) -> Result<ChannelRole> {
966 let role = self.channel_role_for_user(channel, user_id, tx).await?;
967 match role {
968 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
969 Ok(role.unwrap())
970 }
971 Some(ChannelRole::Banned) | None => Err(anyhow!(
972 "user is not a channel participant or channel does not exist"
973 ))?,
974 }
975 }
976
977 pub async fn pending_invite_for_channel(
978 &self,
979 channel: &channel::Model,
980 user_id: UserId,
981 tx: &DatabaseTransaction,
982 ) -> Result<Option<channel_member::Model>> {
983 let row = channel_member::Entity::find()
984 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
985 .filter(channel_member::Column::UserId.eq(user_id))
986 .filter(channel_member::Column::Accepted.eq(false))
987 .one(&*tx)
988 .await?;
989
990 Ok(row)
991 }
992
993 pub async fn public_parent_channel(
994 &self,
995 channel: &channel::Model,
996 tx: &DatabaseTransaction,
997 ) -> Result<Option<channel::Model>> {
998 let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
999 if path.last().unwrap().id == channel.id {
1000 path.pop();
1001 }
1002 Ok(path.pop())
1003 }
1004
1005 pub async fn public_ancestors_including_self(
1006 &self,
1007 channel: &channel::Model,
1008 tx: &DatabaseTransaction,
1009 ) -> Result<Vec<channel::Model>> {
1010 let visible_channels = channel::Entity::find()
1011 .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1012 .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1013 .order_by_asc(channel::Column::ParentPath)
1014 .all(&*tx)
1015 .await?;
1016
1017 Ok(visible_channels)
1018 }
1019
1020 pub async fn channel_role_for_user(
1021 &self,
1022 channel: &channel::Model,
1023 user_id: UserId,
1024 tx: &DatabaseTransaction,
1025 ) -> Result<Option<ChannelRole>> {
1026 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1027 enum QueryChannelMembership {
1028 ChannelId,
1029 Role,
1030 Visibility,
1031 }
1032
1033 let mut rows = channel_member::Entity::find()
1034 .left_join(channel::Entity)
1035 .filter(
1036 channel_member::Column::ChannelId
1037 .is_in(channel.ancestors_including_self())
1038 .and(channel_member::Column::UserId.eq(user_id))
1039 .and(channel_member::Column::Accepted.eq(true)),
1040 )
1041 .select_only()
1042 .column(channel_member::Column::ChannelId)
1043 .column(channel_member::Column::Role)
1044 .column(channel::Column::Visibility)
1045 .into_values::<_, QueryChannelMembership>()
1046 .stream(&*tx)
1047 .await?;
1048
1049 let mut user_role: Option<ChannelRole> = None;
1050
1051 let mut is_participant = false;
1052 let mut current_channel_visibility = None;
1053
1054 // note these channels are not iterated in any particular order,
1055 // our current logic takes the highest permission available.
1056 while let Some(row) = rows.next().await {
1057 let (membership_channel, role, visibility): (
1058 ChannelId,
1059 ChannelRole,
1060 ChannelVisibility,
1061 ) = row?;
1062
1063 match role {
1064 ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
1065 if let Some(users_role) = user_role {
1066 user_role = Some(users_role.max(role));
1067 } else {
1068 user_role = Some(role)
1069 }
1070 }
1071 ChannelRole::Guest if visibility == ChannelVisibility::Public => {
1072 is_participant = true
1073 }
1074 ChannelRole::Guest => {}
1075 }
1076 if channel.id == membership_channel {
1077 current_channel_visibility = Some(visibility);
1078 }
1079 }
1080 // free up database connection
1081 drop(rows);
1082
1083 if is_participant && user_role.is_none() {
1084 if current_channel_visibility.is_none() {
1085 current_channel_visibility = channel::Entity::find()
1086 .filter(channel::Column::Id.eq(channel.id))
1087 .one(&*tx)
1088 .await?
1089 .map(|channel| channel.visibility);
1090 }
1091 if current_channel_visibility == Some(ChannelVisibility::Public) {
1092 user_role = Some(ChannelRole::Guest);
1093 }
1094 }
1095
1096 Ok(user_role)
1097 }
1098
1099 // Get the descendants of the given set if channels, ordered by their
1100 // path.
1101 async fn get_channel_descendants_including_self(
1102 &self,
1103 channel_ids: impl IntoIterator<Item = ChannelId>,
1104 tx: &DatabaseTransaction,
1105 ) -> Result<Vec<channel::Model>> {
1106 let mut values = String::new();
1107 for id in channel_ids {
1108 if !values.is_empty() {
1109 values.push_str(", ");
1110 }
1111 write!(&mut values, "({})", id).unwrap();
1112 }
1113
1114 if values.is_empty() {
1115 return Ok(vec![]);
1116 }
1117
1118 let sql = format!(
1119 r#"
1120 SELECT DISTINCT
1121 descendant_channels.*,
1122 descendant_channels.parent_path || descendant_channels.id as full_path
1123 FROM
1124 channels parent_channels, channels descendant_channels
1125 WHERE
1126 descendant_channels.id IN ({values}) OR
1127 (
1128 parent_channels.id IN ({values}) AND
1129 descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1130 )
1131 ORDER BY
1132 full_path ASC
1133 "#
1134 );
1135
1136 Ok(channel::Entity::find()
1137 .from_raw_sql(Statement::from_string(
1138 self.pool.get_database_backend(),
1139 sql,
1140 ))
1141 .all(tx)
1142 .await?)
1143 }
1144
1145 /// Returns the channel with the given ID
1146 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1147 self.transaction(|tx| async move {
1148 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1149 let role = self
1150 .check_user_is_channel_participant(&channel, user_id, &*tx)
1151 .await?;
1152
1153 Ok(Channel::from_model(channel, role))
1154 })
1155 .await
1156 }
1157
1158 pub async fn get_channel_internal(
1159 &self,
1160 channel_id: ChannelId,
1161 tx: &DatabaseTransaction,
1162 ) -> Result<channel::Model> {
1163 Ok(channel::Entity::find_by_id(channel_id)
1164 .one(&*tx)
1165 .await?
1166 .ok_or_else(|| anyhow!("no such channel"))?)
1167 }
1168
1169 pub(crate) async fn get_or_create_channel_room(
1170 &self,
1171 channel_id: ChannelId,
1172 live_kit_room: &str,
1173 environment: &str,
1174 tx: &DatabaseTransaction,
1175 ) -> Result<RoomId> {
1176 let room = room::Entity::find()
1177 .filter(room::Column::ChannelId.eq(channel_id))
1178 .one(&*tx)
1179 .await?;
1180
1181 let room_id = if let Some(room) = room {
1182 if let Some(env) = room.enviroment {
1183 if &env != environment {
1184 Err(anyhow!("must join using the {} release", env))?;
1185 }
1186 }
1187 room.id
1188 } else {
1189 let result = room::Entity::insert(room::ActiveModel {
1190 channel_id: ActiveValue::Set(Some(channel_id)),
1191 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1192 enviroment: ActiveValue::Set(Some(environment.to_string())),
1193 ..Default::default()
1194 })
1195 .exec(&*tx)
1196 .await?;
1197
1198 result.last_insert_id
1199 };
1200
1201 Ok(room_id)
1202 }
1203
1204 /// Move a channel from one parent to another
1205 pub async fn move_channel(
1206 &self,
1207 channel_id: ChannelId,
1208 new_parent_id: Option<ChannelId>,
1209 admin_id: UserId,
1210 ) -> Result<Option<MoveChannelResult>> {
1211 self.transaction(|tx| async move {
1212 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1213 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1214 .await?;
1215
1216 let new_parent_path;
1217 let new_parent_channel;
1218 if let Some(new_parent_id) = new_parent_id {
1219 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1220 self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1221 .await?;
1222
1223 if new_parent
1224 .ancestors_including_self()
1225 .any(|id| id == channel.id)
1226 {
1227 Err(anyhow!("cannot move a channel into one of its descendants"))?;
1228 }
1229
1230 new_parent_path = new_parent.path();
1231 new_parent_channel = Some(new_parent);
1232 } else {
1233 new_parent_path = String::new();
1234 new_parent_channel = None;
1235 };
1236
1237 let previous_participants = self
1238 .get_channel_participant_details_internal(&channel, &*tx)
1239 .await?;
1240
1241 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1242 let new_path = format!("{}{}/", new_parent_path, channel.id);
1243
1244 if old_path == new_path {
1245 return Ok(None);
1246 }
1247
1248 let mut model = channel.into_active_model();
1249 model.parent_path = ActiveValue::Set(new_parent_path);
1250 let channel = model.update(&*tx).await?;
1251
1252 if new_parent_channel.is_none() {
1253 channel_member::ActiveModel {
1254 id: ActiveValue::NotSet,
1255 channel_id: ActiveValue::Set(channel_id),
1256 user_id: ActiveValue::Set(admin_id),
1257 accepted: ActiveValue::Set(true),
1258 role: ActiveValue::Set(ChannelRole::Admin),
1259 }
1260 .insert(&*tx)
1261 .await?;
1262 }
1263
1264 let descendent_ids =
1265 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1266 self.pool.get_database_backend(),
1267 "
1268 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1269 WHERE parent_path LIKE $3 || '%'
1270 RETURNING id
1271 ",
1272 [old_path.clone().into(), new_path.into(), old_path.into()],
1273 ))
1274 .all(&*tx)
1275 .await?;
1276
1277 let participants_to_update: HashMap<_, _> = self
1278 .participants_to_notify_for_channel_change(
1279 new_parent_channel.as_ref().unwrap_or(&channel),
1280 &*tx,
1281 )
1282 .await?
1283 .into_iter()
1284 .collect();
1285
1286 let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1287 for id in descendent_ids {
1288 moved_channels.insert(id);
1289 }
1290 moved_channels.insert(channel_id);
1291
1292 let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1293 for participant in previous_participants {
1294 if participant.kind == proto::channel_member::Kind::AncestorMember {
1295 if !participants_to_update.contains_key(&participant.user_id) {
1296 participants_to_remove.insert(participant.user_id);
1297 }
1298 }
1299 }
1300
1301 Ok(Some(MoveChannelResult {
1302 participants_to_remove,
1303 participants_to_update,
1304 moved_channels,
1305 }))
1306 })
1307 .await
1308 }
1309}
1310
1311#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1312enum QueryIds {
1313 Id,
1314}
1315
1316#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1317enum QueryUserIds {
1318 UserId,
1319}