1use super::*;
2use rpc::proto::channel_member::Kind;
3use sea_orm::TryGetableMany;
4
5impl Database {
/// Test-only helper: returns the `(id, name)` pair of every channel row.
#[cfg(test)]
pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
    self.transaction(move |tx| async move {
        let mut stream = channel::Entity::find().stream(&*tx).await?;
        let mut id_name_pairs = Vec::new();
        while let Some(model) = stream.next().await {
            let model = model?;
            id_name_pairs.push((model.id, model.name));
        }
        Ok(id_name_pairs)
    })
    .await
}
19
/// Test-only helper: creates a channel without a parent and returns its id.
#[cfg(test)]
pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
    let created = self.create_channel(name, None, creator_id).await?;
    Ok(created.channel.id)
}
28
/// Test-only helper: creates a channel underneath `parent` and returns its id.
#[cfg(test)]
pub async fn create_sub_channel(
    &self,
    name: &str,
    parent: ChannelId,
    creator_id: UserId,
) -> Result<ChannelId> {
    let created = self.create_channel(name, Some(parent), creator_id).await?;
    Ok(created.channel.id)
}
42
43 /// Creates a new channel.
44 pub async fn create_channel(
45 &self,
46 name: &str,
47 parent_channel_id: Option<ChannelId>,
48 admin_id: UserId,
49 ) -> Result<CreateChannelResult> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53
54 if let Some(parent_channel_id) = parent_channel_id {
55 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
56 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
57 .await?;
58 parent = Some(parent_channel);
59 }
60
61 let channel = channel::ActiveModel {
62 id: ActiveValue::NotSet,
63 name: ActiveValue::Set(name.to_string()),
64 visibility: ActiveValue::Set(ChannelVisibility::Members),
65 parent_path: ActiveValue::Set(
66 parent
67 .as_ref()
68 .map_or(String::new(), |parent| parent.path()),
69 ),
70 requires_zed_cla: ActiveValue::NotSet,
71 }
72 .insert(&*tx)
73 .await?;
74
75 let participants_to_update;
76 if let Some(parent) = &parent {
77 participants_to_update = self
78 .participants_to_notify_for_channel_change(parent, &*tx)
79 .await?;
80 } else {
81 participants_to_update = vec![];
82
83 channel_member::ActiveModel {
84 id: ActiveValue::NotSet,
85 channel_id: ActiveValue::Set(channel.id),
86 user_id: ActiveValue::Set(admin_id),
87 accepted: ActiveValue::Set(true),
88 role: ActiveValue::Set(ChannelRole::Admin),
89 }
90 .insert(&*tx)
91 .await?;
92 };
93
94 Ok(CreateChannelResult {
95 channel: Channel::from_model(channel, ChannelRole::Admin),
96 participants_to_update,
97 })
98 })
99 .await
100 }
101
102 /// Adds a user to the specified channel.
103 pub async fn join_channel(
104 &self,
105 channel_id: ChannelId,
106 user_id: UserId,
107 connection: ConnectionId,
108 environment: &str,
109 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
110 self.transaction(move |tx| async move {
111 let channel = self.get_channel_internal(channel_id, &*tx).await?;
112 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
113
114 let mut accept_invite_result = None;
115
116 if role.is_none() {
117 if let Some(invitation) = self
118 .pending_invite_for_channel(&channel, user_id, &*tx)
119 .await?
120 {
121 // note, this may be a parent channel
122 role = Some(invitation.role);
123 channel_member::Entity::update(channel_member::ActiveModel {
124 accepted: ActiveValue::Set(true),
125 ..invitation.into_active_model()
126 })
127 .exec(&*tx)
128 .await?;
129
130 accept_invite_result = Some(
131 self.calculate_membership_updated(&channel, user_id, &*tx)
132 .await?,
133 );
134
135 debug_assert!(
136 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
137 );
138 } else if channel.visibility == ChannelVisibility::Public {
139 role = Some(ChannelRole::Guest);
140 let channel_to_join = self
141 .public_ancestors_including_self(&channel, &*tx)
142 .await?
143 .first()
144 .cloned()
145 .unwrap_or(channel.clone());
146
147 channel_member::Entity::insert(channel_member::ActiveModel {
148 id: ActiveValue::NotSet,
149 channel_id: ActiveValue::Set(channel_to_join.id),
150 user_id: ActiveValue::Set(user_id),
151 accepted: ActiveValue::Set(true),
152 role: ActiveValue::Set(ChannelRole::Guest),
153 })
154 .exec(&*tx)
155 .await?;
156
157 accept_invite_result = Some(
158 self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
159 .await?,
160 );
161
162 debug_assert!(
163 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
164 );
165 }
166 }
167
168 if role.is_none() || role == Some(ChannelRole::Banned) {
169 Err(anyhow!("not allowed"))?
170 }
171 let role = role.unwrap();
172
173 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
174 let room_id = self
175 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
176 .await?;
177
178 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
179 .await
180 .map(|jr| (jr, accept_invite_result, role))
181 })
182 .await
183 }
184
185 /// Sets the visibiltity of the given channel.
186 pub async fn set_channel_visibility(
187 &self,
188 channel_id: ChannelId,
189 visibility: ChannelVisibility,
190 admin_id: UserId,
191 ) -> Result<SetChannelVisibilityResult> {
192 self.transaction(move |tx| async move {
193 let channel = self.get_channel_internal(channel_id, &*tx).await?;
194
195 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
196 .await?;
197
198 let previous_members = self
199 .get_channel_participant_details_internal(&channel, &*tx)
200 .await?;
201
202 let mut model = channel.into_active_model();
203 model.visibility = ActiveValue::Set(visibility);
204 let channel = model.update(&*tx).await?;
205
206 let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
207 .participants_to_notify_for_channel_change(&channel, &*tx)
208 .await?
209 .into_iter()
210 .collect();
211
212 let mut channels_to_remove: Vec<ChannelId> = vec![];
213 let mut participants_to_remove: HashSet<UserId> = HashSet::default();
214 match visibility {
215 ChannelVisibility::Members => {
216 let all_descendents: Vec<ChannelId> = self
217 .get_channel_descendants_including_self(vec![channel_id], &*tx)
218 .await?
219 .into_iter()
220 .map(|channel| channel.id)
221 .collect();
222
223 channels_to_remove = channel::Entity::find()
224 .filter(
225 channel::Column::Id
226 .is_in(all_descendents)
227 .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
228 )
229 .all(&*tx)
230 .await?
231 .into_iter()
232 .map(|channel| channel.id)
233 .collect();
234
235 channels_to_remove.push(channel_id);
236
237 for member in previous_members {
238 if member.role.can_only_see_public_descendants() {
239 participants_to_remove.insert(member.user_id);
240 }
241 }
242 }
243 ChannelVisibility::Public => {
244 if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
245 let parent_updates = self
246 .participants_to_notify_for_channel_change(&public_parent, &*tx)
247 .await?;
248
249 for (user_id, channels) in parent_updates {
250 participants_to_update.insert(user_id, channels);
251 }
252 }
253 }
254 }
255
256 Ok(SetChannelVisibilityResult {
257 participants_to_update,
258 participants_to_remove,
259 channels_to_remove,
260 })
261 })
262 .await
263 }
264
/// Test-only helper: overwrites the `requires_zed_cla` flag of a channel.
#[cfg(test)]
pub async fn set_channel_requires_zed_cla(
    &self,
    channel_id: ChannelId,
    requires_zed_cla: bool,
) -> Result<()> {
    self.transaction(move |tx| async move {
        let mut active_channel = self
            .get_channel_internal(channel_id, &*tx)
            .await?
            .into_active_model();
        active_channel.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
        active_channel.update(&*tx).await?;
        Ok(())
    })
    .await
}
280
281 /// Deletes the channel with the specified ID.
282 pub async fn delete_channel(
283 &self,
284 channel_id: ChannelId,
285 user_id: UserId,
286 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
287 self.transaction(move |tx| async move {
288 let channel = self.get_channel_internal(channel_id, &*tx).await?;
289 self.check_user_is_channel_admin(&channel, user_id, &*tx)
290 .await?;
291
292 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
293 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
294 .select_only()
295 .column(channel_member::Column::UserId)
296 .distinct()
297 .into_values::<_, QueryUserIds>()
298 .all(&*tx)
299 .await?;
300
301 let channels_to_remove = self
302 .get_channel_descendants_including_self(vec![channel.id], &*tx)
303 .await?
304 .into_iter()
305 .map(|channel| channel.id)
306 .collect::<Vec<_>>();
307
308 channel::Entity::delete_many()
309 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
310 .exec(&*tx)
311 .await?;
312
313 Ok((channels_to_remove, members_to_notify))
314 })
315 .await
316 }
317
318 /// Invites a user to a channel as a member.
319 pub async fn invite_channel_member(
320 &self,
321 channel_id: ChannelId,
322 invitee_id: UserId,
323 inviter_id: UserId,
324 role: ChannelRole,
325 ) -> Result<InviteMemberResult> {
326 self.transaction(move |tx| async move {
327 let channel = self.get_channel_internal(channel_id, &*tx).await?;
328 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
329 .await?;
330
331 channel_member::ActiveModel {
332 id: ActiveValue::NotSet,
333 channel_id: ActiveValue::Set(channel_id),
334 user_id: ActiveValue::Set(invitee_id),
335 accepted: ActiveValue::Set(false),
336 role: ActiveValue::Set(role),
337 }
338 .insert(&*tx)
339 .await?;
340
341 let channel = Channel::from_model(channel, role);
342
343 let notifications = self
344 .create_notification(
345 invitee_id,
346 rpc::Notification::ChannelInvitation {
347 channel_id: channel_id.to_proto(),
348 channel_name: channel.name.clone(),
349 inviter_id: inviter_id.to_proto(),
350 },
351 true,
352 &*tx,
353 )
354 .await?
355 .into_iter()
356 .collect();
357
358 Ok(InviteMemberResult {
359 channel,
360 notifications,
361 })
362 })
363 .await
364 }
365
366 fn sanitize_channel_name(name: &str) -> Result<&str> {
367 let new_name = name.trim().trim_start_matches('#');
368 if new_name == "" {
369 Err(anyhow!("channel name can't be blank"))?;
370 }
371 Ok(new_name)
372 }
373
374 /// Renames the specified channel.
375 pub async fn rename_channel(
376 &self,
377 channel_id: ChannelId,
378 admin_id: UserId,
379 new_name: &str,
380 ) -> Result<RenameChannelResult> {
381 self.transaction(move |tx| async move {
382 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
383
384 let channel = self.get_channel_internal(channel_id, &*tx).await?;
385 let role = self
386 .check_user_is_channel_admin(&channel, admin_id, &*tx)
387 .await?;
388
389 let mut model = channel.into_active_model();
390 model.name = ActiveValue::Set(new_name.clone());
391 let channel = model.update(&*tx).await?;
392
393 let participants = self
394 .get_channel_participant_details_internal(&channel, &*tx)
395 .await?;
396
397 Ok(RenameChannelResult {
398 channel: Channel::from_model(channel.clone(), role),
399 participants_to_update: participants
400 .iter()
401 .map(|participant| {
402 (
403 participant.user_id,
404 Channel::from_model(channel.clone(), participant.role),
405 )
406 })
407 .collect(),
408 })
409 })
410 .await
411 }
412
413 /// accept or decline an invite to join a channel
414 pub async fn respond_to_channel_invite(
415 &self,
416 channel_id: ChannelId,
417 user_id: UserId,
418 accept: bool,
419 ) -> Result<RespondToChannelInvite> {
420 self.transaction(move |tx| async move {
421 let channel = self.get_channel_internal(channel_id, &*tx).await?;
422
423 let membership_update = if accept {
424 let rows_affected = channel_member::Entity::update_many()
425 .set(channel_member::ActiveModel {
426 accepted: ActiveValue::Set(accept),
427 ..Default::default()
428 })
429 .filter(
430 channel_member::Column::ChannelId
431 .eq(channel_id)
432 .and(channel_member::Column::UserId.eq(user_id))
433 .and(channel_member::Column::Accepted.eq(false)),
434 )
435 .exec(&*tx)
436 .await?
437 .rows_affected;
438
439 if rows_affected == 0 {
440 Err(anyhow!("no such invitation"))?;
441 }
442
443 Some(
444 self.calculate_membership_updated(&channel, user_id, &*tx)
445 .await?,
446 )
447 } else {
448 let rows_affected = channel_member::Entity::delete_many()
449 .filter(
450 channel_member::Column::ChannelId
451 .eq(channel_id)
452 .and(channel_member::Column::UserId.eq(user_id))
453 .and(channel_member::Column::Accepted.eq(false)),
454 )
455 .exec(&*tx)
456 .await?
457 .rows_affected;
458 if rows_affected == 0 {
459 Err(anyhow!("no such invitation"))?;
460 }
461
462 None
463 };
464
465 Ok(RespondToChannelInvite {
466 membership_update,
467 notifications: self
468 .mark_notification_as_read_with_response(
469 user_id,
470 &rpc::Notification::ChannelInvitation {
471 channel_id: channel_id.to_proto(),
472 channel_name: Default::default(),
473 inviter_id: Default::default(),
474 },
475 accept,
476 &*tx,
477 )
478 .await?
479 .into_iter()
480 .collect(),
481 })
482 })
483 .await
484 }
485
486 async fn calculate_membership_updated(
487 &self,
488 channel: &channel::Model,
489 user_id: UserId,
490 tx: &DatabaseTransaction,
491 ) -> Result<MembershipUpdated> {
492 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
493 let removed_channels = self
494 .get_channel_descendants_including_self(vec![channel.id], &*tx)
495 .await?
496 .into_iter()
497 .filter_map(|channel| {
498 if !new_channels.channels.iter().any(|c| c.id == channel.id) {
499 Some(channel.id)
500 } else {
501 None
502 }
503 })
504 .collect::<Vec<_>>();
505
506 Ok(MembershipUpdated {
507 channel_id: channel.id,
508 new_channels,
509 removed_channels,
510 })
511 }
512
513 /// Removes a channel member.
514 pub async fn remove_channel_member(
515 &self,
516 channel_id: ChannelId,
517 member_id: UserId,
518 admin_id: UserId,
519 ) -> Result<RemoveChannelMemberResult> {
520 self.transaction(|tx| async move {
521 let channel = self.get_channel_internal(channel_id, &*tx).await?;
522 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
523 .await?;
524
525 let result = channel_member::Entity::delete_many()
526 .filter(
527 channel_member::Column::ChannelId
528 .eq(channel_id)
529 .and(channel_member::Column::UserId.eq(member_id)),
530 )
531 .exec(&*tx)
532 .await?;
533
534 if result.rows_affected == 0 {
535 Err(anyhow!("no such member"))?;
536 }
537
538 Ok(RemoveChannelMemberResult {
539 membership_update: self
540 .calculate_membership_updated(&channel, member_id, &*tx)
541 .await?,
542 notification_id: self
543 .remove_notification(
544 member_id,
545 rpc::Notification::ChannelInvitation {
546 channel_id: channel_id.to_proto(),
547 channel_name: Default::default(),
548 inviter_id: Default::default(),
549 },
550 &*tx,
551 )
552 .await?,
553 })
554 })
555 .await
556 }
557
558 /// Returns all channel invites for the user with the given ID.
559 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
560 self.transaction(|tx| async move {
561 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
562
563 let channel_invites = channel_member::Entity::find()
564 .filter(
565 channel_member::Column::UserId
566 .eq(user_id)
567 .and(channel_member::Column::Accepted.eq(false)),
568 )
569 .all(&*tx)
570 .await?;
571
572 for invite in channel_invites {
573 role_for_channel.insert(invite.channel_id, invite.role);
574 }
575
576 let channels = channel::Entity::find()
577 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
578 .all(&*tx)
579 .await?;
580
581 let channels = channels
582 .into_iter()
583 .filter_map(|channel| {
584 let role = *role_for_channel.get(&channel.id)?;
585 Some(Channel::from_model(channel, role))
586 })
587 .collect();
588
589 Ok(channels)
590 })
591 .await
592 }
593
594 /// Returns all channels for the user with the given ID.
595 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
596 self.transaction(|tx| async move {
597 let tx = tx;
598
599 self.get_user_channels(user_id, None, &tx).await
600 })
601 .await
602 }
603
/// Returns all channels for the user with the given ID that are descendants
/// of the specified ancestor channel.
///
/// When `ancestor_channel` is `None`, every channel visible to the user is
/// returned. The result also includes, for the returned channels, the users
/// currently in each channel's room, plus the values produced by
/// `unseen_channel_buffer_changes` and `unseen_channel_messages`.
pub async fn get_user_channels(
    &self,
    user_id: UserId,
    ancestor_channel: Option<&channel::Model>,
    tx: &DatabaseTransaction,
) -> Result<ChannelsForUser> {
    // Only accepted memberships grant visibility.
    let channel_memberships = channel_member::Entity::find()
        .filter(
            channel_member::Column::UserId
                .eq(user_id)
                .and(channel_member::Column::Accepted.eq(true)),
        )
        .all(&*tx)
        .await?;

    // Every channel at or below a channel the user is a direct member of.
    let descendants = self
        .get_channel_descendants_including_self(
            channel_memberships.iter().map(|m| m.channel_id),
            &*tx,
        )
        .await?;

    // Seeded with the direct memberships; inherited roles are added while
    // walking the descendants below.
    let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
    for membership in channel_memberships.iter() {
        roles_by_channel_id.insert(membership.channel_id, membership.role);
    }

    let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();

    let channels: Vec<Channel> = descendants
        .into_iter()
        .filter_map(|channel| {
            // NOTE(review): this lookup relies on a parent having been visited
            // before its children, i.e. on `descendants` being ordered by path.
            let parent_role = channel
                .parent_id()
                .and_then(|parent_id| roles_by_channel_id.get(&parent_id));

            let role = if let Some(parent_role) = parent_role {
                // Combine the inherited role with a direct membership, if any,
                // and record it so this channel's children can inherit it.
                let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
                    existing_role.max(*parent_role)
                } else {
                    *parent_role
                };
                roles_by_channel_id.insert(channel.id, role);
                role
            } else {
                // No role known for the parent: the channel is only kept if
                // the user has a role recorded for the channel itself.
                *roles_by_channel_id.get(&channel.id)?
            };

            // `&&` binds tighter than `||`: either the role sees everything,
            // or it sees public descendants and this channel is public.
            let can_see_parent_paths = role.can_see_all_descendants()
                || role.can_only_see_public_descendants()
                    && channel.visibility == ChannelVisibility::Public;
            if !can_see_parent_paths {
                return None;
            }

            // Recorded before the ancestor filter, so visible channels outside
            // the requested subtree still count when pruning parent paths.
            visible_channel_ids.insert(channel.id);

            if let Some(ancestor) = ancestor_channel {
                if !channel
                    .ancestors_including_self()
                    .any(|id| id == ancestor.id)
                {
                    return None;
                }
            }

            // Drop path segments for channels the user cannot see.
            let mut channel = Channel::from_model(channel, role);
            channel
                .parent_path
                .retain(|id| visible_channel_ids.contains(&id));

            Some(channel)
        })
        .collect();

    #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
    enum QueryUserIdsAndChannelIds {
        ChannelId,
        UserId,
    }

    // Users currently present in the room of each returned channel.
    let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
    {
        let mut rows = room_participant::Entity::find()
            .inner_join(room::Entity)
            .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
            .select_only()
            .column(room::Column::ChannelId)
            .column(room_participant::Column::UserId)
            .into_values::<_, QueryUserIdsAndChannelIds>()
            .stream(&*tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row: (ChannelId, UserId) = row?;
            channel_participants.entry(row.0).or_default().push(row.1)
        }
    }

    let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
    let channel_buffer_changes = self
        .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
        .await?;

    let unseen_messages = self
        .unseen_channel_messages(user_id, &channel_ids, &*tx)
        .await?;

    Ok(ChannelsForUser {
        channels,
        channel_participants,
        unseen_buffer_changes: channel_buffer_changes,
        channel_messages: unseen_messages,
    })
}
720
721 async fn participants_to_notify_for_channel_change(
722 &self,
723 new_parent: &channel::Model,
724 tx: &DatabaseTransaction,
725 ) -> Result<Vec<(UserId, ChannelsForUser)>> {
726 let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
727
728 let members = self
729 .get_channel_participant_details_internal(new_parent, &*tx)
730 .await?;
731
732 for member in members.iter() {
733 if !member.role.can_see_all_descendants() {
734 continue;
735 }
736 results.push((
737 member.user_id,
738 self.get_user_channels(member.user_id, Some(new_parent), &*tx)
739 .await?,
740 ))
741 }
742
743 let public_parents = self
744 .public_ancestors_including_self(new_parent, &*tx)
745 .await?;
746 let public_parent = public_parents.last();
747
748 let Some(public_parent) = public_parent else {
749 return Ok(results);
750 };
751
752 // could save some time in the common case by skipping this if the
753 // new channel is not public and has no public descendants.
754 let public_members = if public_parent == new_parent {
755 members
756 } else {
757 self.get_channel_participant_details_internal(public_parent, &*tx)
758 .await?
759 };
760
761 for member in public_members {
762 if !member.role.can_only_see_public_descendants() {
763 continue;
764 };
765 results.push((
766 member.user_id,
767 self.get_user_channels(member.user_id, Some(public_parent), &*tx)
768 .await?,
769 ))
770 }
771
772 Ok(results)
773 }
774
775 /// Sets the role for the specified channel member.
776 pub async fn set_channel_member_role(
777 &self,
778 channel_id: ChannelId,
779 admin_id: UserId,
780 for_user: UserId,
781 role: ChannelRole,
782 ) -> Result<SetMemberRoleResult> {
783 self.transaction(|tx| async move {
784 let channel = self.get_channel_internal(channel_id, &*tx).await?;
785 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
786 .await?;
787
788 let membership = channel_member::Entity::find()
789 .filter(
790 channel_member::Column::ChannelId
791 .eq(channel_id)
792 .and(channel_member::Column::UserId.eq(for_user)),
793 )
794 .one(&*tx)
795 .await?;
796
797 let Some(membership) = membership else {
798 Err(anyhow!("no such member"))?
799 };
800
801 let mut update = membership.into_active_model();
802 update.role = ActiveValue::Set(role);
803 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
804
805 if updated.accepted {
806 Ok(SetMemberRoleResult::MembershipUpdated(
807 self.calculate_membership_updated(&channel, for_user, &*tx)
808 .await?,
809 ))
810 } else {
811 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
812 channel, role,
813 )))
814 }
815 })
816 .await
817 }
818
819 /// Returns the details for the specified channel member.
820 pub async fn get_channel_participant_details(
821 &self,
822 channel_id: ChannelId,
823 user_id: UserId,
824 ) -> Result<Vec<proto::ChannelMember>> {
825 let (role, members) = self
826 .transaction(move |tx| async move {
827 let channel = self.get_channel_internal(channel_id, &*tx).await?;
828 let role = self
829 .check_user_is_channel_participant(&channel, user_id, &*tx)
830 .await?;
831 Ok((
832 role,
833 self.get_channel_participant_details_internal(&channel, &*tx)
834 .await?,
835 ))
836 })
837 .await?;
838
839 if role == ChannelRole::Admin {
840 Ok(members
841 .into_iter()
842 .map(|channel_member| channel_member.to_proto())
843 .collect())
844 } else {
845 return Ok(members
846 .into_iter()
847 .filter_map(|member| {
848 if member.kind == proto::channel_member::Kind::Invitee {
849 return None;
850 }
851 Some(ChannelMember {
852 role: member.role,
853 user_id: member.user_id,
854 kind: proto::channel_member::Kind::Member,
855 })
856 })
857 .map(|channel_member| channel_member.to_proto())
858 .collect());
859 }
860 }
861
/// Builds one `ChannelMember` entry per user that has a membership row on
/// `channel` or on any of its ancestors.
///
/// Pending invites on ancestors are ignored. Guest rows are ignored unless
/// the row's channel or `channel` itself is public. When a user has several
/// rows they are merged into a single entry (see the inline comments).
async fn get_channel_participant_details_internal(
    &self,
    channel: &channel::Model,
    tx: &DatabaseTransaction,
) -> Result<Vec<ChannelMember>> {
    #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
    enum QueryMemberDetails {
        UserId,
        Role,
        IsDirectMember,
        Accepted,
        Visibility,
    }

    // One row per (user, channel) membership across the ancestor chain.
    // `IsDirectMember` is true when the row belongs to `channel` itself.
    let mut stream = channel_member::Entity::find()
        .left_join(channel::Entity)
        .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
        .select_only()
        .column(channel_member::Column::UserId)
        .column(channel_member::Column::Role)
        .column_as(
            channel_member::Column::ChannelId.eq(channel.id),
            QueryMemberDetails::IsDirectMember,
        )
        .column(channel_member::Column::Accepted)
        .column(channel::Column::Visibility)
        .into_values::<_, QueryMemberDetails>()
        .stream(&*tx)
        .await?;

    let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();

    while let Some(user_membership) = stream.next().await {
        let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
            UserId,
            ChannelRole,
            bool,
            bool,
            ChannelVisibility,
        ) = user_membership?;
        let kind = match (is_direct_member, is_invite_accepted) {
            (true, true) => proto::channel_member::Kind::Member,
            (true, false) => proto::channel_member::Kind::Invitee,
            (false, true) => proto::channel_member::Kind::AncestorMember,
            // A pending invite on an ancestor does not count here.
            (false, false) => continue,
        };

        // Skip guest rows when neither the row's channel nor the queried
        // channel is public.
        if channel_role == ChannelRole::Guest
            && visibility != ChannelVisibility::Public
            && channel.visibility != ChannelVisibility::Public
        {
            continue;
        }

        if let Some(details_mut) = user_details.get_mut(&user_id) {
            // Merge with the entry already recorded for this user.
            if channel_role.should_override(details_mut.role) {
                details_mut.role = channel_role;
            }
            if kind == Kind::Member {
                details_mut.kind = kind;
            // the UI is going to be a bit confusing if you already have permissions
            // that are greater than or equal to the ones you're being invited to.
            } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
                details_mut.kind = kind;
            }
        } else {
            user_details.insert(
                user_id,
                ChannelMember {
                    user_id,
                    kind,
                    role: channel_role,
                },
            );
        }
    }

    // Only the merged entries are returned; the user-id keys are dropped.
    Ok(user_details
        .into_iter()
        .map(|(_, details)| details)
        .collect())
}
944
945 /// Returns the participants in the given channel.
946 pub async fn get_channel_participants(
947 &self,
948 channel: &channel::Model,
949 tx: &DatabaseTransaction,
950 ) -> Result<Vec<UserId>> {
951 let participants = self
952 .get_channel_participant_details_internal(channel, &*tx)
953 .await?;
954 Ok(participants
955 .into_iter()
956 .map(|member| member.user_id)
957 .collect())
958 }
959
960 /// Returns whether the given user is an admin in the specified channel.
961 pub async fn check_user_is_channel_admin(
962 &self,
963 channel: &channel::Model,
964 user_id: UserId,
965 tx: &DatabaseTransaction,
966 ) -> Result<ChannelRole> {
967 let role = self.channel_role_for_user(channel, user_id, tx).await?;
968 match role {
969 Some(ChannelRole::Admin) => Ok(role.unwrap()),
970 Some(ChannelRole::Member)
971 | Some(ChannelRole::Banned)
972 | Some(ChannelRole::Guest)
973 | None => Err(anyhow!(
974 "user is not a channel admin or channel does not exist"
975 ))?,
976 }
977 }
978
979 /// Returns whether the given user is a member of the specified channel.
980 pub async fn check_user_is_channel_member(
981 &self,
982 channel: &channel::Model,
983 user_id: UserId,
984 tx: &DatabaseTransaction,
985 ) -> Result<ChannelRole> {
986 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
987 match channel_role {
988 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
989 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
990 "user is not a channel member or channel does not exist"
991 ))?,
992 }
993 }
994
995 /// Returns whether the given user is a participant in the specified channel.
996 pub async fn check_user_is_channel_participant(
997 &self,
998 channel: &channel::Model,
999 user_id: UserId,
1000 tx: &DatabaseTransaction,
1001 ) -> Result<ChannelRole> {
1002 let role = self.channel_role_for_user(channel, user_id, tx).await?;
1003 match role {
1004 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
1005 Ok(role.unwrap())
1006 }
1007 Some(ChannelRole::Banned) | None => Err(anyhow!(
1008 "user is not a channel participant or channel does not exist"
1009 ))?,
1010 }
1011 }
1012
1013 /// Returns a user's pending invite for the given channel, if one exists.
1014 pub async fn pending_invite_for_channel(
1015 &self,
1016 channel: &channel::Model,
1017 user_id: UserId,
1018 tx: &DatabaseTransaction,
1019 ) -> Result<Option<channel_member::Model>> {
1020 let row = channel_member::Entity::find()
1021 .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
1022 .filter(channel_member::Column::UserId.eq(user_id))
1023 .filter(channel_member::Column::Accepted.eq(false))
1024 .one(&*tx)
1025 .await?;
1026
1027 Ok(row)
1028 }
1029
1030 async fn public_parent_channel(
1031 &self,
1032 channel: &channel::Model,
1033 tx: &DatabaseTransaction,
1034 ) -> Result<Option<channel::Model>> {
1035 let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1036 if path.last().unwrap().id == channel.id {
1037 path.pop();
1038 }
1039 Ok(path.pop())
1040 }
1041
1042 pub(crate) async fn public_ancestors_including_self(
1043 &self,
1044 channel: &channel::Model,
1045 tx: &DatabaseTransaction,
1046 ) -> Result<Vec<channel::Model>> {
1047 let visible_channels = channel::Entity::find()
1048 .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1049 .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1050 .order_by_asc(channel::Column::ParentPath)
1051 .all(&*tx)
1052 .await?;
1053
1054 Ok(visible_channels)
1055 }
1056
1057 /// Returns the role for a user in the given channel.
1058 pub async fn channel_role_for_user(
1059 &self,
1060 channel: &channel::Model,
1061 user_id: UserId,
1062 tx: &DatabaseTransaction,
1063 ) -> Result<Option<ChannelRole>> {
1064 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1065 enum QueryChannelMembership {
1066 ChannelId,
1067 Role,
1068 Visibility,
1069 }
1070
1071 let mut rows = channel_member::Entity::find()
1072 .left_join(channel::Entity)
1073 .filter(
1074 channel_member::Column::ChannelId
1075 .is_in(channel.ancestors_including_self())
1076 .and(channel_member::Column::UserId.eq(user_id))
1077 .and(channel_member::Column::Accepted.eq(true)),
1078 )
1079 .select_only()
1080 .column(channel_member::Column::ChannelId)
1081 .column(channel_member::Column::Role)
1082 .column(channel::Column::Visibility)
1083 .into_values::<_, QueryChannelMembership>()
1084 .stream(&*tx)
1085 .await?;
1086
1087 let mut user_role: Option<ChannelRole> = None;
1088
1089 let mut is_participant = false;
1090 let mut current_channel_visibility = None;
1091
1092 // note these channels are not iterated in any particular order,
1093 // our current logic takes the highest permission available.
1094 while let Some(row) = rows.next().await {
1095 let (membership_channel, role, visibility): (
1096 ChannelId,
1097 ChannelRole,
1098 ChannelVisibility,
1099 ) = row?;
1100
1101 match role {
1102 ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
1103 if let Some(users_role) = user_role {
1104 user_role = Some(users_role.max(role));
1105 } else {
1106 user_role = Some(role)
1107 }
1108 }
1109 ChannelRole::Guest if visibility == ChannelVisibility::Public => {
1110 is_participant = true
1111 }
1112 ChannelRole::Guest => {}
1113 }
1114 if channel.id == membership_channel {
1115 current_channel_visibility = Some(visibility);
1116 }
1117 }
1118 // free up database connection
1119 drop(rows);
1120
1121 if is_participant && user_role.is_none() {
1122 if current_channel_visibility.is_none() {
1123 current_channel_visibility = channel::Entity::find()
1124 .filter(channel::Column::Id.eq(channel.id))
1125 .one(&*tx)
1126 .await?
1127 .map(|channel| channel.visibility);
1128 }
1129 if current_channel_visibility == Some(ChannelVisibility::Public) {
1130 user_role = Some(ChannelRole::Guest);
1131 }
1132 }
1133
1134 Ok(user_role)
1135 }
1136
1137 // Get the descendants of the given set if channels, ordered by their
1138 // path.
1139 async fn get_channel_descendants_including_self(
1140 &self,
1141 channel_ids: impl IntoIterator<Item = ChannelId>,
1142 tx: &DatabaseTransaction,
1143 ) -> Result<Vec<channel::Model>> {
1144 let mut values = String::new();
1145 for id in channel_ids {
1146 if !values.is_empty() {
1147 values.push_str(", ");
1148 }
1149 write!(&mut values, "({})", id).unwrap();
1150 }
1151
1152 if values.is_empty() {
1153 return Ok(vec![]);
1154 }
1155
1156 let sql = format!(
1157 r#"
1158 SELECT DISTINCT
1159 descendant_channels.*,
1160 descendant_channels.parent_path || descendant_channels.id as full_path
1161 FROM
1162 channels parent_channels, channels descendant_channels
1163 WHERE
1164 descendant_channels.id IN ({values}) OR
1165 (
1166 parent_channels.id IN ({values}) AND
1167 descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1168 )
1169 ORDER BY
1170 full_path ASC
1171 "#
1172 );
1173
1174 Ok(channel::Entity::find()
1175 .from_raw_sql(Statement::from_string(
1176 self.pool.get_database_backend(),
1177 sql,
1178 ))
1179 .all(tx)
1180 .await?)
1181 }
1182
1183 /// Returns the channel with the given ID.
1184 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1185 self.transaction(|tx| async move {
1186 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1187 let role = self
1188 .check_user_is_channel_participant(&channel, user_id, &*tx)
1189 .await?;
1190
1191 Ok(Channel::from_model(channel, role))
1192 })
1193 .await
1194 }
1195
1196 pub(crate) async fn get_channel_internal(
1197 &self,
1198 channel_id: ChannelId,
1199 tx: &DatabaseTransaction,
1200 ) -> Result<channel::Model> {
1201 Ok(channel::Entity::find_by_id(channel_id)
1202 .one(&*tx)
1203 .await?
1204 .ok_or_else(|| anyhow!("no such channel"))?)
1205 }
1206
1207 pub(crate) async fn get_or_create_channel_room(
1208 &self,
1209 channel_id: ChannelId,
1210 live_kit_room: &str,
1211 environment: &str,
1212 tx: &DatabaseTransaction,
1213 ) -> Result<RoomId> {
1214 let room = room::Entity::find()
1215 .filter(room::Column::ChannelId.eq(channel_id))
1216 .one(&*tx)
1217 .await?;
1218
1219 let room_id = if let Some(room) = room {
1220 if let Some(env) = room.environment {
1221 if &env != environment {
1222 Err(anyhow!("must join using the {} release", env))?;
1223 }
1224 }
1225 room.id
1226 } else {
1227 let result = room::Entity::insert(room::ActiveModel {
1228 channel_id: ActiveValue::Set(Some(channel_id)),
1229 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1230 environment: ActiveValue::Set(Some(environment.to_string())),
1231 ..Default::default()
1232 })
1233 .exec(&*tx)
1234 .await?;
1235
1236 result.last_insert_id
1237 };
1238
1239 Ok(room_id)
1240 }
1241
1242 /// Move a channel from one parent to another
1243 pub async fn move_channel(
1244 &self,
1245 channel_id: ChannelId,
1246 new_parent_id: Option<ChannelId>,
1247 admin_id: UserId,
1248 ) -> Result<Option<MoveChannelResult>> {
1249 self.transaction(|tx| async move {
1250 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1251 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1252 .await?;
1253
1254 let new_parent_path;
1255 let new_parent_channel;
1256 if let Some(new_parent_id) = new_parent_id {
1257 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1258 self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1259 .await?;
1260
1261 if new_parent
1262 .ancestors_including_self()
1263 .any(|id| id == channel.id)
1264 {
1265 Err(anyhow!("cannot move a channel into one of its descendants"))?;
1266 }
1267
1268 new_parent_path = new_parent.path();
1269 new_parent_channel = Some(new_parent);
1270 } else {
1271 new_parent_path = String::new();
1272 new_parent_channel = None;
1273 };
1274
1275 let previous_participants = self
1276 .get_channel_participant_details_internal(&channel, &*tx)
1277 .await?;
1278
1279 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1280 let new_path = format!("{}{}/", new_parent_path, channel.id);
1281
1282 if old_path == new_path {
1283 return Ok(None);
1284 }
1285
1286 let mut model = channel.into_active_model();
1287 model.parent_path = ActiveValue::Set(new_parent_path);
1288 let channel = model.update(&*tx).await?;
1289
1290 if new_parent_channel.is_none() {
1291 channel_member::ActiveModel {
1292 id: ActiveValue::NotSet,
1293 channel_id: ActiveValue::Set(channel_id),
1294 user_id: ActiveValue::Set(admin_id),
1295 accepted: ActiveValue::Set(true),
1296 role: ActiveValue::Set(ChannelRole::Admin),
1297 }
1298 .insert(&*tx)
1299 .await?;
1300 }
1301
1302 let descendent_ids =
1303 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1304 self.pool.get_database_backend(),
1305 "
1306 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1307 WHERE parent_path LIKE $3 || '%'
1308 RETURNING id
1309 ",
1310 [old_path.clone().into(), new_path.into(), old_path.into()],
1311 ))
1312 .all(&*tx)
1313 .await?;
1314
1315 let participants_to_update: HashMap<_, _> = self
1316 .participants_to_notify_for_channel_change(
1317 new_parent_channel.as_ref().unwrap_or(&channel),
1318 &*tx,
1319 )
1320 .await?
1321 .into_iter()
1322 .collect();
1323
1324 let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1325 for id in descendent_ids {
1326 moved_channels.insert(id);
1327 }
1328 moved_channels.insert(channel_id);
1329
1330 let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1331 for participant in previous_participants {
1332 if participant.kind == proto::channel_member::Kind::AncestorMember {
1333 if !participants_to_update.contains_key(&participant.user_id) {
1334 participants_to_remove.insert(participant.user_id);
1335 }
1336 }
1337 }
1338
1339 Ok(Some(MoveChannelResult {
1340 participants_to_remove,
1341 participants_to_update,
1342 moved_channels,
1343 }))
1344 })
1345 .await
1346 }
1347}
1348
1349#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1350enum QueryIds {
1351 Id,
1352}
1353
1354#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1355enum QueryUserIds {
1356 UserId,
1357}