1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self.create_channel(name, None, creator_id).await?.0.id)
23 }
24
25 #[cfg(test)]
26 pub async fn create_sub_channel(
27 &self,
28 name: &str,
29 parent: ChannelId,
30 creator_id: UserId,
31 ) -> Result<ChannelId> {
32 Ok(self
33 .create_channel(name, Some(parent), creator_id)
34 .await?
35 .0
36 .id)
37 }
38
39 /// Creates a new channel.
40 pub async fn create_channel(
41 &self,
42 name: &str,
43 parent_channel_id: Option<ChannelId>,
44 admin_id: UserId,
45 ) -> Result<(
46 Channel,
47 Option<channel_member::Model>,
48 Vec<channel_member::Model>,
49 )> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 let channel_members = channel_member::Entity::find()
91 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
92 .all(&*tx)
93 .await?;
94
95 Ok((Channel::from_model(channel), membership, channel_members))
96 })
97 .await
98 }
99
100 /// Adds a user to the specified channel.
101 pub async fn join_channel(
102 &self,
103 channel_id: ChannelId,
104 user_id: UserId,
105 connection: ConnectionId,
106 environment: &str,
107 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
108 self.transaction(move |tx| async move {
109 let channel = self.get_channel_internal(channel_id, &*tx).await?;
110 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
111
112 let mut accept_invite_result = None;
113
114 if role.is_none() {
115 if let Some(invitation) = self
116 .pending_invite_for_channel(&channel, user_id, &*tx)
117 .await?
118 {
119 // note, this may be a parent channel
120 role = Some(invitation.role);
121 channel_member::Entity::update(channel_member::ActiveModel {
122 accepted: ActiveValue::Set(true),
123 ..invitation.into_active_model()
124 })
125 .exec(&*tx)
126 .await?;
127
128 accept_invite_result = Some(
129 self.calculate_membership_updated(&channel, user_id, &*tx)
130 .await?,
131 );
132
133 debug_assert!(
134 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
135 );
136 } else if channel.visibility == ChannelVisibility::Public {
137 role = Some(ChannelRole::Guest);
138 channel_member::Entity::insert(channel_member::ActiveModel {
139 id: ActiveValue::NotSet,
140 channel_id: ActiveValue::Set(channel.root_id()),
141 user_id: ActiveValue::Set(user_id),
142 accepted: ActiveValue::Set(true),
143 role: ActiveValue::Set(ChannelRole::Guest),
144 })
145 .exec(&*tx)
146 .await?;
147
148 accept_invite_result = Some(
149 self.calculate_membership_updated(&channel, user_id, &*tx)
150 .await?,
151 );
152
153 debug_assert!(
154 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
155 );
156 }
157 }
158
159 if role.is_none() || role == Some(ChannelRole::Banned) {
160 Err(ErrorCode::Forbidden.anyhow())?
161 }
162 let role = role.unwrap();
163
164 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
165 let room_id = self
166 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
167 .await?;
168
169 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
170 .await
171 .map(|jr| (jr, accept_invite_result, role))
172 })
173 .await
174 }
175
176 /// Sets the visibiltity of the given channel.
177 pub async fn set_channel_visibility(
178 &self,
179 channel_id: ChannelId,
180 visibility: ChannelVisibility,
181 admin_id: UserId,
182 ) -> Result<(Channel, Vec<channel_member::Model>)> {
183 self.transaction(move |tx| async move {
184 let channel = self.get_channel_internal(channel_id, &*tx).await?;
185 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
186 .await?;
187
188 if visibility == ChannelVisibility::Public {
189 if let Some(parent_id) = channel.parent_id() {
190 let parent = self.get_channel_internal(parent_id, &*tx).await?;
191
192 if parent.visibility != ChannelVisibility::Public {
193 Err(ErrorCode::BadPublicNesting
194 .with_tag("direction", "parent")
195 .anyhow())?;
196 }
197 }
198 } else if visibility == ChannelVisibility::Members {
199 if self
200 .get_channel_descendants_including_self(vec![channel_id], &*tx)
201 .await?
202 .into_iter()
203 .any(|channel| {
204 channel.id != channel_id && channel.visibility == ChannelVisibility::Public
205 })
206 {
207 Err(ErrorCode::BadPublicNesting
208 .with_tag("direction", "children")
209 .anyhow())?;
210 }
211 }
212
213 let mut model = channel.into_active_model();
214 model.visibility = ActiveValue::Set(visibility);
215 let channel = model.update(&*tx).await?;
216
217 let channel_members = channel_member::Entity::find()
218 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
219 .all(&*tx)
220 .await?;
221
222 Ok((Channel::from_model(channel), channel_members))
223 })
224 .await
225 }
226
227 #[cfg(test)]
228 pub async fn set_channel_requires_zed_cla(
229 &self,
230 channel_id: ChannelId,
231 requires_zed_cla: bool,
232 ) -> Result<()> {
233 self.transaction(move |tx| async move {
234 let channel = self.get_channel_internal(channel_id, &*tx).await?;
235 let mut model = channel.into_active_model();
236 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
237 model.update(&*tx).await?;
238 Ok(())
239 })
240 .await
241 }
242
243 /// Deletes the channel with the specified ID.
244 pub async fn delete_channel(
245 &self,
246 channel_id: ChannelId,
247 user_id: UserId,
248 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
249 self.transaction(move |tx| async move {
250 let channel = self.get_channel_internal(channel_id, &*tx).await?;
251 self.check_user_is_channel_admin(&channel, user_id, &*tx)
252 .await?;
253
254 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
255 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
256 .select_only()
257 .column(channel_member::Column::UserId)
258 .distinct()
259 .into_values::<_, QueryUserIds>()
260 .all(&*tx)
261 .await?;
262
263 let channels_to_remove = self
264 .get_channel_descendants_including_self(vec![channel.id], &*tx)
265 .await?
266 .into_iter()
267 .map(|channel| channel.id)
268 .collect::<Vec<_>>();
269
270 channel::Entity::delete_many()
271 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
272 .exec(&*tx)
273 .await?;
274
275 Ok((channels_to_remove, members_to_notify))
276 })
277 .await
278 }
279
280 /// Invites a user to a channel as a member.
281 pub async fn invite_channel_member(
282 &self,
283 channel_id: ChannelId,
284 invitee_id: UserId,
285 inviter_id: UserId,
286 role: ChannelRole,
287 ) -> Result<InviteMemberResult> {
288 self.transaction(move |tx| async move {
289 let channel = self.get_channel_internal(channel_id, &*tx).await?;
290 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
291 .await?;
292 if !channel.is_root() {
293 Err(ErrorCode::NotARootChannel.anyhow())?
294 }
295
296 channel_member::ActiveModel {
297 id: ActiveValue::NotSet,
298 channel_id: ActiveValue::Set(channel_id),
299 user_id: ActiveValue::Set(invitee_id),
300 accepted: ActiveValue::Set(false),
301 role: ActiveValue::Set(role),
302 }
303 .insert(&*tx)
304 .await?;
305
306 let channel = Channel::from_model(channel);
307
308 let notifications = self
309 .create_notification(
310 invitee_id,
311 rpc::Notification::ChannelInvitation {
312 channel_id: channel_id.to_proto(),
313 channel_name: channel.name.clone(),
314 inviter_id: inviter_id.to_proto(),
315 },
316 true,
317 &*tx,
318 )
319 .await?
320 .into_iter()
321 .collect();
322
323 Ok(InviteMemberResult {
324 channel,
325 notifications,
326 })
327 })
328 .await
329 }
330
331 fn sanitize_channel_name(name: &str) -> Result<&str> {
332 let new_name = name.trim().trim_start_matches('#');
333 if new_name == "" {
334 Err(anyhow!("channel name can't be blank"))?;
335 }
336 Ok(new_name)
337 }
338
339 /// Renames the specified channel.
340 pub async fn rename_channel(
341 &self,
342 channel_id: ChannelId,
343 admin_id: UserId,
344 new_name: &str,
345 ) -> Result<(Channel, Vec<channel_member::Model>)> {
346 self.transaction(move |tx| async move {
347 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
348
349 let channel = self.get_channel_internal(channel_id, &*tx).await?;
350 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
351 .await?;
352
353 let mut model = channel.into_active_model();
354 model.name = ActiveValue::Set(new_name.clone());
355 let channel = model.update(&*tx).await?;
356
357 let channel_members = channel_member::Entity::find()
358 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
359 .all(&*tx)
360 .await?;
361
362 Ok((Channel::from_model(channel), channel_members))
363 })
364 .await
365 }
366
367 /// accept or decline an invite to join a channel
368 pub async fn respond_to_channel_invite(
369 &self,
370 channel_id: ChannelId,
371 user_id: UserId,
372 accept: bool,
373 ) -> Result<RespondToChannelInvite> {
374 self.transaction(move |tx| async move {
375 let channel = self.get_channel_internal(channel_id, &*tx).await?;
376
377 let membership_update = if accept {
378 let rows_affected = channel_member::Entity::update_many()
379 .set(channel_member::ActiveModel {
380 accepted: ActiveValue::Set(accept),
381 ..Default::default()
382 })
383 .filter(
384 channel_member::Column::ChannelId
385 .eq(channel_id)
386 .and(channel_member::Column::UserId.eq(user_id))
387 .and(channel_member::Column::Accepted.eq(false)),
388 )
389 .exec(&*tx)
390 .await?
391 .rows_affected;
392
393 if rows_affected == 0 {
394 Err(anyhow!("no such invitation"))?;
395 }
396
397 Some(
398 self.calculate_membership_updated(&channel, user_id, &*tx)
399 .await?,
400 )
401 } else {
402 let rows_affected = channel_member::Entity::delete_many()
403 .filter(
404 channel_member::Column::ChannelId
405 .eq(channel_id)
406 .and(channel_member::Column::UserId.eq(user_id))
407 .and(channel_member::Column::Accepted.eq(false)),
408 )
409 .exec(&*tx)
410 .await?
411 .rows_affected;
412 if rows_affected == 0 {
413 Err(anyhow!("no such invitation"))?;
414 }
415
416 None
417 };
418
419 Ok(RespondToChannelInvite {
420 membership_update,
421 notifications: self
422 .mark_notification_as_read_with_response(
423 user_id,
424 &rpc::Notification::ChannelInvitation {
425 channel_id: channel_id.to_proto(),
426 channel_name: Default::default(),
427 inviter_id: Default::default(),
428 },
429 accept,
430 &*tx,
431 )
432 .await?
433 .into_iter()
434 .collect(),
435 })
436 })
437 .await
438 }
439
440 async fn calculate_membership_updated(
441 &self,
442 channel: &channel::Model,
443 user_id: UserId,
444 tx: &DatabaseTransaction,
445 ) -> Result<MembershipUpdated> {
446 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
447 let removed_channels = self
448 .get_channel_descendants_including_self(vec![channel.id], &*tx)
449 .await?
450 .into_iter()
451 .filter_map(|channel| {
452 if !new_channels.channels.iter().any(|c| c.id == channel.id) {
453 Some(channel.id)
454 } else {
455 None
456 }
457 })
458 .collect::<Vec<_>>();
459
460 Ok(MembershipUpdated {
461 channel_id: channel.id,
462 new_channels,
463 removed_channels,
464 })
465 }
466
467 /// Removes a channel member.
468 pub async fn remove_channel_member(
469 &self,
470 channel_id: ChannelId,
471 member_id: UserId,
472 admin_id: UserId,
473 ) -> Result<RemoveChannelMemberResult> {
474 self.transaction(|tx| async move {
475 let channel = self.get_channel_internal(channel_id, &*tx).await?;
476
477 if member_id != admin_id {
478 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
479 .await?;
480 }
481
482 let result = channel_member::Entity::delete_many()
483 .filter(
484 channel_member::Column::ChannelId
485 .eq(channel_id)
486 .and(channel_member::Column::UserId.eq(member_id)),
487 )
488 .exec(&*tx)
489 .await?;
490
491 if result.rows_affected == 0 {
492 Err(anyhow!("no such member"))?;
493 }
494
495 Ok(RemoveChannelMemberResult {
496 membership_update: self
497 .calculate_membership_updated(&channel, member_id, &*tx)
498 .await?,
499 notification_id: self
500 .remove_notification(
501 member_id,
502 rpc::Notification::ChannelInvitation {
503 channel_id: channel_id.to_proto(),
504 channel_name: Default::default(),
505 inviter_id: Default::default(),
506 },
507 &*tx,
508 )
509 .await?,
510 })
511 })
512 .await
513 }
514
515 /// Returns all channel invites for the user with the given ID.
516 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
517 self.transaction(|tx| async move {
518 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
519
520 let channel_invites = channel_member::Entity::find()
521 .filter(
522 channel_member::Column::UserId
523 .eq(user_id)
524 .and(channel_member::Column::Accepted.eq(false)),
525 )
526 .all(&*tx)
527 .await?;
528
529 for invite in channel_invites {
530 role_for_channel.insert(invite.channel_id, invite.role);
531 }
532
533 let channels = channel::Entity::find()
534 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
535 .all(&*tx)
536 .await?;
537
538 let channels = channels
539 .into_iter()
540 .filter_map(|channel| Some(Channel::from_model(channel)))
541 .collect();
542
543 Ok(channels)
544 })
545 .await
546 }
547
548 pub async fn get_channel_memberships(
549 &self,
550 user_id: UserId,
551 ) -> Result<(Vec<channel_member::Model>, Vec<channel::Model>)> {
552 self.transaction(|tx| async move {
553 let memberships = channel_member::Entity::find()
554 .filter(channel_member::Column::UserId.eq(user_id))
555 .all(&*tx)
556 .await?;
557 let channels = self
558 .get_channel_descendants_including_self(
559 memberships.iter().map(|m| m.channel_id),
560 &*tx,
561 )
562 .await?;
563 Ok((memberships, channels))
564 })
565 .await
566 }
567
568 /// Returns all channels for the user with the given ID.
569 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
570 self.transaction(|tx| async move {
571 let tx = tx;
572
573 self.get_user_channels(user_id, None, &tx).await
574 })
575 .await
576 }
577
578 /// Returns all channels for the user with the given ID that are descendants
579 /// of the specified ancestor channel.
580 pub async fn get_user_channels(
581 &self,
582 user_id: UserId,
583 ancestor_channel: Option<&channel::Model>,
584 tx: &DatabaseTransaction,
585 ) -> Result<ChannelsForUser> {
586 let mut filter = channel_member::Column::UserId
587 .eq(user_id)
588 .and(channel_member::Column::Accepted.eq(true));
589
590 if let Some(ancestor) = ancestor_channel {
591 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
592 }
593
594 let channel_memberships = channel_member::Entity::find()
595 .filter(filter)
596 .all(&*tx)
597 .await?;
598
599 let descendants = self
600 .get_channel_descendants_including_self(
601 channel_memberships.iter().map(|m| m.channel_id),
602 &*tx,
603 )
604 .await?;
605
606 let roles_by_channel_id = channel_memberships
607 .iter()
608 .map(|membership| (membership.channel_id, membership.role))
609 .collect::<HashMap<_, _>>();
610
611 let channels: Vec<Channel> = descendants
612 .into_iter()
613 .filter_map(|channel| {
614 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
615 if parent_role.can_see_channel(channel.visibility) {
616 Some(Channel::from_model(channel))
617 } else {
618 None
619 }
620 })
621 .collect();
622
623 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
624 enum QueryUserIdsAndChannelIds {
625 ChannelId,
626 UserId,
627 }
628
629 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
630 {
631 let mut rows = room_participant::Entity::find()
632 .inner_join(room::Entity)
633 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
634 .select_only()
635 .column(room::Column::ChannelId)
636 .column(room_participant::Column::UserId)
637 .into_values::<_, QueryUserIdsAndChannelIds>()
638 .stream(&*tx)
639 .await?;
640 while let Some(row) = rows.next().await {
641 let row: (ChannelId, UserId) = row?;
642 channel_participants.entry(row.0).or_default().push(row.1)
643 }
644 }
645
646 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
647 let latest_buffer_versions = self
648 .latest_channel_buffer_changes(&channel_ids, &*tx)
649 .await?;
650
651 let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
652
653 Ok(ChannelsForUser {
654 channel_memberships,
655 channels,
656 channel_participants,
657 latest_buffer_versions,
658 latest_channel_messages: latest_messages,
659 })
660 }
661
662 /// Sets the role for the specified channel member.
663 pub async fn set_channel_member_role(
664 &self,
665 channel_id: ChannelId,
666 admin_id: UserId,
667 for_user: UserId,
668 role: ChannelRole,
669 ) -> Result<SetMemberRoleResult> {
670 self.transaction(|tx| async move {
671 let channel = self.get_channel_internal(channel_id, &*tx).await?;
672 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
673 .await?;
674
675 let membership = channel_member::Entity::find()
676 .filter(
677 channel_member::Column::ChannelId
678 .eq(channel_id)
679 .and(channel_member::Column::UserId.eq(for_user)),
680 )
681 .one(&*tx)
682 .await?;
683
684 let Some(membership) = membership else {
685 Err(anyhow!("no such member"))?
686 };
687
688 let mut update = membership.into_active_model();
689 update.role = ActiveValue::Set(role);
690 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
691
692 if updated.accepted {
693 Ok(SetMemberRoleResult::MembershipUpdated(
694 self.calculate_membership_updated(&channel, for_user, &*tx)
695 .await?,
696 ))
697 } else {
698 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
699 channel,
700 )))
701 }
702 })
703 .await
704 }
705
706 /// Returns the details for the specified channel member.
707 pub async fn get_channel_participant_details(
708 &self,
709 channel_id: ChannelId,
710 user_id: UserId,
711 ) -> Result<Vec<proto::ChannelMember>> {
712 let (role, members) = self
713 .transaction(move |tx| async move {
714 let channel = self.get_channel_internal(channel_id, &*tx).await?;
715 let role = self
716 .check_user_is_channel_participant(&channel, user_id, &*tx)
717 .await?;
718 Ok((
719 role,
720 self.get_channel_participant_details_internal(&channel, &*tx)
721 .await?,
722 ))
723 })
724 .await?;
725
726 if role == ChannelRole::Admin {
727 Ok(members
728 .into_iter()
729 .map(|channel_member| proto::ChannelMember {
730 role: channel_member.role.into(),
731 user_id: channel_member.user_id.to_proto(),
732 kind: if channel_member.accepted {
733 Kind::Member
734 } else {
735 Kind::Invitee
736 }
737 .into(),
738 })
739 .collect())
740 } else {
741 return Ok(members
742 .into_iter()
743 .filter_map(|member| {
744 if !member.accepted {
745 return None;
746 }
747 Some(proto::ChannelMember {
748 role: member.role.into(),
749 user_id: member.user_id.to_proto(),
750 kind: Kind::Member.into(),
751 })
752 })
753 .collect());
754 }
755 }
756
757 async fn get_channel_participant_details_internal(
758 &self,
759 channel: &channel::Model,
760 tx: &DatabaseTransaction,
761 ) -> Result<Vec<channel_member::Model>> {
762 Ok(channel_member::Entity::find()
763 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
764 .all(tx)
765 .await?)
766 }
767
768 /// Returns the participants in the given channel.
769 pub async fn get_channel_participants(
770 &self,
771 channel: &channel::Model,
772 tx: &DatabaseTransaction,
773 ) -> Result<Vec<UserId>> {
774 let participants = self
775 .get_channel_participant_details_internal(channel, &*tx)
776 .await?;
777 Ok(participants
778 .into_iter()
779 .map(|member| member.user_id)
780 .collect())
781 }
782
783 /// Returns whether the given user is an admin in the specified channel.
784 pub async fn check_user_is_channel_admin(
785 &self,
786 channel: &channel::Model,
787 user_id: UserId,
788 tx: &DatabaseTransaction,
789 ) -> Result<ChannelRole> {
790 let role = self.channel_role_for_user(channel, user_id, tx).await?;
791 match role {
792 Some(ChannelRole::Admin) => Ok(role.unwrap()),
793 Some(ChannelRole::Member)
794 | Some(ChannelRole::Banned)
795 | Some(ChannelRole::Guest)
796 | None => Err(anyhow!(
797 "user is not a channel admin or channel does not exist"
798 ))?,
799 }
800 }
801
802 /// Returns whether the given user is a member of the specified channel.
803 pub async fn check_user_is_channel_member(
804 &self,
805 channel: &channel::Model,
806 user_id: UserId,
807 tx: &DatabaseTransaction,
808 ) -> Result<ChannelRole> {
809 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
810 match channel_role {
811 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
812 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
813 "user is not a channel member or channel does not exist"
814 ))?,
815 }
816 }
817
818 /// Returns whether the given user is a participant in the specified channel.
819 pub async fn check_user_is_channel_participant(
820 &self,
821 channel: &channel::Model,
822 user_id: UserId,
823 tx: &DatabaseTransaction,
824 ) -> Result<ChannelRole> {
825 let role = self.channel_role_for_user(channel, user_id, tx).await?;
826 match role {
827 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
828 Ok(role.unwrap())
829 }
830 Some(ChannelRole::Banned) | None => Err(anyhow!(
831 "user is not a channel participant or channel does not exist"
832 ))?,
833 }
834 }
835
836 /// Returns a user's pending invite for the given channel, if one exists.
837 pub async fn pending_invite_for_channel(
838 &self,
839 channel: &channel::Model,
840 user_id: UserId,
841 tx: &DatabaseTransaction,
842 ) -> Result<Option<channel_member::Model>> {
843 let row = channel_member::Entity::find()
844 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
845 .filter(channel_member::Column::UserId.eq(user_id))
846 .filter(channel_member::Column::Accepted.eq(false))
847 .one(&*tx)
848 .await?;
849
850 Ok(row)
851 }
852
853 /// Returns the role for a user in the given channel.
854 pub async fn channel_role_for_user(
855 &self,
856 channel: &channel::Model,
857 user_id: UserId,
858 tx: &DatabaseTransaction,
859 ) -> Result<Option<ChannelRole>> {
860 let membership = channel_member::Entity::find()
861 .filter(
862 channel_member::Column::ChannelId
863 .eq(channel.root_id())
864 .and(channel_member::Column::UserId.eq(user_id))
865 .and(channel_member::Column::Accepted.eq(true)),
866 )
867 .one(&*tx)
868 .await?;
869
870 let Some(membership) = membership else {
871 return Ok(None);
872 };
873
874 if !membership.role.can_see_channel(channel.visibility) {
875 return Ok(None);
876 }
877
878 Ok(Some(membership.role))
879 }
880
881 // Get the descendants of the given set if channels, ordered by their
882 // path.
883 async fn get_channel_descendants_including_self(
884 &self,
885 channel_ids: impl IntoIterator<Item = ChannelId>,
886 tx: &DatabaseTransaction,
887 ) -> Result<Vec<channel::Model>> {
888 let mut values = String::new();
889 for id in channel_ids {
890 if !values.is_empty() {
891 values.push_str(", ");
892 }
893 write!(&mut values, "({})", id).unwrap();
894 }
895
896 if values.is_empty() {
897 return Ok(vec![]);
898 }
899
900 let sql = format!(
901 r#"
902 SELECT DISTINCT
903 descendant_channels.*,
904 descendant_channels.parent_path || descendant_channels.id as full_path
905 FROM
906 channels parent_channels, channels descendant_channels
907 WHERE
908 descendant_channels.id IN ({values}) OR
909 (
910 parent_channels.id IN ({values}) AND
911 descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
912 )
913 ORDER BY
914 full_path ASC
915 "#
916 );
917
918 Ok(channel::Entity::find()
919 .from_raw_sql(Statement::from_string(
920 self.pool.get_database_backend(),
921 sql,
922 ))
923 .all(tx)
924 .await?)
925 }
926
927 /// Returns the channel with the given ID.
928 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
929 self.transaction(|tx| async move {
930 let channel = self.get_channel_internal(channel_id, &*tx).await?;
931 self.check_user_is_channel_participant(&channel, user_id, &*tx)
932 .await?;
933
934 Ok(Channel::from_model(channel))
935 })
936 .await
937 }
938
939 pub(crate) async fn get_channel_internal(
940 &self,
941 channel_id: ChannelId,
942 tx: &DatabaseTransaction,
943 ) -> Result<channel::Model> {
944 Ok(channel::Entity::find_by_id(channel_id)
945 .one(&*tx)
946 .await?
947 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
948 }
949
950 pub(crate) async fn get_or_create_channel_room(
951 &self,
952 channel_id: ChannelId,
953 live_kit_room: &str,
954 environment: &str,
955 tx: &DatabaseTransaction,
956 ) -> Result<RoomId> {
957 let room = room::Entity::find()
958 .filter(room::Column::ChannelId.eq(channel_id))
959 .one(&*tx)
960 .await?;
961
962 let room_id = if let Some(room) = room {
963 if let Some(env) = room.environment {
964 if &env != environment {
965 Err(ErrorCode::WrongReleaseChannel
966 .with_tag("required", &env)
967 .anyhow())?;
968 }
969 }
970 room.id
971 } else {
972 let result = room::Entity::insert(room::ActiveModel {
973 channel_id: ActiveValue::Set(Some(channel_id)),
974 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
975 environment: ActiveValue::Set(Some(environment.to_string())),
976 ..Default::default()
977 })
978 .exec(&*tx)
979 .await?;
980
981 result.last_insert_id
982 };
983
984 Ok(room_id)
985 }
986
987 /// Move a channel from one parent to another
988 pub async fn move_channel(
989 &self,
990 channel_id: ChannelId,
991 new_parent_id: ChannelId,
992 admin_id: UserId,
993 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
994 self.transaction(|tx| async move {
995 let channel = self.get_channel_internal(channel_id, &*tx).await?;
996 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
997 .await?;
998 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
999
1000 if new_parent.root_id() != channel.root_id() {
1001 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
1002 }
1003
1004 if new_parent
1005 .ancestors_including_self()
1006 .any(|id| id == channel.id)
1007 {
1008 Err(anyhow!(ErrorCode::CircularNesting))?;
1009 }
1010
1011 if channel.visibility == ChannelVisibility::Public
1012 && new_parent.visibility != ChannelVisibility::Public
1013 {
1014 Err(anyhow!(ErrorCode::BadPublicNesting))?;
1015 }
1016
1017 let root_id = channel.root_id();
1018 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1019 let new_path = format!("{}{}/", new_parent.path(), channel.id);
1020
1021 let mut model = channel.into_active_model();
1022 model.parent_path = ActiveValue::Set(new_parent.path());
1023 let channel = model.update(&*tx).await?;
1024
1025 let descendent_ids =
1026 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1027 self.pool.get_database_backend(),
1028 "
1029 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1030 WHERE parent_path LIKE $3 || '%'
1031 RETURNING id
1032 ",
1033 [old_path.clone().into(), new_path.into(), old_path.into()],
1034 ))
1035 .all(&*tx)
1036 .await?;
1037
1038 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1039
1040 let channels = channel::Entity::find()
1041 .filter(channel::Column::Id.is_in(all_moved_ids))
1042 .all(&*tx)
1043 .await?
1044 .into_iter()
1045 .map(|c| Channel::from_model(c))
1046 .collect::<Vec<_>>();
1047
1048 let channel_members = channel_member::Entity::find()
1049 .filter(channel_member::Column::ChannelId.eq(root_id))
1050 .all(&*tx)
1051 .await?;
1052
1053 Ok((channels, channel_members))
1054 })
1055 .await
1056 }
1057}
1058
/// Column selector for queries that return a single `id` column; used by
/// `move_channel` to read the ids produced by its `RETURNING id` statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1063
/// Column selector for queries that return a single `user_id` column; used by
/// `delete_channel` to read the distinct member ids it selects.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}