1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self.create_channel(name, None, creator_id).await?.id)
23 }
24
25 #[cfg(test)]
26 pub async fn create_sub_channel(
27 &self,
28 name: &str,
29 parent: ChannelId,
30 creator_id: UserId,
31 ) -> Result<ChannelId> {
32 Ok(self
33 .create_channel(name, Some(parent), creator_id)
34 .await?
35 .id)
36 }
37
38 /// Creates a new channel.
39 pub async fn create_channel(
40 &self,
41 name: &str,
42 parent_channel_id: Option<ChannelId>,
43 admin_id: UserId,
44 ) -> Result<Channel> {
45 let name = Self::sanitize_channel_name(name)?;
46 self.transaction(move |tx| async move {
47 let mut parent = None;
48
49 if let Some(parent_channel_id) = parent_channel_id {
50 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
51 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
52 .await?;
53 parent = Some(parent_channel);
54 }
55
56 let channel = channel::ActiveModel {
57 id: ActiveValue::NotSet,
58 name: ActiveValue::Set(name.to_string()),
59 visibility: ActiveValue::Set(ChannelVisibility::Members),
60 parent_path: ActiveValue::Set(
61 parent
62 .as_ref()
63 .map_or(String::new(), |parent| parent.path()),
64 ),
65 requires_zed_cla: ActiveValue::NotSet,
66 }
67 .insert(&*tx)
68 .await?;
69
70 if parent.is_none() {
71 channel_member::ActiveModel {
72 id: ActiveValue::NotSet,
73 channel_id: ActiveValue::Set(channel.id),
74 user_id: ActiveValue::Set(admin_id),
75 accepted: ActiveValue::Set(true),
76 role: ActiveValue::Set(ChannelRole::Admin),
77 }
78 .insert(&*tx)
79 .await?;
80 }
81
82 Ok(Channel::from_model(channel, ChannelRole::Admin))
83 })
84 .await
85 }
86
    /// Adds a user to the specified channel.
    ///
    /// If the user has no role yet, a pending invitation (looked up on the
    /// channel or one of its ancestors) is accepted; failing that, for a
    /// public channel a guest membership is inserted. The call fails with
    /// `ErrorCode::Forbidden` when the user ends up with no role or is banned.
    ///
    /// Returns the joined room, the membership update produced by an implicit
    /// invite acceptance or guest join (`None` if the user already had a
    /// role), and the role the user joined with.
    pub async fn join_channel(
        &self,
        channel_id: ChannelId,
        user_id: UserId,
        connection: ConnectionId,
        environment: &str,
    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
        self.transaction(move |tx| async move {
            let channel = self.get_channel_internal(channel_id, &*tx).await?;
            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;

            // Set only when this call itself changes the user's membership.
            let mut accept_invite_result = None;

            if role.is_none() {
                if let Some(invitation) = self
                    .pending_invite_for_channel(&channel, user_id, &*tx)
                    .await?
                {
                    // note, this may be a parent channel
                    role = Some(invitation.role);
                    channel_member::Entity::update(channel_member::ActiveModel {
                        accepted: ActiveValue::Set(true),
                        ..invitation.into_active_model()
                    })
                    .exec(&*tx)
                    .await?;

                    accept_invite_result = Some(
                        self.calculate_membership_updated(&channel, user_id, &*tx)
                            .await?,
                    );

                    // Sanity check (debug builds only): the stored role now
                    // matches the role taken from the invitation.
                    debug_assert!(
                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
                    );
                } else if channel.visibility == ChannelVisibility::Public {
                    role = Some(ChannelRole::Guest);
                    // Attach the guest membership to the first public ancestor
                    // returned by the query, falling back to the channel itself.
                    let channel_to_join = self
                        .public_ancestors_including_self(&channel, &*tx)
                        .await?
                        .first()
                        .cloned()
                        .unwrap_or(channel.clone());

                    channel_member::Entity::insert(channel_member::ActiveModel {
                        id: ActiveValue::NotSet,
                        channel_id: ActiveValue::Set(channel_to_join.id),
                        user_id: ActiveValue::Set(user_id),
                        accepted: ActiveValue::Set(true),
                        role: ActiveValue::Set(ChannelRole::Guest),
                    })
                    .exec(&*tx)
                    .await?;

                    accept_invite_result = Some(
                        self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
                            .await?,
                    );

                    debug_assert!(
                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
                    );
                }
            }

            // Users without any role, and banned users, may not join.
            if role.is_none() || role == Some(ChannelRole::Banned) {
                Err(ErrorCode::Forbidden.anyhow())?
            }
            let role = role.unwrap();

            // Candidate room name; passed to `get_or_create_channel_room`.
            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
            let room_id = self
                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
                .await?;

            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
                .await
                .map(|jr| (jr, accept_invite_result, role))
        })
        .await
    }
169
    /// Sets the visibility of the given channel.
    ///
    /// `admin_id` must be an admin of the channel. The result contains:
    /// - `participants_to_update`: users (with their refreshed channel lists)
    ///   to notify about the change;
    /// - `channels_to_remove` / `participants_to_remove`: populated only when
    ///   switching to `ChannelVisibility::Members`; they hold the public
    ///   descendants plus the channel itself, and the previous members whose
    ///   role can only see public descendants.
    pub async fn set_channel_visibility(
        &self,
        channel_id: ChannelId,
        visibility: ChannelVisibility,
        admin_id: UserId,
    ) -> Result<SetChannelVisibilityResult> {
        self.transaction(move |tx| async move {
            let channel = self.get_channel_internal(channel_id, &*tx).await?;

            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
                .await?;

            // Snapshot the participants before the visibility is changed.
            let previous_members = self
                .get_channel_participant_details_internal(&channel, &*tx)
                .await?;

            let mut model = channel.into_active_model();
            model.visibility = ActiveValue::Set(visibility);
            let channel = model.update(&*tx).await?;

            // Computed against the already-updated channel row.
            let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
                .participants_to_notify_for_channel_change(&channel, &*tx)
                .await?
                .into_iter()
                .collect();

            let mut channels_to_remove: Vec<ChannelId> = vec![];
            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
            match visibility {
                ChannelVisibility::Members => {
                    let all_descendents: Vec<ChannelId> = self
                        .get_channel_descendants_including_self(vec![channel_id], &*tx)
                        .await?
                        .into_iter()
                        .map(|channel| channel.id)
                        .collect();

                    // Descendants (including self) that are still public.
                    channels_to_remove = channel::Entity::find()
                        .filter(
                            channel::Column::Id
                                .is_in(all_descendents)
                                .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
                        )
                        .all(&*tx)
                        .await?
                        .into_iter()
                        .map(|channel| channel.id)
                        .collect();

                    // The channel itself is always added explicitly.
                    channels_to_remove.push(channel_id);

                    for member in previous_members {
                        if member.role.can_only_see_public_descendants() {
                            participants_to_remove.insert(member.user_id);
                        }
                    }
                }
                ChannelVisibility::Public => {
                    // Also notify the participants of the nearest public
                    // ancestor other than this channel, if there is one.
                    if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
                        let parent_updates = self
                            .participants_to_notify_for_channel_change(&public_parent, &*tx)
                            .await?;

                        // Entries from the parent overwrite earlier entries
                        // for the same user.
                        for (user_id, channels) in parent_updates {
                            participants_to_update.insert(user_id, channels);
                        }
                    }
                }
            }

            Ok(SetChannelVisibilityResult {
                participants_to_update,
                participants_to_remove,
                channels_to_remove,
            })
        })
        .await
    }
249
250 #[cfg(test)]
251 pub async fn set_channel_requires_zed_cla(
252 &self,
253 channel_id: ChannelId,
254 requires_zed_cla: bool,
255 ) -> Result<()> {
256 self.transaction(move |tx| async move {
257 let channel = self.get_channel_internal(channel_id, &*tx).await?;
258 let mut model = channel.into_active_model();
259 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
260 model.update(&*tx).await?;
261 Ok(())
262 })
263 .await
264 }
265
266 /// Deletes the channel with the specified ID.
267 pub async fn delete_channel(
268 &self,
269 channel_id: ChannelId,
270 user_id: UserId,
271 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
272 self.transaction(move |tx| async move {
273 let channel = self.get_channel_internal(channel_id, &*tx).await?;
274 self.check_user_is_channel_admin(&channel, user_id, &*tx)
275 .await?;
276
277 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
278 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
279 .select_only()
280 .column(channel_member::Column::UserId)
281 .distinct()
282 .into_values::<_, QueryUserIds>()
283 .all(&*tx)
284 .await?;
285
286 let channels_to_remove = self
287 .get_channel_descendants_including_self(vec![channel.id], &*tx)
288 .await?
289 .into_iter()
290 .map(|channel| channel.id)
291 .collect::<Vec<_>>();
292
293 channel::Entity::delete_many()
294 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
295 .exec(&*tx)
296 .await?;
297
298 Ok((channels_to_remove, members_to_notify))
299 })
300 .await
301 }
302
303 /// Invites a user to a channel as a member.
304 pub async fn invite_channel_member(
305 &self,
306 channel_id: ChannelId,
307 invitee_id: UserId,
308 inviter_id: UserId,
309 role: ChannelRole,
310 ) -> Result<InviteMemberResult> {
311 self.transaction(move |tx| async move {
312 let channel = self.get_channel_internal(channel_id, &*tx).await?;
313 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
314 .await?;
315
316 channel_member::ActiveModel {
317 id: ActiveValue::NotSet,
318 channel_id: ActiveValue::Set(channel_id),
319 user_id: ActiveValue::Set(invitee_id),
320 accepted: ActiveValue::Set(false),
321 role: ActiveValue::Set(role),
322 }
323 .insert(&*tx)
324 .await?;
325
326 let channel = Channel::from_model(channel, role);
327
328 let notifications = self
329 .create_notification(
330 invitee_id,
331 rpc::Notification::ChannelInvitation {
332 channel_id: channel_id.to_proto(),
333 channel_name: channel.name.clone(),
334 inviter_id: inviter_id.to_proto(),
335 },
336 true,
337 &*tx,
338 )
339 .await?
340 .into_iter()
341 .collect();
342
343 Ok(InviteMemberResult {
344 channel,
345 notifications,
346 })
347 })
348 .await
349 }
350
351 fn sanitize_channel_name(name: &str) -> Result<&str> {
352 let new_name = name.trim().trim_start_matches('#');
353 if new_name == "" {
354 Err(anyhow!("channel name can't be blank"))?;
355 }
356 Ok(new_name)
357 }
358
359 /// Renames the specified channel.
360 pub async fn rename_channel(
361 &self,
362 channel_id: ChannelId,
363 admin_id: UserId,
364 new_name: &str,
365 ) -> Result<RenameChannelResult> {
366 self.transaction(move |tx| async move {
367 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
368
369 let channel = self.get_channel_internal(channel_id, &*tx).await?;
370 let role = self
371 .check_user_is_channel_admin(&channel, admin_id, &*tx)
372 .await?;
373
374 let mut model = channel.into_active_model();
375 model.name = ActiveValue::Set(new_name.clone());
376 let channel = model.update(&*tx).await?;
377
378 let participants = self
379 .get_channel_participant_details_internal(&channel, &*tx)
380 .await?;
381
382 Ok(RenameChannelResult {
383 channel: Channel::from_model(channel.clone(), role),
384 participants_to_update: participants
385 .iter()
386 .map(|participant| {
387 (
388 participant.user_id,
389 Channel::from_model(channel.clone(), participant.role),
390 )
391 })
392 .collect(),
393 })
394 })
395 .await
396 }
397
398 /// accept or decline an invite to join a channel
399 pub async fn respond_to_channel_invite(
400 &self,
401 channel_id: ChannelId,
402 user_id: UserId,
403 accept: bool,
404 ) -> Result<RespondToChannelInvite> {
405 self.transaction(move |tx| async move {
406 let channel = self.get_channel_internal(channel_id, &*tx).await?;
407
408 let membership_update = if accept {
409 let rows_affected = channel_member::Entity::update_many()
410 .set(channel_member::ActiveModel {
411 accepted: ActiveValue::Set(accept),
412 ..Default::default()
413 })
414 .filter(
415 channel_member::Column::ChannelId
416 .eq(channel_id)
417 .and(channel_member::Column::UserId.eq(user_id))
418 .and(channel_member::Column::Accepted.eq(false)),
419 )
420 .exec(&*tx)
421 .await?
422 .rows_affected;
423
424 if rows_affected == 0 {
425 Err(anyhow!("no such invitation"))?;
426 }
427
428 Some(
429 self.calculate_membership_updated(&channel, user_id, &*tx)
430 .await?,
431 )
432 } else {
433 let rows_affected = channel_member::Entity::delete_many()
434 .filter(
435 channel_member::Column::ChannelId
436 .eq(channel_id)
437 .and(channel_member::Column::UserId.eq(user_id))
438 .and(channel_member::Column::Accepted.eq(false)),
439 )
440 .exec(&*tx)
441 .await?
442 .rows_affected;
443 if rows_affected == 0 {
444 Err(anyhow!("no such invitation"))?;
445 }
446
447 None
448 };
449
450 Ok(RespondToChannelInvite {
451 membership_update,
452 notifications: self
453 .mark_notification_as_read_with_response(
454 user_id,
455 &rpc::Notification::ChannelInvitation {
456 channel_id: channel_id.to_proto(),
457 channel_name: Default::default(),
458 inviter_id: Default::default(),
459 },
460 accept,
461 &*tx,
462 )
463 .await?
464 .into_iter()
465 .collect(),
466 })
467 })
468 .await
469 }
470
471 async fn calculate_membership_updated(
472 &self,
473 channel: &channel::Model,
474 user_id: UserId,
475 tx: &DatabaseTransaction,
476 ) -> Result<MembershipUpdated> {
477 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
478 let removed_channels = self
479 .get_channel_descendants_including_self(vec![channel.id], &*tx)
480 .await?
481 .into_iter()
482 .filter_map(|channel| {
483 if !new_channels.channels.iter().any(|c| c.id == channel.id) {
484 Some(channel.id)
485 } else {
486 None
487 }
488 })
489 .collect::<Vec<_>>();
490
491 Ok(MembershipUpdated {
492 channel_id: channel.id,
493 new_channels,
494 removed_channels,
495 })
496 }
497
498 /// Removes a channel member.
499 pub async fn remove_channel_member(
500 &self,
501 channel_id: ChannelId,
502 member_id: UserId,
503 admin_id: UserId,
504 ) -> Result<RemoveChannelMemberResult> {
505 self.transaction(|tx| async move {
506 let channel = self.get_channel_internal(channel_id, &*tx).await?;
507 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
508 .await?;
509
510 let result = channel_member::Entity::delete_many()
511 .filter(
512 channel_member::Column::ChannelId
513 .eq(channel_id)
514 .and(channel_member::Column::UserId.eq(member_id)),
515 )
516 .exec(&*tx)
517 .await?;
518
519 if result.rows_affected == 0 {
520 Err(anyhow!("no such member"))?;
521 }
522
523 Ok(RemoveChannelMemberResult {
524 membership_update: self
525 .calculate_membership_updated(&channel, member_id, &*tx)
526 .await?,
527 notification_id: self
528 .remove_notification(
529 member_id,
530 rpc::Notification::ChannelInvitation {
531 channel_id: channel_id.to_proto(),
532 channel_name: Default::default(),
533 inviter_id: Default::default(),
534 },
535 &*tx,
536 )
537 .await?,
538 })
539 })
540 .await
541 }
542
543 /// Returns all channel invites for the user with the given ID.
544 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
545 self.transaction(|tx| async move {
546 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
547
548 let channel_invites = channel_member::Entity::find()
549 .filter(
550 channel_member::Column::UserId
551 .eq(user_id)
552 .and(channel_member::Column::Accepted.eq(false)),
553 )
554 .all(&*tx)
555 .await?;
556
557 for invite in channel_invites {
558 role_for_channel.insert(invite.channel_id, invite.role);
559 }
560
561 let channels = channel::Entity::find()
562 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
563 .all(&*tx)
564 .await?;
565
566 let channels = channels
567 .into_iter()
568 .filter_map(|channel| {
569 let role = *role_for_channel.get(&channel.id)?;
570 Some(Channel::from_model(channel, role))
571 })
572 .collect();
573
574 Ok(channels)
575 })
576 .await
577 }
578
579 /// Returns all channels for the user with the given ID.
580 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
581 self.transaction(|tx| async move {
582 let tx = tx;
583
584 self.get_user_channels(user_id, None, &tx).await
585 })
586 .await
587 }
588
    /// Returns all channels for the user with the given ID that are descendants
    /// of the specified ancestor channel.
    ///
    /// When `ancestor_channel` is `None`, every channel visible to the user is
    /// returned. The result also carries the room participants of the
    /// returned channels and the user's unseen buffer changes and messages
    /// for them.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Only accepted memberships grant visibility.
        let channel_memberships = channel_member::Entity::find()
            .filter(
                channel_member::Column::UserId
                    .eq(user_id)
                    .and(channel_member::Column::Accepted.eq(true)),
            )
            .all(&*tx)
            .await?;

        let descendants = self
            .get_channel_descendants_including_self(
                channel_memberships.iter().map(|m| m.channel_id),
                &*tx,
            )
            .await?;

        // Seeded with the direct memberships; extended below with the roles
        // inherited from parent channels.
        let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
        for membership in channel_memberships.iter() {
            roles_by_channel_id.insert(membership.channel_id, membership.role);
        }

        let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();

        // NOTE(review): this relies on `descendants` being ordered by path, so
        // that a parent's role is recorded before its children are visited.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = channel
                    .parent_id()
                    .and_then(|parent_id| roles_by_channel_id.get(&parent_id));

                let role = if let Some(parent_role) = parent_role {
                    // Combine a direct role with the inherited one via `max`.
                    let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
                        existing_role.max(*parent_role)
                    } else {
                        *parent_role
                    };
                    roles_by_channel_id.insert(channel.id, role);
                    role
                } else {
                    // No role known for the parent: the channel is only
                    // included if the user has a role on it directly.
                    *roles_by_channel_id.get(&channel.id)?
                };

                // `&&` binds tighter than `||`: visible if the role sees all
                // descendants, or if it sees public ones and this is public.
                let can_see_parent_paths = role.can_see_all_descendants()
                    || role.can_only_see_public_descendants()
                        && channel.visibility == ChannelVisibility::Public;
                if !can_see_parent_paths {
                    return None;
                }

                visible_channel_ids.insert(channel.id);

                // Applied after recording visibility, so channels outside the
                // requested subtree still count as visible path components.
                if let Some(ancestor) = ancestor_channel {
                    if !channel
                        .ancestors_including_self()
                        .any(|id| id == ancestor.id)
                    {
                        return None;
                    }
                }

                // Hide ancestors the user cannot see from the reported path.
                let mut channel = Channel::from_model(channel, role);
                channel
                    .parent_path
                    .retain(|id| visible_channel_ids.contains(&id));

                Some(channel)
            })
            .collect();

        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Users currently in the rooms of the returned channels.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(&*tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
        let channel_buffer_changes = self
            .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
            .await?;

        let unseen_messages = self
            .unseen_channel_messages(user_id, &channel_ids, &*tx)
            .await?;

        Ok(ChannelsForUser {
            channels,
            channel_participants,
            unseen_buffer_changes: channel_buffer_changes,
            channel_messages: unseen_messages,
        })
    }
705
706 pub async fn new_participants_to_notify(
707 &self,
708 parent_channel_id: ChannelId,
709 ) -> Result<Vec<(UserId, ChannelsForUser)>> {
710 self.weak_transaction(|tx| async move {
711 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
712 self.participants_to_notify_for_channel_change(&parent_channel, &*tx)
713 .await
714 })
715 .await
716 }
717
    /// Computes, for a channel whose subtree changed, which users need a
    /// refreshed channel list and what that list is.
    ///
    /// Participants of `new_parent` whose role can see all descendants get
    /// their channels under `new_parent`. Participants of the last entry of
    /// `public_ancestors_including_self` whose role can only see public
    /// descendants get their channels under that public ancestor.
    // TODO: this is very expensive, and we should rethink
    async fn participants_to_notify_for_channel_change(
        &self,
        new_parent: &channel::Model,
        tx: &DatabaseTransaction,
    ) -> Result<Vec<(UserId, ChannelsForUser)>> {
        let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();

        let members = self
            .get_channel_participant_details_internal(new_parent, &*tx)
            .await?;

        // First pass: users whose role sees every descendant.
        for member in members.iter() {
            if !member.role.can_see_all_descendants() {
                continue;
            }
            results.push((
                member.user_id,
                self.get_user_channels(member.user_id, Some(new_parent), &*tx)
                    .await?,
            ))
        }

        let public_parents = self
            .public_ancestors_including_self(new_parent, &*tx)
            .await?;
        let public_parent = public_parents.last();

        // No public channel on the path: nobody else needs to be notified.
        let Some(public_parent) = public_parent else {
            return Ok(results);
        };

        // could save some time in the common case by skipping this if the
        // new channel is not public and has no public descendants.
        let public_members = if public_parent == new_parent {
            // Reuse the participants already fetched above.
            members
        } else {
            self.get_channel_participant_details_internal(public_parent, &*tx)
                .await?
        };

        // Second pass: users whose role only sees public descendants.
        for member in public_members {
            if !member.role.can_only_see_public_descendants() {
                continue;
            };
            results.push((
                member.user_id,
                self.get_user_channels(member.user_id, Some(public_parent), &*tx)
                    .await?,
            ))
        }

        Ok(results)
    }
772
773 /// Sets the role for the specified channel member.
774 pub async fn set_channel_member_role(
775 &self,
776 channel_id: ChannelId,
777 admin_id: UserId,
778 for_user: UserId,
779 role: ChannelRole,
780 ) -> Result<SetMemberRoleResult> {
781 self.transaction(|tx| async move {
782 let channel = self.get_channel_internal(channel_id, &*tx).await?;
783 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
784 .await?;
785
786 let membership = channel_member::Entity::find()
787 .filter(
788 channel_member::Column::ChannelId
789 .eq(channel_id)
790 .and(channel_member::Column::UserId.eq(for_user)),
791 )
792 .one(&*tx)
793 .await?;
794
795 let Some(membership) = membership else {
796 Err(anyhow!("no such member"))?
797 };
798
799 let mut update = membership.into_active_model();
800 update.role = ActiveValue::Set(role);
801 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
802
803 if updated.accepted {
804 Ok(SetMemberRoleResult::MembershipUpdated(
805 self.calculate_membership_updated(&channel, for_user, &*tx)
806 .await?,
807 ))
808 } else {
809 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
810 channel, role,
811 )))
812 }
813 })
814 .await
815 }
816
817 /// Returns the details for the specified channel member.
818 pub async fn get_channel_participant_details(
819 &self,
820 channel_id: ChannelId,
821 user_id: UserId,
822 ) -> Result<Vec<proto::ChannelMember>> {
823 let (role, members) = self
824 .transaction(move |tx| async move {
825 let channel = self.get_channel_internal(channel_id, &*tx).await?;
826 let role = self
827 .check_user_is_channel_participant(&channel, user_id, &*tx)
828 .await?;
829 Ok((
830 role,
831 self.get_channel_participant_details_internal(&channel, &*tx)
832 .await?,
833 ))
834 })
835 .await?;
836
837 if role == ChannelRole::Admin {
838 Ok(members
839 .into_iter()
840 .map(|channel_member| channel_member.to_proto())
841 .collect())
842 } else {
843 return Ok(members
844 .into_iter()
845 .filter_map(|member| {
846 if member.kind == proto::channel_member::Kind::Invitee {
847 return None;
848 }
849 Some(ChannelMember {
850 role: member.role,
851 user_id: member.user_id,
852 kind: proto::channel_member::Kind::Member,
853 })
854 })
855 .map(|channel_member| channel_member.to_proto())
856 .collect());
857 }
858 }
859
    /// Collects one `ChannelMember` entry per user that has a membership row
    /// on `channel` or any of its ancestors.
    ///
    /// Rows for the same user are merged: the role is replaced whenever the
    /// new row's role `should_override` the stored one, and the kind prefers
    /// `Member`, then `Invitee`, then `AncestorMember`. Unaccepted invites on
    /// ancestors are ignored. The result comes from iterating a hash map, so
    /// its order is unspecified.
    async fn get_channel_participant_details_internal(
        &self,
        channel: &channel::Model,
        tx: &DatabaseTransaction,
    ) -> Result<Vec<ChannelMember>> {
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryMemberDetails {
            UserId,
            Role,
            IsDirectMember,
            Accepted,
            Visibility,
        }

        let mut stream = channel_member::Entity::find()
            .left_join(channel::Entity)
            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
            .select_only()
            .column(channel_member::Column::UserId)
            .column(channel_member::Column::Role)
            // True when the membership row belongs to `channel` itself rather
            // than to an ancestor.
            .column_as(
                channel_member::Column::ChannelId.eq(channel.id),
                QueryMemberDetails::IsDirectMember,
            )
            .column(channel_member::Column::Accepted)
            // Visibility of the channel the membership row is attached to.
            .column(channel::Column::Visibility)
            .into_values::<_, QueryMemberDetails>()
            .stream(&*tx)
            .await?;

        let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();

        while let Some(user_membership) = stream.next().await {
            let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
                UserId,
                ChannelRole,
                bool,
                bool,
                ChannelVisibility,
            ) = user_membership?;
            let kind = match (is_direct_member, is_invite_accepted) {
                (true, true) => proto::channel_member::Kind::Member,
                (true, false) => proto::channel_member::Kind::Invitee,
                (false, true) => proto::channel_member::Kind::AncestorMember,
                // A pending invite on an ancestor is skipped entirely.
                (false, false) => continue,
            };

            // A guest row is skipped when neither the channel it is attached
            // to nor `channel` itself is public.
            if channel_role == ChannelRole::Guest
                && visibility != ChannelVisibility::Public
                && channel.visibility != ChannelVisibility::Public
            {
                continue;
            }

            if let Some(details_mut) = user_details.get_mut(&user_id) {
                if channel_role.should_override(details_mut.role) {
                    details_mut.role = channel_role;
                }
                if kind == Kind::Member {
                    details_mut.kind = kind;
                // the UI is going to be a bit confusing if you already have permissions
                // that are greater than or equal to the ones you're being invited to.
                } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
                    details_mut.kind = kind;
                }
            } else {
                user_details.insert(
                    user_id,
                    ChannelMember {
                        user_id,
                        kind,
                        role: channel_role,
                    },
                );
            }
        }

        Ok(user_details
            .into_iter()
            .map(|(_, details)| details)
            .collect())
    }
942
943 /// Returns the participants in the given channel.
944 pub async fn get_channel_participants(
945 &self,
946 channel: &channel::Model,
947 tx: &DatabaseTransaction,
948 ) -> Result<Vec<UserId>> {
949 let participants = self
950 .get_channel_participant_details_internal(channel, &*tx)
951 .await?;
952 Ok(participants
953 .into_iter()
954 .map(|member| member.user_id)
955 .collect())
956 }
957
958 /// Returns whether the given user is an admin in the specified channel.
959 pub async fn check_user_is_channel_admin(
960 &self,
961 channel: &channel::Model,
962 user_id: UserId,
963 tx: &DatabaseTransaction,
964 ) -> Result<ChannelRole> {
965 let role = self.channel_role_for_user(channel, user_id, tx).await?;
966 match role {
967 Some(ChannelRole::Admin) => Ok(role.unwrap()),
968 Some(ChannelRole::Member)
969 | Some(ChannelRole::Banned)
970 | Some(ChannelRole::Guest)
971 | None => Err(anyhow!(
972 "user is not a channel admin or channel does not exist"
973 ))?,
974 }
975 }
976
977 /// Returns whether the given user is a member of the specified channel.
978 pub async fn check_user_is_channel_member(
979 &self,
980 channel: &channel::Model,
981 user_id: UserId,
982 tx: &DatabaseTransaction,
983 ) -> Result<ChannelRole> {
984 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
985 match channel_role {
986 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
987 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
988 "user is not a channel member or channel does not exist"
989 ))?,
990 }
991 }
992
993 /// Returns whether the given user is a participant in the specified channel.
994 pub async fn check_user_is_channel_participant(
995 &self,
996 channel: &channel::Model,
997 user_id: UserId,
998 tx: &DatabaseTransaction,
999 ) -> Result<ChannelRole> {
1000 let role = self.channel_role_for_user(channel, user_id, tx).await?;
1001 match role {
1002 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
1003 Ok(role.unwrap())
1004 }
1005 Some(ChannelRole::Banned) | None => Err(anyhow!(
1006 "user is not a channel participant or channel does not exist"
1007 ))?,
1008 }
1009 }
1010
1011 /// Returns a user's pending invite for the given channel, if one exists.
1012 pub async fn pending_invite_for_channel(
1013 &self,
1014 channel: &channel::Model,
1015 user_id: UserId,
1016 tx: &DatabaseTransaction,
1017 ) -> Result<Option<channel_member::Model>> {
1018 let row = channel_member::Entity::find()
1019 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
1020 .filter(channel_member::Column::UserId.eq(user_id))
1021 .filter(channel_member::Column::Accepted.eq(false))
1022 .one(&*tx)
1023 .await?;
1024
1025 Ok(row)
1026 }
1027
1028 async fn public_parent_channel(
1029 &self,
1030 channel: &channel::Model,
1031 tx: &DatabaseTransaction,
1032 ) -> Result<Option<channel::Model>> {
1033 let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1034 if path.last().unwrap().id == channel.id {
1035 path.pop();
1036 }
1037 Ok(path.pop())
1038 }
1039
1040 pub(crate) async fn public_ancestors_including_self(
1041 &self,
1042 channel: &channel::Model,
1043 tx: &DatabaseTransaction,
1044 ) -> Result<Vec<channel::Model>> {
1045 let visible_channels = channel::Entity::find()
1046 .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1047 .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1048 .order_by_asc(channel::Column::ParentPath)
1049 .all(&*tx)
1050 .await?;
1051
1052 Ok(visible_channels)
1053 }
1054
    /// Returns the role for a user in the given channel.
    ///
    /// Accepted memberships on the channel and all of its ancestors are
    /// combined. Admin/Member/Banned roles are merged with `max`. A guest
    /// membership only counts when it is attached to a public channel and
    /// the target channel itself is public. Returns `None` when no
    /// membership applies.
    pub async fn channel_role_for_user(
        &self,
        channel: &channel::Model,
        user_id: UserId,
        tx: &DatabaseTransaction,
    ) -> Result<Option<ChannelRole>> {
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryChannelMembership {
            ChannelId,
            Role,
            Visibility,
        }

        let mut rows = channel_member::Entity::find()
            .left_join(channel::Entity)
            .filter(
                channel_member::Column::ChannelId
                    .is_in(channel.ancestors_including_self())
                    .and(channel_member::Column::UserId.eq(user_id))
                    .and(channel_member::Column::Accepted.eq(true)),
            )
            .select_only()
            .column(channel_member::Column::ChannelId)
            .column(channel_member::Column::Role)
            .column(channel::Column::Visibility)
            .into_values::<_, QueryChannelMembership>()
            .stream(&*tx)
            .await?;

        let mut user_role: Option<ChannelRole> = None;

        // Set when the user holds a guest membership on a public channel.
        let mut is_participant = false;
        // Visibility of `channel` itself, if one of the rows belongs to it.
        let mut current_channel_visibility = None;

        // note these channels are not iterated in any particular order,
        // our current logic takes the highest permission available.
        while let Some(row) = rows.next().await {
            let (membership_channel, role, visibility): (
                ChannelId,
                ChannelRole,
                ChannelVisibility,
            ) = row?;

            match role {
                ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
                    if let Some(users_role) = user_role {
                        user_role = Some(users_role.max(role));
                    } else {
                        user_role = Some(role)
                    }
                }
                ChannelRole::Guest if visibility == ChannelVisibility::Public => {
                    is_participant = true
                }
                // Guest memberships on non-public channels grant nothing.
                ChannelRole::Guest => {}
            }
            if channel.id == membership_channel {
                current_channel_visibility = Some(visibility);
            }
        }
        // free up database connection
        drop(rows);

        // A guest-only user becomes a Guest of `channel` only when `channel`
        // itself is public; look its visibility up if no row provided it.
        if is_participant && user_role.is_none() {
            if current_channel_visibility.is_none() {
                current_channel_visibility = channel::Entity::find()
                    .filter(channel::Column::Id.eq(channel.id))
                    .one(&*tx)
                    .await?
                    .map(|channel| channel.visibility);
            }
            if current_channel_visibility == Some(ChannelVisibility::Public) {
                user_role = Some(ChannelRole::Guest);
            }
        }

        Ok(user_role)
    }
1134
    /// Gets the descendants of the given set of channels (including the
    /// channels themselves), ordered by their full path.
    ///
    /// Returns an empty list without querying when `channel_ids` is empty.
    async fn get_channel_descendants_including_self(
        &self,
        channel_ids: impl IntoIterator<Item = ChannelId>,
        tx: &DatabaseTransaction,
    ) -> Result<Vec<channel::Model>> {
        // Render the ids as a comma-separated list of parenthesised values,
        // e.g. `(1), (2)`, for interpolation into the `IN (...)` clauses.
        let mut values = String::new();
        for id in channel_ids {
            if !values.is_empty() {
                values.push_str(", ");
            }
            write!(&mut values, "({})", id).unwrap();
        }

        if values.is_empty() {
            return Ok(vec![]);
        }

        // A row matches either because it is one of the requested channels,
        // or because its parent_path starts with the full path of one of
        // them. `full_path` exists only to provide the sort key.
        let sql = format!(
            r#"
            SELECT DISTINCT
                descendant_channels.*,
                descendant_channels.parent_path || descendant_channels.id as full_path
            FROM
                channels parent_channels, channels descendant_channels
            WHERE
                descendant_channels.id IN ({values}) OR
                (
                    parent_channels.id IN ({values}) AND
                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
                )
            ORDER BY
                full_path ASC
            "#
        );

        Ok(channel::Entity::find()
            .from_raw_sql(Statement::from_string(
                self.pool.get_database_backend(),
                sql,
            ))
            .all(tx)
            .await?)
    }
1180
1181 /// Returns the channel with the given ID.
1182 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1183 self.transaction(|tx| async move {
1184 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1185 let role = self
1186 .check_user_is_channel_participant(&channel, user_id, &*tx)
1187 .await?;
1188
1189 Ok(Channel::from_model(channel, role))
1190 })
1191 .await
1192 }
1193
1194 pub(crate) async fn get_channel_internal(
1195 &self,
1196 channel_id: ChannelId,
1197 tx: &DatabaseTransaction,
1198 ) -> Result<channel::Model> {
1199 Ok(channel::Entity::find_by_id(channel_id)
1200 .one(&*tx)
1201 .await?
1202 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
1203 }
1204
1205 pub(crate) async fn get_or_create_channel_room(
1206 &self,
1207 channel_id: ChannelId,
1208 live_kit_room: &str,
1209 environment: &str,
1210 tx: &DatabaseTransaction,
1211 ) -> Result<RoomId> {
1212 let room = room::Entity::find()
1213 .filter(room::Column::ChannelId.eq(channel_id))
1214 .one(&*tx)
1215 .await?;
1216
1217 let room_id = if let Some(room) = room {
1218 if let Some(env) = room.environment {
1219 if &env != environment {
1220 Err(ErrorCode::WrongReleaseChannel
1221 .with_tag("required", &env)
1222 .anyhow())?;
1223 }
1224 }
1225 room.id
1226 } else {
1227 let result = room::Entity::insert(room::ActiveModel {
1228 channel_id: ActiveValue::Set(Some(channel_id)),
1229 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1230 environment: ActiveValue::Set(Some(environment.to_string())),
1231 ..Default::default()
1232 })
1233 .exec(&*tx)
1234 .await?;
1235
1236 result.last_insert_id
1237 };
1238
1239 Ok(room_id)
1240 }
1241
1242 /// Move a channel from one parent to another
1243 pub async fn move_channel(
1244 &self,
1245 channel_id: ChannelId,
1246 new_parent_id: Option<ChannelId>,
1247 admin_id: UserId,
1248 ) -> Result<Option<MoveChannelResult>> {
1249 self.transaction(|tx| async move {
1250 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1251 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1252 .await?;
1253
1254 let new_parent_path;
1255 let new_parent_channel;
1256 if let Some(new_parent_id) = new_parent_id {
1257 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1258 self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1259 .await?;
1260
1261 if new_parent
1262 .ancestors_including_self()
1263 .any(|id| id == channel.id)
1264 {
1265 Err(anyhow!("cannot move a channel into one of its descendants"))?;
1266 }
1267
1268 new_parent_path = new_parent.path();
1269 new_parent_channel = Some(new_parent);
1270 } else {
1271 new_parent_path = String::new();
1272 new_parent_channel = None;
1273 };
1274
1275 let previous_participants = self
1276 .get_channel_participant_details_internal(&channel, &*tx)
1277 .await?;
1278
1279 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1280 let new_path = format!("{}{}/", new_parent_path, channel.id);
1281
1282 if old_path == new_path {
1283 return Ok(None);
1284 }
1285
1286 let mut model = channel.into_active_model();
1287 model.parent_path = ActiveValue::Set(new_parent_path);
1288 model.update(&*tx).await?;
1289
1290 if new_parent_channel.is_none() {
1291 channel_member::ActiveModel {
1292 id: ActiveValue::NotSet,
1293 channel_id: ActiveValue::Set(channel_id),
1294 user_id: ActiveValue::Set(admin_id),
1295 accepted: ActiveValue::Set(true),
1296 role: ActiveValue::Set(ChannelRole::Admin),
1297 }
1298 .insert(&*tx)
1299 .await?;
1300 }
1301
1302 let descendent_ids =
1303 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1304 self.pool.get_database_backend(),
1305 "
1306 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1307 WHERE parent_path LIKE $3 || '%'
1308 RETURNING id
1309 ",
1310 [old_path.clone().into(), new_path.into(), old_path.into()],
1311 ))
1312 .all(&*tx)
1313 .await?;
1314
1315 Ok(Some(MoveChannelResult {
1316 previous_participants,
1317 descendent_ids,
1318 }))
1319 })
1320 .await
1321 }
1322}
1323
/// Column selector for raw statements that return a single `id` column,
/// e.g. the `RETURNING id` update issued by `move_channel`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1328
/// Column selector with a single `UserId` variant.
///
/// NOTE(review): no use of this selector is visible in this part of the
/// file; presumably it names a single `user_id` column for member queries
/// elsewhere — confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}