1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self.create_channel(name, None, creator_id).await?.0.id)
23 }
24
25 #[cfg(test)]
26 pub async fn create_sub_channel(
27 &self,
28 name: &str,
29 parent: ChannelId,
30 creator_id: UserId,
31 ) -> Result<ChannelId> {
32 Ok(self
33 .create_channel(name, Some(parent), creator_id)
34 .await?
35 .0
36 .id)
37 }
38
39 /// Creates a new channel.
40 pub async fn create_channel(
41 &self,
42 name: &str,
43 parent_channel_id: Option<ChannelId>,
44 admin_id: UserId,
45 ) -> Result<(
46 Channel,
47 Option<channel_member::Model>,
48 Vec<channel_member::Model>,
49 )> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 let channel_members = channel_member::Entity::find()
91 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
92 .all(&*tx)
93 .await?;
94
95 Ok((Channel::from_model(channel), membership, channel_members))
96 })
97 .await
98 }
99
100 /// Adds a user to the specified channel.
101 pub async fn join_channel(
102 &self,
103 channel_id: ChannelId,
104 user_id: UserId,
105 connection: ConnectionId,
106 environment: &str,
107 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
108 self.transaction(move |tx| async move {
109 let channel = self.get_channel_internal(channel_id, &*tx).await?;
110 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
111
112 let mut accept_invite_result = None;
113
114 if role.is_none() {
115 if let Some(invitation) = self
116 .pending_invite_for_channel(&channel, user_id, &*tx)
117 .await?
118 {
119 // note, this may be a parent channel
120 role = Some(invitation.role);
121 channel_member::Entity::update(channel_member::ActiveModel {
122 accepted: ActiveValue::Set(true),
123 ..invitation.into_active_model()
124 })
125 .exec(&*tx)
126 .await?;
127
128 accept_invite_result = Some(
129 self.calculate_membership_updated(&channel, user_id, &*tx)
130 .await?,
131 );
132
133 debug_assert!(
134 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
135 );
136 } else if channel.visibility == ChannelVisibility::Public {
137 role = Some(ChannelRole::Guest);
138 channel_member::Entity::insert(channel_member::ActiveModel {
139 id: ActiveValue::NotSet,
140 channel_id: ActiveValue::Set(channel.root_id()),
141 user_id: ActiveValue::Set(user_id),
142 accepted: ActiveValue::Set(true),
143 role: ActiveValue::Set(ChannelRole::Guest),
144 })
145 .exec(&*tx)
146 .await?;
147
148 accept_invite_result = Some(
149 self.calculate_membership_updated(&channel, user_id, &*tx)
150 .await?,
151 );
152
153 debug_assert!(
154 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
155 );
156 }
157 }
158
159 if role.is_none() || role == Some(ChannelRole::Banned) {
160 Err(ErrorCode::Forbidden.anyhow())?
161 }
162 let role = role.unwrap();
163
164 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
165 let room_id = self
166 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
167 .await?;
168
169 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
170 .await
171 .map(|jr| (jr, accept_invite_result, role))
172 })
173 .await
174 }
175
176 /// Sets the visibiltity of the given channel.
177 pub async fn set_channel_visibility(
178 &self,
179 channel_id: ChannelId,
180 visibility: ChannelVisibility,
181 admin_id: UserId,
182 ) -> Result<(Channel, Vec<channel_member::Model>)> {
183 self.transaction(move |tx| async move {
184 let channel = self.get_channel_internal(channel_id, &*tx).await?;
185 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
186 .await?;
187
188 if visibility == ChannelVisibility::Public {
189 if let Some(parent_id) = channel.parent_id() {
190 let parent = self.get_channel_internal(parent_id, &*tx).await?;
191
192 if parent.visibility != ChannelVisibility::Public {
193 Err(ErrorCode::BadPublicNesting
194 .with_tag("direction", "parent")
195 .anyhow())?;
196 }
197 }
198 } else if visibility == ChannelVisibility::Members {
199 if self
200 .get_channel_descendants_including_self(vec![channel_id], &*tx)
201 .await?
202 .into_iter()
203 .any(|channel| {
204 channel.id != channel_id && channel.visibility == ChannelVisibility::Public
205 })
206 {
207 Err(ErrorCode::BadPublicNesting
208 .with_tag("direction", "children")
209 .anyhow())?;
210 }
211 }
212
213 let mut model = channel.into_active_model();
214 model.visibility = ActiveValue::Set(visibility);
215 let channel = model.update(&*tx).await?;
216
217 let channel_members = channel_member::Entity::find()
218 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
219 .all(&*tx)
220 .await?;
221
222 Ok((Channel::from_model(channel), channel_members))
223 })
224 .await
225 }
226
227 #[cfg(test)]
228 pub async fn set_channel_requires_zed_cla(
229 &self,
230 channel_id: ChannelId,
231 requires_zed_cla: bool,
232 ) -> Result<()> {
233 self.transaction(move |tx| async move {
234 let channel = self.get_channel_internal(channel_id, &*tx).await?;
235 let mut model = channel.into_active_model();
236 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
237 model.update(&*tx).await?;
238 Ok(())
239 })
240 .await
241 }
242
243 /// Deletes the channel with the specified ID.
244 pub async fn delete_channel(
245 &self,
246 channel_id: ChannelId,
247 user_id: UserId,
248 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
249 self.transaction(move |tx| async move {
250 let channel = self.get_channel_internal(channel_id, &*tx).await?;
251 self.check_user_is_channel_admin(&channel, user_id, &*tx)
252 .await?;
253
254 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
255 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
256 .select_only()
257 .column(channel_member::Column::UserId)
258 .distinct()
259 .into_values::<_, QueryUserIds>()
260 .all(&*tx)
261 .await?;
262
263 let channels_to_remove = self
264 .get_channel_descendants_including_self(vec![channel.id], &*tx)
265 .await?
266 .into_iter()
267 .map(|channel| channel.id)
268 .collect::<Vec<_>>();
269
270 channel::Entity::delete_many()
271 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
272 .exec(&*tx)
273 .await?;
274
275 Ok((channels_to_remove, members_to_notify))
276 })
277 .await
278 }
279
280 /// Invites a user to a channel as a member.
281 pub async fn invite_channel_member(
282 &self,
283 channel_id: ChannelId,
284 invitee_id: UserId,
285 inviter_id: UserId,
286 role: ChannelRole,
287 ) -> Result<InviteMemberResult> {
288 self.transaction(move |tx| async move {
289 let channel = self.get_channel_internal(channel_id, &*tx).await?;
290 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
291 .await?;
292 if !channel.is_root() {
293 Err(ErrorCode::NotARootChannel.anyhow())?
294 }
295
296 channel_member::ActiveModel {
297 id: ActiveValue::NotSet,
298 channel_id: ActiveValue::Set(channel_id),
299 user_id: ActiveValue::Set(invitee_id),
300 accepted: ActiveValue::Set(false),
301 role: ActiveValue::Set(role),
302 }
303 .insert(&*tx)
304 .await?;
305
306 let channel = Channel::from_model(channel);
307
308 let notifications = self
309 .create_notification(
310 invitee_id,
311 rpc::Notification::ChannelInvitation {
312 channel_id: channel_id.to_proto(),
313 channel_name: channel.name.clone(),
314 inviter_id: inviter_id.to_proto(),
315 },
316 true,
317 &*tx,
318 )
319 .await?
320 .into_iter()
321 .collect();
322
323 Ok(InviteMemberResult {
324 channel,
325 notifications,
326 })
327 })
328 .await
329 }
330
331 fn sanitize_channel_name(name: &str) -> Result<&str> {
332 let new_name = name.trim().trim_start_matches('#');
333 if new_name == "" {
334 Err(anyhow!("channel name can't be blank"))?;
335 }
336 Ok(new_name)
337 }
338
339 /// Renames the specified channel.
340 pub async fn rename_channel(
341 &self,
342 channel_id: ChannelId,
343 admin_id: UserId,
344 new_name: &str,
345 ) -> Result<(Channel, Vec<channel_member::Model>)> {
346 self.transaction(move |tx| async move {
347 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
348
349 let channel = self.get_channel_internal(channel_id, &*tx).await?;
350 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
351 .await?;
352
353 let mut model = channel.into_active_model();
354 model.name = ActiveValue::Set(new_name.clone());
355 let channel = model.update(&*tx).await?;
356
357 let channel_members = channel_member::Entity::find()
358 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
359 .all(&*tx)
360 .await?;
361
362 Ok((Channel::from_model(channel), channel_members))
363 })
364 .await
365 }
366
367 /// accept or decline an invite to join a channel
368 pub async fn respond_to_channel_invite(
369 &self,
370 channel_id: ChannelId,
371 user_id: UserId,
372 accept: bool,
373 ) -> Result<RespondToChannelInvite> {
374 self.transaction(move |tx| async move {
375 let channel = self.get_channel_internal(channel_id, &*tx).await?;
376
377 let membership_update = if accept {
378 let rows_affected = channel_member::Entity::update_many()
379 .set(channel_member::ActiveModel {
380 accepted: ActiveValue::Set(accept),
381 ..Default::default()
382 })
383 .filter(
384 channel_member::Column::ChannelId
385 .eq(channel_id)
386 .and(channel_member::Column::UserId.eq(user_id))
387 .and(channel_member::Column::Accepted.eq(false)),
388 )
389 .exec(&*tx)
390 .await?
391 .rows_affected;
392
393 if rows_affected == 0 {
394 Err(anyhow!("no such invitation"))?;
395 }
396
397 Some(
398 self.calculate_membership_updated(&channel, user_id, &*tx)
399 .await?,
400 )
401 } else {
402 let rows_affected = channel_member::Entity::delete_many()
403 .filter(
404 channel_member::Column::ChannelId
405 .eq(channel_id)
406 .and(channel_member::Column::UserId.eq(user_id))
407 .and(channel_member::Column::Accepted.eq(false)),
408 )
409 .exec(&*tx)
410 .await?
411 .rows_affected;
412 if rows_affected == 0 {
413 Err(anyhow!("no such invitation"))?;
414 }
415
416 None
417 };
418
419 Ok(RespondToChannelInvite {
420 membership_update,
421 notifications: self
422 .mark_notification_as_read_with_response(
423 user_id,
424 &rpc::Notification::ChannelInvitation {
425 channel_id: channel_id.to_proto(),
426 channel_name: Default::default(),
427 inviter_id: Default::default(),
428 },
429 accept,
430 &*tx,
431 )
432 .await?
433 .into_iter()
434 .collect(),
435 })
436 })
437 .await
438 }
439
440 async fn calculate_membership_updated(
441 &self,
442 channel: &channel::Model,
443 user_id: UserId,
444 tx: &DatabaseTransaction,
445 ) -> Result<MembershipUpdated> {
446 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
447 let removed_channels = self
448 .get_channel_descendants_including_self(vec![channel.id], &*tx)
449 .await?
450 .into_iter()
451 .filter_map(|channel| {
452 if !new_channels.channels.iter().any(|c| c.id == channel.id) {
453 Some(channel.id)
454 } else {
455 None
456 }
457 })
458 .collect::<Vec<_>>();
459
460 Ok(MembershipUpdated {
461 channel_id: channel.id,
462 new_channels,
463 removed_channels,
464 })
465 }
466
467 /// Removes a channel member.
468 pub async fn remove_channel_member(
469 &self,
470 channel_id: ChannelId,
471 member_id: UserId,
472 admin_id: UserId,
473 ) -> Result<RemoveChannelMemberResult> {
474 self.transaction(|tx| async move {
475 let channel = self.get_channel_internal(channel_id, &*tx).await?;
476 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
477 .await?;
478
479 let result = channel_member::Entity::delete_many()
480 .filter(
481 channel_member::Column::ChannelId
482 .eq(channel_id)
483 .and(channel_member::Column::UserId.eq(member_id)),
484 )
485 .exec(&*tx)
486 .await?;
487
488 if result.rows_affected == 0 {
489 Err(anyhow!("no such member"))?;
490 }
491
492 Ok(RemoveChannelMemberResult {
493 membership_update: self
494 .calculate_membership_updated(&channel, member_id, &*tx)
495 .await?,
496 notification_id: self
497 .remove_notification(
498 member_id,
499 rpc::Notification::ChannelInvitation {
500 channel_id: channel_id.to_proto(),
501 channel_name: Default::default(),
502 inviter_id: Default::default(),
503 },
504 &*tx,
505 )
506 .await?,
507 })
508 })
509 .await
510 }
511
512 /// Returns all channel invites for the user with the given ID.
513 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
514 self.transaction(|tx| async move {
515 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
516
517 let channel_invites = channel_member::Entity::find()
518 .filter(
519 channel_member::Column::UserId
520 .eq(user_id)
521 .and(channel_member::Column::Accepted.eq(false)),
522 )
523 .all(&*tx)
524 .await?;
525
526 for invite in channel_invites {
527 role_for_channel.insert(invite.channel_id, invite.role);
528 }
529
530 let channels = channel::Entity::find()
531 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
532 .all(&*tx)
533 .await?;
534
535 let channels = channels
536 .into_iter()
537 .filter_map(|channel| Some(Channel::from_model(channel)))
538 .collect();
539
540 Ok(channels)
541 })
542 .await
543 }
544
545 pub async fn get_channel_memberships(
546 &self,
547 user_id: UserId,
548 ) -> Result<(Vec<channel_member::Model>, Vec<channel::Model>)> {
549 self.transaction(|tx| async move {
550 let memberships = channel_member::Entity::find()
551 .filter(channel_member::Column::UserId.eq(user_id))
552 .all(&*tx)
553 .await?;
554 let channels = self
555 .get_channel_descendants_including_self(
556 memberships.iter().map(|m| m.channel_id),
557 &*tx,
558 )
559 .await?;
560 Ok((memberships, channels))
561 })
562 .await
563 }
564
565 /// Returns all channels for the user with the given ID.
566 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
567 self.transaction(|tx| async move {
568 let tx = tx;
569
570 self.get_user_channels(user_id, None, &tx).await
571 })
572 .await
573 }
574
575 /// Returns all channels for the user with the given ID that are descendants
576 /// of the specified ancestor channel.
577 pub async fn get_user_channels(
578 &self,
579 user_id: UserId,
580 ancestor_channel: Option<&channel::Model>,
581 tx: &DatabaseTransaction,
582 ) -> Result<ChannelsForUser> {
583 let mut filter = channel_member::Column::UserId
584 .eq(user_id)
585 .and(channel_member::Column::Accepted.eq(true));
586
587 if let Some(ancestor) = ancestor_channel {
588 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
589 }
590
591 let channel_memberships = channel_member::Entity::find()
592 .filter(filter)
593 .all(&*tx)
594 .await?;
595
596 let descendants = self
597 .get_channel_descendants_including_self(
598 channel_memberships.iter().map(|m| m.channel_id),
599 &*tx,
600 )
601 .await?;
602
603 let roles_by_channel_id = channel_memberships
604 .iter()
605 .map(|membership| (membership.channel_id, membership.role))
606 .collect::<HashMap<_, _>>();
607
608 let channels: Vec<Channel> = descendants
609 .into_iter()
610 .filter_map(|channel| {
611 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
612 if parent_role.can_see_channel(channel.visibility) {
613 Some(Channel::from_model(channel))
614 } else {
615 None
616 }
617 })
618 .collect();
619
620 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
621 enum QueryUserIdsAndChannelIds {
622 ChannelId,
623 UserId,
624 }
625
626 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
627 {
628 let mut rows = room_participant::Entity::find()
629 .inner_join(room::Entity)
630 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
631 .select_only()
632 .column(room::Column::ChannelId)
633 .column(room_participant::Column::UserId)
634 .into_values::<_, QueryUserIdsAndChannelIds>()
635 .stream(&*tx)
636 .await?;
637 while let Some(row) = rows.next().await {
638 let row: (ChannelId, UserId) = row?;
639 channel_participants.entry(row.0).or_default().push(row.1)
640 }
641 }
642
643 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
644 let latest_buffer_versions = self
645 .latest_channel_buffer_changes(&channel_ids, &*tx)
646 .await?;
647
648 let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
649
650 Ok(ChannelsForUser {
651 channel_memberships,
652 channels,
653 channel_participants,
654 latest_buffer_versions,
655 latest_channel_messages: latest_messages,
656 })
657 }
658
659 /// Sets the role for the specified channel member.
660 pub async fn set_channel_member_role(
661 &self,
662 channel_id: ChannelId,
663 admin_id: UserId,
664 for_user: UserId,
665 role: ChannelRole,
666 ) -> Result<SetMemberRoleResult> {
667 self.transaction(|tx| async move {
668 let channel = self.get_channel_internal(channel_id, &*tx).await?;
669 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
670 .await?;
671
672 let membership = channel_member::Entity::find()
673 .filter(
674 channel_member::Column::ChannelId
675 .eq(channel_id)
676 .and(channel_member::Column::UserId.eq(for_user)),
677 )
678 .one(&*tx)
679 .await?;
680
681 let Some(membership) = membership else {
682 Err(anyhow!("no such member"))?
683 };
684
685 let mut update = membership.into_active_model();
686 update.role = ActiveValue::Set(role);
687 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
688
689 if updated.accepted {
690 Ok(SetMemberRoleResult::MembershipUpdated(
691 self.calculate_membership_updated(&channel, for_user, &*tx)
692 .await?,
693 ))
694 } else {
695 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
696 channel,
697 )))
698 }
699 })
700 .await
701 }
702
703 /// Returns the details for the specified channel member.
704 pub async fn get_channel_participant_details(
705 &self,
706 channel_id: ChannelId,
707 user_id: UserId,
708 ) -> Result<Vec<proto::ChannelMember>> {
709 let (role, members) = self
710 .transaction(move |tx| async move {
711 let channel = self.get_channel_internal(channel_id, &*tx).await?;
712 let role = self
713 .check_user_is_channel_participant(&channel, user_id, &*tx)
714 .await?;
715 Ok((
716 role,
717 self.get_channel_participant_details_internal(&channel, &*tx)
718 .await?,
719 ))
720 })
721 .await?;
722
723 if role == ChannelRole::Admin {
724 Ok(members
725 .into_iter()
726 .map(|channel_member| proto::ChannelMember {
727 role: channel_member.role.into(),
728 user_id: channel_member.user_id.to_proto(),
729 kind: if channel_member.accepted {
730 Kind::Member
731 } else {
732 Kind::Invitee
733 }
734 .into(),
735 })
736 .collect())
737 } else {
738 return Ok(members
739 .into_iter()
740 .filter_map(|member| {
741 if !member.accepted {
742 return None;
743 }
744 Some(proto::ChannelMember {
745 role: member.role.into(),
746 user_id: member.user_id.to_proto(),
747 kind: Kind::Member.into(),
748 })
749 })
750 .collect());
751 }
752 }
753
754 async fn get_channel_participant_details_internal(
755 &self,
756 channel: &channel::Model,
757 tx: &DatabaseTransaction,
758 ) -> Result<Vec<channel_member::Model>> {
759 Ok(channel_member::Entity::find()
760 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
761 .all(tx)
762 .await?)
763 }
764
765 /// Returns the participants in the given channel.
766 pub async fn get_channel_participants(
767 &self,
768 channel: &channel::Model,
769 tx: &DatabaseTransaction,
770 ) -> Result<Vec<UserId>> {
771 let participants = self
772 .get_channel_participant_details_internal(channel, &*tx)
773 .await?;
774 Ok(participants
775 .into_iter()
776 .map(|member| member.user_id)
777 .collect())
778 }
779
780 /// Returns whether the given user is an admin in the specified channel.
781 pub async fn check_user_is_channel_admin(
782 &self,
783 channel: &channel::Model,
784 user_id: UserId,
785 tx: &DatabaseTransaction,
786 ) -> Result<ChannelRole> {
787 let role = self.channel_role_for_user(channel, user_id, tx).await?;
788 match role {
789 Some(ChannelRole::Admin) => Ok(role.unwrap()),
790 Some(ChannelRole::Member)
791 | Some(ChannelRole::Banned)
792 | Some(ChannelRole::Guest)
793 | None => Err(anyhow!(
794 "user is not a channel admin or channel does not exist"
795 ))?,
796 }
797 }
798
799 /// Returns whether the given user is a member of the specified channel.
800 pub async fn check_user_is_channel_member(
801 &self,
802 channel: &channel::Model,
803 user_id: UserId,
804 tx: &DatabaseTransaction,
805 ) -> Result<ChannelRole> {
806 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
807 match channel_role {
808 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
809 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
810 "user is not a channel member or channel does not exist"
811 ))?,
812 }
813 }
814
815 /// Returns whether the given user is a participant in the specified channel.
816 pub async fn check_user_is_channel_participant(
817 &self,
818 channel: &channel::Model,
819 user_id: UserId,
820 tx: &DatabaseTransaction,
821 ) -> Result<ChannelRole> {
822 let role = self.channel_role_for_user(channel, user_id, tx).await?;
823 match role {
824 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
825 Ok(role.unwrap())
826 }
827 Some(ChannelRole::Banned) | None => Err(anyhow!(
828 "user is not a channel participant or channel does not exist"
829 ))?,
830 }
831 }
832
833 /// Returns a user's pending invite for the given channel, if one exists.
834 pub async fn pending_invite_for_channel(
835 &self,
836 channel: &channel::Model,
837 user_id: UserId,
838 tx: &DatabaseTransaction,
839 ) -> Result<Option<channel_member::Model>> {
840 let row = channel_member::Entity::find()
841 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
842 .filter(channel_member::Column::UserId.eq(user_id))
843 .filter(channel_member::Column::Accepted.eq(false))
844 .one(&*tx)
845 .await?;
846
847 Ok(row)
848 }
849
850 /// Returns the role for a user in the given channel.
851 pub async fn channel_role_for_user(
852 &self,
853 channel: &channel::Model,
854 user_id: UserId,
855 tx: &DatabaseTransaction,
856 ) -> Result<Option<ChannelRole>> {
857 let membership = channel_member::Entity::find()
858 .filter(
859 channel_member::Column::ChannelId
860 .eq(channel.root_id())
861 .and(channel_member::Column::UserId.eq(user_id))
862 .and(channel_member::Column::Accepted.eq(true)),
863 )
864 .one(&*tx)
865 .await?;
866
867 let Some(membership) = membership else {
868 return Ok(None);
869 };
870
871 if !membership.role.can_see_channel(channel.visibility) {
872 return Ok(None);
873 }
874
875 Ok(Some(membership.role))
876 }
877
    // Get the given set of channels together with all of their descendants,
    // ordered by their full path (`parent_path` followed by the channel id).
    //
    // Returns an empty list without touching the database when `channel_ids`
    // yields nothing.
    async fn get_channel_descendants_including_self(
        &self,
        channel_ids: impl IntoIterator<Item = ChannelId>,
        tx: &DatabaseTransaction,
    ) -> Result<Vec<channel::Model>> {
        // Render the ids as "(1), (2), ..." for splicing into the IN clauses
        // of the query below.
        let mut values = String::new();
        for id in channel_ids {
            if !values.is_empty() {
                values.push_str(", ");
            }
            write!(&mut values, "({})", id).unwrap();
        }

        // Nothing was requested, so there is nothing to look up.
        if values.is_empty() {
            return Ok(vec![]);
        }

        // A row qualifies either because it is one of the requested channels,
        // or because its parent_path starts with the full path of a requested
        // channel followed by '/'.
        let sql = format!(
            r#"
            SELECT DISTINCT
                descendant_channels.*,
                descendant_channels.parent_path || descendant_channels.id as full_path
            FROM
                channels parent_channels, channels descendant_channels
            WHERE
                descendant_channels.id IN ({values}) OR
                (
                    parent_channels.id IN ({values}) AND
                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
                )
            ORDER BY
                full_path ASC
            "#
        );

        Ok(channel::Entity::find()
            .from_raw_sql(Statement::from_string(
                self.pool.get_database_backend(),
                sql,
            ))
            .all(tx)
            .await?)
    }
923
924 /// Returns the channel with the given ID.
925 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
926 self.transaction(|tx| async move {
927 let channel = self.get_channel_internal(channel_id, &*tx).await?;
928 self.check_user_is_channel_participant(&channel, user_id, &*tx)
929 .await?;
930
931 Ok(Channel::from_model(channel))
932 })
933 .await
934 }
935
936 pub(crate) async fn get_channel_internal(
937 &self,
938 channel_id: ChannelId,
939 tx: &DatabaseTransaction,
940 ) -> Result<channel::Model> {
941 Ok(channel::Entity::find_by_id(channel_id)
942 .one(&*tx)
943 .await?
944 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
945 }
946
947 pub(crate) async fn get_or_create_channel_room(
948 &self,
949 channel_id: ChannelId,
950 live_kit_room: &str,
951 environment: &str,
952 tx: &DatabaseTransaction,
953 ) -> Result<RoomId> {
954 let room = room::Entity::find()
955 .filter(room::Column::ChannelId.eq(channel_id))
956 .one(&*tx)
957 .await?;
958
959 let room_id = if let Some(room) = room {
960 if let Some(env) = room.environment {
961 if &env != environment {
962 Err(ErrorCode::WrongReleaseChannel
963 .with_tag("required", &env)
964 .anyhow())?;
965 }
966 }
967 room.id
968 } else {
969 let result = room::Entity::insert(room::ActiveModel {
970 channel_id: ActiveValue::Set(Some(channel_id)),
971 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
972 environment: ActiveValue::Set(Some(environment.to_string())),
973 ..Default::default()
974 })
975 .exec(&*tx)
976 .await?;
977
978 result.last_insert_id
979 };
980
981 Ok(room_id)
982 }
983
984 /// Move a channel from one parent to another
985 pub async fn move_channel(
986 &self,
987 channel_id: ChannelId,
988 new_parent_id: ChannelId,
989 admin_id: UserId,
990 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
991 self.transaction(|tx| async move {
992 let channel = self.get_channel_internal(channel_id, &*tx).await?;
993 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
994 .await?;
995 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
996
997 if new_parent.root_id() != channel.root_id() {
998 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
999 }
1000
1001 if new_parent
1002 .ancestors_including_self()
1003 .any(|id| id == channel.id)
1004 {
1005 Err(anyhow!(ErrorCode::CircularNesting))?;
1006 }
1007
1008 if channel.visibility == ChannelVisibility::Public
1009 && new_parent.visibility != ChannelVisibility::Public
1010 {
1011 Err(anyhow!(ErrorCode::BadPublicNesting))?;
1012 }
1013
1014 let root_id = channel.root_id();
1015 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1016 let new_path = format!("{}{}/", new_parent.path(), channel.id);
1017
1018 let mut model = channel.into_active_model();
1019 model.parent_path = ActiveValue::Set(new_parent.path());
1020 let channel = model.update(&*tx).await?;
1021
1022 let descendent_ids =
1023 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1024 self.pool.get_database_backend(),
1025 "
1026 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1027 WHERE parent_path LIKE $3 || '%'
1028 RETURNING id
1029 ",
1030 [old_path.clone().into(), new_path.into(), old_path.into()],
1031 ))
1032 .all(&*tx)
1033 .await?;
1034
1035 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1036
1037 let channels = channel::Entity::find()
1038 .filter(channel::Column::Id.is_in(all_moved_ids))
1039 .all(&*tx)
1040 .await?
1041 .into_iter()
1042 .map(|c| Channel::from_model(c))
1043 .collect::<Vec<_>>();
1044
1045 let channel_members = channel_member::Entity::find()
1046 .filter(channel_member::Column::ChannelId.eq(root_id))
1047 .all(&*tx)
1048 .await?;
1049
1050 Ok((channels, channel_members))
1051 })
1052 .await
1053 }
1054}
1055
/// Column selector naming the single `Id` column; used in this file to read
/// the ids returned by the raw `UPDATE ... RETURNING id` statement in
/// `move_channel`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1060
/// Column selector naming the single `UserId` column; used in this file by
/// `delete_channel` to read member user ids as plain values.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}