1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self.create_channel(name, None, creator_id).await?.0.id)
23 }
24
25 #[cfg(test)]
26 pub async fn create_sub_channel(
27 &self,
28 name: &str,
29 parent: ChannelId,
30 creator_id: UserId,
31 ) -> Result<ChannelId> {
32 Ok(self
33 .create_channel(name, Some(parent), creator_id)
34 .await?
35 .0
36 .id)
37 }
38
39 /// Creates a new channel.
40 pub async fn create_channel(
41 &self,
42 name: &str,
43 parent_channel_id: Option<ChannelId>,
44 admin_id: UserId,
45 ) -> Result<(
46 Channel,
47 Option<channel_member::Model>,
48 Vec<channel_member::Model>,
49 )> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 let channel_members = channel_member::Entity::find()
91 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
92 .all(&*tx)
93 .await?;
94
95 Ok((Channel::from_model(channel), membership, channel_members))
96 })
97 .await
98 }
99
100 /// Adds a user to the specified channel.
101 pub async fn join_channel(
102 &self,
103 channel_id: ChannelId,
104 user_id: UserId,
105 connection: ConnectionId,
106 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
107 self.transaction(move |tx| async move {
108 let channel = self.get_channel_internal(channel_id, &tx).await?;
109 let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
110
111 let mut accept_invite_result = None;
112
113 if role.is_none() {
114 if let Some(invitation) = self
115 .pending_invite_for_channel(&channel, user_id, &tx)
116 .await?
117 {
118 // note, this may be a parent channel
119 role = Some(invitation.role);
120 channel_member::Entity::update(channel_member::ActiveModel {
121 accepted: ActiveValue::Set(true),
122 ..invitation.into_active_model()
123 })
124 .exec(&*tx)
125 .await?;
126
127 accept_invite_result = Some(
128 self.calculate_membership_updated(&channel, user_id, &tx)
129 .await?,
130 );
131
132 debug_assert!(
133 self.channel_role_for_user(&channel, user_id, &tx).await? == role
134 );
135 } else if channel.visibility == ChannelVisibility::Public {
136 role = Some(ChannelRole::Guest);
137 channel_member::Entity::insert(channel_member::ActiveModel {
138 id: ActiveValue::NotSet,
139 channel_id: ActiveValue::Set(channel.root_id()),
140 user_id: ActiveValue::Set(user_id),
141 accepted: ActiveValue::Set(true),
142 role: ActiveValue::Set(ChannelRole::Guest),
143 })
144 .exec(&*tx)
145 .await?;
146
147 accept_invite_result = Some(
148 self.calculate_membership_updated(&channel, user_id, &tx)
149 .await?,
150 );
151
152 debug_assert!(
153 self.channel_role_for_user(&channel, user_id, &tx).await? == role
154 );
155 }
156 }
157
158 if role.is_none() || role == Some(ChannelRole::Banned) {
159 Err(ErrorCode::Forbidden.anyhow())?
160 }
161 let role = role.unwrap();
162
163 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
164 let room_id = self
165 .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
166 .await?;
167
168 self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
169 .await
170 .map(|jr| (jr, accept_invite_result, role))
171 })
172 .await
173 }
174
175 /// Sets the visibility of the given channel.
176 pub async fn set_channel_visibility(
177 &self,
178 channel_id: ChannelId,
179 visibility: ChannelVisibility,
180 admin_id: UserId,
181 ) -> Result<(Channel, Vec<channel_member::Model>)> {
182 self.transaction(move |tx| async move {
183 let channel = self.get_channel_internal(channel_id, &tx).await?;
184 self.check_user_is_channel_admin(&channel, admin_id, &tx)
185 .await?;
186
187 if visibility == ChannelVisibility::Public {
188 if let Some(parent_id) = channel.parent_id() {
189 let parent = self.get_channel_internal(parent_id, &tx).await?;
190
191 if parent.visibility != ChannelVisibility::Public {
192 Err(ErrorCode::BadPublicNesting
193 .with_tag("direction", "parent")
194 .anyhow())?;
195 }
196 }
197 } else if visibility == ChannelVisibility::Members {
198 if self
199 .get_channel_descendants_excluding_self([&channel], &tx)
200 .await?
201 .into_iter()
202 .any(|channel| channel.visibility == ChannelVisibility::Public)
203 {
204 Err(ErrorCode::BadPublicNesting
205 .with_tag("direction", "children")
206 .anyhow())?;
207 }
208 }
209
210 let mut model = channel.into_active_model();
211 model.visibility = ActiveValue::Set(visibility);
212 let channel = model.update(&*tx).await?;
213
214 let channel_members = channel_member::Entity::find()
215 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
216 .all(&*tx)
217 .await?;
218
219 Ok((Channel::from_model(channel), channel_members))
220 })
221 .await
222 }
223
224 #[cfg(test)]
225 pub async fn set_channel_requires_zed_cla(
226 &self,
227 channel_id: ChannelId,
228 requires_zed_cla: bool,
229 ) -> Result<()> {
230 self.transaction(move |tx| async move {
231 let channel = self.get_channel_internal(channel_id, &tx).await?;
232 let mut model = channel.into_active_model();
233 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
234 model.update(&*tx).await?;
235 Ok(())
236 })
237 .await
238 }
239
240 /// Deletes the channel with the specified ID.
241 pub async fn delete_channel(
242 &self,
243 channel_id: ChannelId,
244 user_id: UserId,
245 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
246 self.transaction(move |tx| async move {
247 let channel = self.get_channel_internal(channel_id, &tx).await?;
248 self.check_user_is_channel_admin(&channel, user_id, &tx)
249 .await?;
250
251 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
252 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
253 .select_only()
254 .column(channel_member::Column::UserId)
255 .distinct()
256 .into_values::<_, QueryUserIds>()
257 .all(&*tx)
258 .await?;
259
260 let channels_to_remove = self
261 .get_channel_descendants_excluding_self([&channel], &tx)
262 .await?
263 .into_iter()
264 .map(|channel| channel.id)
265 .chain(Some(channel_id))
266 .collect::<Vec<_>>();
267
268 channel::Entity::delete_many()
269 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
270 .exec(&*tx)
271 .await?;
272
273 Ok((channels_to_remove, members_to_notify))
274 })
275 .await
276 }
277
278 /// Invites a user to a channel as a member.
279 pub async fn invite_channel_member(
280 &self,
281 channel_id: ChannelId,
282 invitee_id: UserId,
283 inviter_id: UserId,
284 role: ChannelRole,
285 ) -> Result<InviteMemberResult> {
286 self.transaction(move |tx| async move {
287 let channel = self.get_channel_internal(channel_id, &tx).await?;
288 self.check_user_is_channel_admin(&channel, inviter_id, &tx)
289 .await?;
290 if !channel.is_root() {
291 Err(ErrorCode::NotARootChannel.anyhow())?
292 }
293
294 channel_member::ActiveModel {
295 id: ActiveValue::NotSet,
296 channel_id: ActiveValue::Set(channel_id),
297 user_id: ActiveValue::Set(invitee_id),
298 accepted: ActiveValue::Set(false),
299 role: ActiveValue::Set(role),
300 }
301 .insert(&*tx)
302 .await?;
303
304 let channel = Channel::from_model(channel);
305
306 let notifications = self
307 .create_notification(
308 invitee_id,
309 rpc::Notification::ChannelInvitation {
310 channel_id: channel_id.to_proto(),
311 channel_name: channel.name.clone(),
312 inviter_id: inviter_id.to_proto(),
313 },
314 true,
315 &tx,
316 )
317 .await?
318 .into_iter()
319 .collect();
320
321 Ok(InviteMemberResult {
322 channel,
323 notifications,
324 })
325 })
326 .await
327 }
328
329 fn sanitize_channel_name(name: &str) -> Result<&str> {
330 let new_name = name.trim().trim_start_matches('#');
331 if new_name == "" {
332 Err(anyhow!("channel name can't be blank"))?;
333 }
334 Ok(new_name)
335 }
336
337 /// Renames the specified channel.
338 pub async fn rename_channel(
339 &self,
340 channel_id: ChannelId,
341 admin_id: UserId,
342 new_name: &str,
343 ) -> Result<(Channel, Vec<channel_member::Model>)> {
344 self.transaction(move |tx| async move {
345 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
346
347 let channel = self.get_channel_internal(channel_id, &tx).await?;
348 self.check_user_is_channel_admin(&channel, admin_id, &tx)
349 .await?;
350
351 let mut model = channel.into_active_model();
352 model.name = ActiveValue::Set(new_name.clone());
353 let channel = model.update(&*tx).await?;
354
355 let channel_members = channel_member::Entity::find()
356 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
357 .all(&*tx)
358 .await?;
359
360 Ok((Channel::from_model(channel), channel_members))
361 })
362 .await
363 }
364
365 /// accept or decline an invite to join a channel
366 pub async fn respond_to_channel_invite(
367 &self,
368 channel_id: ChannelId,
369 user_id: UserId,
370 accept: bool,
371 ) -> Result<RespondToChannelInvite> {
372 self.transaction(move |tx| async move {
373 let channel = self.get_channel_internal(channel_id, &tx).await?;
374
375 let membership_update = if accept {
376 let rows_affected = channel_member::Entity::update_many()
377 .set(channel_member::ActiveModel {
378 accepted: ActiveValue::Set(accept),
379 ..Default::default()
380 })
381 .filter(
382 channel_member::Column::ChannelId
383 .eq(channel_id)
384 .and(channel_member::Column::UserId.eq(user_id))
385 .and(channel_member::Column::Accepted.eq(false)),
386 )
387 .exec(&*tx)
388 .await?
389 .rows_affected;
390
391 if rows_affected == 0 {
392 Err(anyhow!("no such invitation"))?;
393 }
394
395 Some(
396 self.calculate_membership_updated(&channel, user_id, &tx)
397 .await?,
398 )
399 } else {
400 let rows_affected = channel_member::Entity::delete_many()
401 .filter(
402 channel_member::Column::ChannelId
403 .eq(channel_id)
404 .and(channel_member::Column::UserId.eq(user_id))
405 .and(channel_member::Column::Accepted.eq(false)),
406 )
407 .exec(&*tx)
408 .await?
409 .rows_affected;
410 if rows_affected == 0 {
411 Err(anyhow!("no such invitation"))?;
412 }
413
414 None
415 };
416
417 Ok(RespondToChannelInvite {
418 membership_update,
419 notifications: self
420 .mark_notification_as_read_with_response(
421 user_id,
422 &rpc::Notification::ChannelInvitation {
423 channel_id: channel_id.to_proto(),
424 channel_name: Default::default(),
425 inviter_id: Default::default(),
426 },
427 accept,
428 &tx,
429 )
430 .await?
431 .into_iter()
432 .collect(),
433 })
434 })
435 .await
436 }
437
438 async fn calculate_membership_updated(
439 &self,
440 channel: &channel::Model,
441 user_id: UserId,
442 tx: &DatabaseTransaction,
443 ) -> Result<MembershipUpdated> {
444 let new_channels = self.get_user_channels(user_id, Some(channel), tx).await?;
445 let removed_channels = self
446 .get_channel_descendants_excluding_self([channel], tx)
447 .await?
448 .into_iter()
449 .map(|channel| channel.id)
450 .chain([channel.id])
451 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
452 .collect::<Vec<_>>();
453
454 Ok(MembershipUpdated {
455 channel_id: channel.id,
456 new_channels,
457 removed_channels,
458 })
459 }
460
461 /// Removes a channel member.
462 pub async fn remove_channel_member(
463 &self,
464 channel_id: ChannelId,
465 member_id: UserId,
466 admin_id: UserId,
467 ) -> Result<RemoveChannelMemberResult> {
468 self.transaction(|tx| async move {
469 let channel = self.get_channel_internal(channel_id, &tx).await?;
470
471 if member_id != admin_id {
472 self.check_user_is_channel_admin(&channel, admin_id, &tx)
473 .await?;
474 }
475
476 let result = channel_member::Entity::delete_many()
477 .filter(
478 channel_member::Column::ChannelId
479 .eq(channel_id)
480 .and(channel_member::Column::UserId.eq(member_id)),
481 )
482 .exec(&*tx)
483 .await?;
484
485 if result.rows_affected == 0 {
486 Err(anyhow!("no such member"))?;
487 }
488
489 Ok(RemoveChannelMemberResult {
490 membership_update: self
491 .calculate_membership_updated(&channel, member_id, &tx)
492 .await?,
493 notification_id: self
494 .remove_notification(
495 member_id,
496 rpc::Notification::ChannelInvitation {
497 channel_id: channel_id.to_proto(),
498 channel_name: Default::default(),
499 inviter_id: Default::default(),
500 },
501 &tx,
502 )
503 .await?,
504 })
505 })
506 .await
507 }
508
509 /// Returns all channel invites for the user with the given ID.
510 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
511 self.transaction(|tx| async move {
512 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
513
514 let channel_invites = channel_member::Entity::find()
515 .filter(
516 channel_member::Column::UserId
517 .eq(user_id)
518 .and(channel_member::Column::Accepted.eq(false)),
519 )
520 .all(&*tx)
521 .await?;
522
523 for invite in channel_invites {
524 role_for_channel.insert(invite.channel_id, invite.role);
525 }
526
527 let channels = channel::Entity::find()
528 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
529 .all(&*tx)
530 .await?;
531
532 let channels = channels.into_iter().map(Channel::from_model).collect();
533
534 Ok(channels)
535 })
536 .await
537 }
538
539 /// Returns all channels for the user with the given ID.
540 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
541 self.transaction(|tx| async move {
542 let tx = tx;
543
544 self.get_user_channels(user_id, None, &tx).await
545 })
546 .await
547 }
548
549 /// Returns all channels for the user with the given ID that are descendants
550 /// of the specified ancestor channel.
551 pub async fn get_user_channels(
552 &self,
553 user_id: UserId,
554 ancestor_channel: Option<&channel::Model>,
555 tx: &DatabaseTransaction,
556 ) -> Result<ChannelsForUser> {
557 let mut filter = channel_member::Column::UserId
558 .eq(user_id)
559 .and(channel_member::Column::Accepted.eq(true));
560
561 if let Some(ancestor) = ancestor_channel {
562 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
563 }
564
565 let channel_memberships = channel_member::Entity::find()
566 .filter(filter)
567 .all(tx)
568 .await?;
569
570 let channels = channel::Entity::find()
571 .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
572 .all(tx)
573 .await?;
574
575 let mut descendants = self
576 .get_channel_descendants_excluding_self(channels.iter(), tx)
577 .await?;
578
579 for channel in channels {
580 if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
581 descendants.insert(ix, channel);
582 }
583 }
584
585 let roles_by_channel_id = channel_memberships
586 .iter()
587 .map(|membership| (membership.channel_id, membership.role))
588 .collect::<HashMap<_, _>>();
589
590 let channels: Vec<Channel> = descendants
591 .into_iter()
592 .filter_map(|channel| {
593 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
594 if parent_role.can_see_channel(channel.visibility) {
595 Some(Channel::from_model(channel))
596 } else {
597 None
598 }
599 })
600 .collect();
601
602 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
603 enum QueryUserIdsAndChannelIds {
604 ChannelId,
605 UserId,
606 }
607
608 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
609 {
610 let mut rows = room_participant::Entity::find()
611 .inner_join(room::Entity)
612 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
613 .select_only()
614 .column(room::Column::ChannelId)
615 .column(room_participant::Column::UserId)
616 .into_values::<_, QueryUserIdsAndChannelIds>()
617 .stream(tx)
618 .await?;
619 while let Some(row) = rows.next().await {
620 let row: (ChannelId, UserId) = row?;
621 channel_participants.entry(row.0).or_default().push(row.1)
622 }
623 }
624
625 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
626
627 let mut channel_ids_by_buffer_id = HashMap::default();
628 let mut rows = buffer::Entity::find()
629 .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
630 .stream(tx)
631 .await?;
632 while let Some(row) = rows.next().await {
633 let row = row?;
634 channel_ids_by_buffer_id.insert(row.id, row.channel_id);
635 }
636 drop(rows);
637
638 let latest_buffer_versions = self
639 .latest_channel_buffer_changes(&channel_ids_by_buffer_id, tx)
640 .await?;
641
642 let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
643
644 let observed_buffer_versions = self
645 .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
646 .await?;
647
648 let observed_channel_messages = self
649 .observed_channel_messages(&channel_ids, user_id, tx)
650 .await?;
651
652 let hosted_projects = self
653 .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
654 .await?;
655
656 Ok(ChannelsForUser {
657 channel_memberships,
658 channels,
659 hosted_projects,
660 channel_participants,
661 latest_buffer_versions,
662 latest_channel_messages,
663 observed_buffer_versions,
664 observed_channel_messages,
665 })
666 }
667
668 /// Sets the role for the specified channel member.
669 pub async fn set_channel_member_role(
670 &self,
671 channel_id: ChannelId,
672 admin_id: UserId,
673 for_user: UserId,
674 role: ChannelRole,
675 ) -> Result<SetMemberRoleResult> {
676 self.transaction(|tx| async move {
677 let channel = self.get_channel_internal(channel_id, &tx).await?;
678 self.check_user_is_channel_admin(&channel, admin_id, &tx)
679 .await?;
680
681 let membership = channel_member::Entity::find()
682 .filter(
683 channel_member::Column::ChannelId
684 .eq(channel_id)
685 .and(channel_member::Column::UserId.eq(for_user)),
686 )
687 .one(&*tx)
688 .await?;
689
690 let Some(membership) = membership else {
691 Err(anyhow!("no such member"))?
692 };
693
694 let mut update = membership.into_active_model();
695 update.role = ActiveValue::Set(role);
696 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
697
698 if updated.accepted {
699 Ok(SetMemberRoleResult::MembershipUpdated(
700 self.calculate_membership_updated(&channel, for_user, &tx)
701 .await?,
702 ))
703 } else {
704 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
705 channel,
706 )))
707 }
708 })
709 .await
710 }
711
712 /// Returns the details for the specified channel member.
713 pub async fn get_channel_participant_details(
714 &self,
715 channel_id: ChannelId,
716 user_id: UserId,
717 ) -> Result<Vec<proto::ChannelMember>> {
718 let (role, members) = self
719 .transaction(move |tx| async move {
720 let channel = self.get_channel_internal(channel_id, &tx).await?;
721 let role = self
722 .check_user_is_channel_participant(&channel, user_id, &tx)
723 .await?;
724 Ok((
725 role,
726 self.get_channel_participant_details_internal(&channel, &tx)
727 .await?,
728 ))
729 })
730 .await?;
731
732 if role == ChannelRole::Admin {
733 Ok(members
734 .into_iter()
735 .map(|channel_member| proto::ChannelMember {
736 role: channel_member.role.into(),
737 user_id: channel_member.user_id.to_proto(),
738 kind: if channel_member.accepted {
739 Kind::Member
740 } else {
741 Kind::Invitee
742 }
743 .into(),
744 })
745 .collect())
746 } else {
747 return Ok(members
748 .into_iter()
749 .filter_map(|member| {
750 if !member.accepted {
751 return None;
752 }
753 Some(proto::ChannelMember {
754 role: member.role.into(),
755 user_id: member.user_id.to_proto(),
756 kind: Kind::Member.into(),
757 })
758 })
759 .collect());
760 }
761 }
762
763 async fn get_channel_participant_details_internal(
764 &self,
765 channel: &channel::Model,
766 tx: &DatabaseTransaction,
767 ) -> Result<Vec<channel_member::Model>> {
768 Ok(channel_member::Entity::find()
769 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
770 .all(tx)
771 .await?)
772 }
773
774 /// Returns the participants in the given channel.
775 pub async fn get_channel_participants(
776 &self,
777 channel: &channel::Model,
778 tx: &DatabaseTransaction,
779 ) -> Result<Vec<UserId>> {
780 let participants = self
781 .get_channel_participant_details_internal(channel, tx)
782 .await?;
783 Ok(participants
784 .into_iter()
785 .map(|member| member.user_id)
786 .collect())
787 }
788
789 /// Returns whether the given user is an admin in the specified channel.
790 pub async fn check_user_is_channel_admin(
791 &self,
792 channel: &channel::Model,
793 user_id: UserId,
794 tx: &DatabaseTransaction,
795 ) -> Result<ChannelRole> {
796 let role = self.channel_role_for_user(channel, user_id, tx).await?;
797 match role {
798 Some(ChannelRole::Admin) => Ok(role.unwrap()),
799 Some(ChannelRole::Member)
800 | Some(ChannelRole::Talker)
801 | Some(ChannelRole::Banned)
802 | Some(ChannelRole::Guest)
803 | None => Err(anyhow!(
804 "user is not a channel admin or channel does not exist"
805 ))?,
806 }
807 }
808
809 /// Returns whether the given user is a member of the specified channel.
810 pub async fn check_user_is_channel_member(
811 &self,
812 channel: &channel::Model,
813 user_id: UserId,
814 tx: &DatabaseTransaction,
815 ) -> Result<ChannelRole> {
816 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
817 match channel_role {
818 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
819 Some(ChannelRole::Banned)
820 | Some(ChannelRole::Guest)
821 | Some(ChannelRole::Talker)
822 | None => Err(anyhow!(
823 "user is not a channel member or channel does not exist"
824 ))?,
825 }
826 }
827
828 /// Returns whether the given user is a participant in the specified channel.
829 pub async fn check_user_is_channel_participant(
830 &self,
831 channel: &channel::Model,
832 user_id: UserId,
833 tx: &DatabaseTransaction,
834 ) -> Result<ChannelRole> {
835 let role = self.channel_role_for_user(channel, user_id, tx).await?;
836 match role {
837 Some(ChannelRole::Admin)
838 | Some(ChannelRole::Member)
839 | Some(ChannelRole::Guest)
840 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
841 Some(ChannelRole::Banned) | None => Err(anyhow!(
842 "user is not a channel participant or channel does not exist"
843 ))?,
844 }
845 }
846
847 /// Returns a user's pending invite for the given channel, if one exists.
848 pub async fn pending_invite_for_channel(
849 &self,
850 channel: &channel::Model,
851 user_id: UserId,
852 tx: &DatabaseTransaction,
853 ) -> Result<Option<channel_member::Model>> {
854 let row = channel_member::Entity::find()
855 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
856 .filter(channel_member::Column::UserId.eq(user_id))
857 .filter(channel_member::Column::Accepted.eq(false))
858 .one(tx)
859 .await?;
860
861 Ok(row)
862 }
863
864 /// Returns the role for a user in the given channel.
865 pub async fn channel_role_for_user(
866 &self,
867 channel: &channel::Model,
868 user_id: UserId,
869 tx: &DatabaseTransaction,
870 ) -> Result<Option<ChannelRole>> {
871 let membership = channel_member::Entity::find()
872 .filter(
873 channel_member::Column::ChannelId
874 .eq(channel.root_id())
875 .and(channel_member::Column::UserId.eq(user_id))
876 .and(channel_member::Column::Accepted.eq(true)),
877 )
878 .one(tx)
879 .await?;
880
881 let Some(membership) = membership else {
882 return Ok(None);
883 };
884
885 if !membership.role.can_see_channel(channel.visibility) {
886 return Ok(None);
887 }
888
889 Ok(Some(membership.role))
890 }
891
892 // Get the descendants of the given set if channels, ordered by their
893 // path.
894 pub(crate) async fn get_channel_descendants_excluding_self(
895 &self,
896 channels: impl IntoIterator<Item = &channel::Model>,
897 tx: &DatabaseTransaction,
898 ) -> Result<Vec<channel::Model>> {
899 let mut filter = Condition::any();
900 for channel in channels.into_iter() {
901 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
902 }
903
904 if filter.is_empty() {
905 return Ok(vec![]);
906 }
907
908 Ok(channel::Entity::find()
909 .filter(filter)
910 .order_by_asc(Expr::cust("parent_path || id || '/'"))
911 .all(tx)
912 .await?)
913 }
914
915 /// Returns the channel with the given ID.
916 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
917 self.transaction(|tx| async move {
918 let channel = self.get_channel_internal(channel_id, &tx).await?;
919 self.check_user_is_channel_participant(&channel, user_id, &tx)
920 .await?;
921
922 Ok(Channel::from_model(channel))
923 })
924 .await
925 }
926
927 pub(crate) async fn get_channel_internal(
928 &self,
929 channel_id: ChannelId,
930 tx: &DatabaseTransaction,
931 ) -> Result<channel::Model> {
932 Ok(channel::Entity::find_by_id(channel_id)
933 .one(tx)
934 .await?
935 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
936 }
937
938 pub(crate) async fn get_or_create_channel_room(
939 &self,
940 channel_id: ChannelId,
941 live_kit_room: &str,
942 tx: &DatabaseTransaction,
943 ) -> Result<RoomId> {
944 let room = room::Entity::find()
945 .filter(room::Column::ChannelId.eq(channel_id))
946 .one(tx)
947 .await?;
948
949 let room_id = if let Some(room) = room {
950 room.id
951 } else {
952 let result = room::Entity::insert(room::ActiveModel {
953 channel_id: ActiveValue::Set(Some(channel_id)),
954 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
955 ..Default::default()
956 })
957 .exec(tx)
958 .await?;
959
960 result.last_insert_id
961 };
962
963 Ok(room_id)
964 }
965
966 /// Move a channel from one parent to another
967 pub async fn move_channel(
968 &self,
969 channel_id: ChannelId,
970 new_parent_id: ChannelId,
971 admin_id: UserId,
972 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
973 self.transaction(|tx| async move {
974 let channel = self.get_channel_internal(channel_id, &tx).await?;
975 self.check_user_is_channel_admin(&channel, admin_id, &tx)
976 .await?;
977 let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
978
979 if new_parent.root_id() != channel.root_id() {
980 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
981 }
982
983 if new_parent
984 .ancestors_including_self()
985 .any(|id| id == channel.id)
986 {
987 Err(anyhow!(ErrorCode::CircularNesting))?;
988 }
989
990 if channel.visibility == ChannelVisibility::Public
991 && new_parent.visibility != ChannelVisibility::Public
992 {
993 Err(anyhow!(ErrorCode::BadPublicNesting))?;
994 }
995
996 let root_id = channel.root_id();
997 let old_path = format!("{}{}/", channel.parent_path, channel.id);
998 let new_path = format!("{}{}/", new_parent.path(), channel.id);
999
1000 let mut model = channel.into_active_model();
1001 model.parent_path = ActiveValue::Set(new_parent.path());
1002 let channel = model.update(&*tx).await?;
1003
1004 let descendent_ids =
1005 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1006 self.pool.get_database_backend(),
1007 "
1008 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1009 WHERE parent_path LIKE $3 || '%'
1010 RETURNING id
1011 ",
1012 [old_path.clone().into(), new_path.into(), old_path.into()],
1013 ))
1014 .all(&*tx)
1015 .await?;
1016
1017 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1018
1019 let channels = channel::Entity::find()
1020 .filter(channel::Column::Id.is_in(all_moved_ids))
1021 .all(&*tx)
1022 .await?
1023 .into_iter()
1024 .map(|c| Channel::from_model(c))
1025 .collect::<Vec<_>>();
1026
1027 let channel_members = channel_member::Entity::find()
1028 .filter(channel_member::Column::ChannelId.eq(root_id))
1029 .all(&*tx)
1030 .await?;
1031
1032 Ok((channels, channel_members))
1033 })
1034 .await
1035 }
1036}
1037
1038#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1039enum QueryIds {
1040 Id,
1041}
1042
1043#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1044enum QueryUserIds {
1045 UserId,
1046}