1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
/// Test-only helper that returns the `(id, name)` pair of every channel row.
#[cfg(test)]
pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
    self.transaction(move |tx| async move {
        let mut stream = channel::Entity::find().stream(&*tx).await?;
        let mut id_name_pairs = Vec::new();
        while let Some(next) = stream.next().await {
            let model = next?;
            id_name_pairs.push((model.id, model.name));
        }
        Ok(id_name_pairs)
    })
    .await
}
19
/// Test-only helper that creates a channel without a parent on behalf of
/// `creator_id` and returns the new channel's ID.
#[cfg(test)]
pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
    let (channel, _, _) = self.create_channel(name, None, creator_id).await?;
    Ok(channel.id)
}
24
/// Test-only helper that creates a channel underneath `parent` on behalf of
/// `creator_id` and returns the new channel's ID.
#[cfg(test)]
pub async fn create_sub_channel(
    &self,
    name: &str,
    parent: ChannelId,
    creator_id: UserId,
) -> Result<ChannelId> {
    let (channel, _, _) = self.create_channel(name, Some(parent), creator_id).await?;
    Ok(channel.id)
}
38
39 /// Creates a new channel.
40 pub async fn create_channel(
41 &self,
42 name: &str,
43 parent_channel_id: Option<ChannelId>,
44 admin_id: UserId,
45 ) -> Result<(
46 Channel,
47 Option<channel_member::Model>,
48 Vec<channel_member::Model>,
49 )> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 let channel_members = channel_member::Entity::find()
91 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
92 .all(&*tx)
93 .await?;
94
95 Ok((Channel::from_model(channel), membership, channel_members))
96 })
97 .await
98 }
99
100 /// Adds a user to the specified channel.
101 pub async fn join_channel(
102 &self,
103 channel_id: ChannelId,
104 user_id: UserId,
105 connection: ConnectionId,
106 environment: &str,
107 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
108 self.transaction(move |tx| async move {
109 let channel = self.get_channel_internal(channel_id, &*tx).await?;
110 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
111
112 let mut accept_invite_result = None;
113
114 if role.is_none() {
115 if let Some(invitation) = self
116 .pending_invite_for_channel(&channel, user_id, &*tx)
117 .await?
118 {
119 // note, this may be a parent channel
120 role = Some(invitation.role);
121 channel_member::Entity::update(channel_member::ActiveModel {
122 accepted: ActiveValue::Set(true),
123 ..invitation.into_active_model()
124 })
125 .exec(&*tx)
126 .await?;
127
128 accept_invite_result = Some(
129 self.calculate_membership_updated(&channel, user_id, &*tx)
130 .await?,
131 );
132
133 debug_assert!(
134 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
135 );
136 } else if channel.visibility == ChannelVisibility::Public {
137 role = Some(ChannelRole::Guest);
138 channel_member::Entity::insert(channel_member::ActiveModel {
139 id: ActiveValue::NotSet,
140 channel_id: ActiveValue::Set(channel.root_id()),
141 user_id: ActiveValue::Set(user_id),
142 accepted: ActiveValue::Set(true),
143 role: ActiveValue::Set(ChannelRole::Guest),
144 })
145 .exec(&*tx)
146 .await?;
147
148 accept_invite_result = Some(
149 self.calculate_membership_updated(&channel, user_id, &*tx)
150 .await?,
151 );
152
153 debug_assert!(
154 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
155 );
156 }
157 }
158
159 if role.is_none() || role == Some(ChannelRole::Banned) {
160 Err(ErrorCode::Forbidden.anyhow())?
161 }
162 let role = role.unwrap();
163
164 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
165 let room_id = self
166 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
167 .await?;
168
169 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
170 .await
171 .map(|jr| (jr, accept_invite_result, role))
172 })
173 .await
174 }
175
176 /// Sets the visibility of the given channel.
177 pub async fn set_channel_visibility(
178 &self,
179 channel_id: ChannelId,
180 visibility: ChannelVisibility,
181 admin_id: UserId,
182 ) -> Result<(Channel, Vec<channel_member::Model>)> {
183 self.transaction(move |tx| async move {
184 let channel = self.get_channel_internal(channel_id, &*tx).await?;
185 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
186 .await?;
187
188 if visibility == ChannelVisibility::Public {
189 if let Some(parent_id) = channel.parent_id() {
190 let parent = self.get_channel_internal(parent_id, &*tx).await?;
191
192 if parent.visibility != ChannelVisibility::Public {
193 Err(ErrorCode::BadPublicNesting
194 .with_tag("direction", "parent")
195 .anyhow())?;
196 }
197 }
198 } else if visibility == ChannelVisibility::Members {
199 if self
200 .get_channel_descendants_excluding_self([&channel], &*tx)
201 .await?
202 .into_iter()
203 .any(|channel| channel.visibility == ChannelVisibility::Public)
204 {
205 Err(ErrorCode::BadPublicNesting
206 .with_tag("direction", "children")
207 .anyhow())?;
208 }
209 }
210
211 let mut model = channel.into_active_model();
212 model.visibility = ActiveValue::Set(visibility);
213 let channel = model.update(&*tx).await?;
214
215 let channel_members = channel_member::Entity::find()
216 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
217 .all(&*tx)
218 .await?;
219
220 Ok((Channel::from_model(channel), channel_members))
221 })
222 .await
223 }
224
/// Test-only helper that overwrites the `requires_zed_cla` flag of a channel.
/// No permission check is performed.
#[cfg(test)]
pub async fn set_channel_requires_zed_cla(
    &self,
    channel_id: ChannelId,
    requires_zed_cla: bool,
) -> Result<()> {
    self.transaction(move |tx| async move {
        let mut changes = self
            .get_channel_internal(channel_id, &*tx)
            .await?
            .into_active_model();
        changes.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
        changes.update(&*tx).await?;
        Ok(())
    })
    .await
}
240
241 /// Deletes the channel with the specified ID.
242 pub async fn delete_channel(
243 &self,
244 channel_id: ChannelId,
245 user_id: UserId,
246 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
247 self.transaction(move |tx| async move {
248 let channel = self.get_channel_internal(channel_id, &*tx).await?;
249 self.check_user_is_channel_admin(&channel, user_id, &*tx)
250 .await?;
251
252 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
253 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
254 .select_only()
255 .column(channel_member::Column::UserId)
256 .distinct()
257 .into_values::<_, QueryUserIds>()
258 .all(&*tx)
259 .await?;
260
261 let channels_to_remove = self
262 .get_channel_descendants_excluding_self([&channel], &*tx)
263 .await?
264 .into_iter()
265 .map(|channel| channel.id)
266 .chain(Some(channel_id))
267 .collect::<Vec<_>>();
268
269 channel::Entity::delete_many()
270 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
271 .exec(&*tx)
272 .await?;
273
274 Ok((channels_to_remove, members_to_notify))
275 })
276 .await
277 }
278
279 /// Invites a user to a channel as a member.
280 pub async fn invite_channel_member(
281 &self,
282 channel_id: ChannelId,
283 invitee_id: UserId,
284 inviter_id: UserId,
285 role: ChannelRole,
286 ) -> Result<InviteMemberResult> {
287 self.transaction(move |tx| async move {
288 let channel = self.get_channel_internal(channel_id, &*tx).await?;
289 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
290 .await?;
291 if !channel.is_root() {
292 Err(ErrorCode::NotARootChannel.anyhow())?
293 }
294
295 channel_member::ActiveModel {
296 id: ActiveValue::NotSet,
297 channel_id: ActiveValue::Set(channel_id),
298 user_id: ActiveValue::Set(invitee_id),
299 accepted: ActiveValue::Set(false),
300 role: ActiveValue::Set(role),
301 }
302 .insert(&*tx)
303 .await?;
304
305 let channel = Channel::from_model(channel);
306
307 let notifications = self
308 .create_notification(
309 invitee_id,
310 rpc::Notification::ChannelInvitation {
311 channel_id: channel_id.to_proto(),
312 channel_name: channel.name.clone(),
313 inviter_id: inviter_id.to_proto(),
314 },
315 true,
316 &*tx,
317 )
318 .await?
319 .into_iter()
320 .collect();
321
322 Ok(InviteMemberResult {
323 channel,
324 notifications,
325 })
326 })
327 .await
328 }
329
330 fn sanitize_channel_name(name: &str) -> Result<&str> {
331 let new_name = name.trim().trim_start_matches('#');
332 if new_name == "" {
333 Err(anyhow!("channel name can't be blank"))?;
334 }
335 Ok(new_name)
336 }
337
338 /// Renames the specified channel.
339 pub async fn rename_channel(
340 &self,
341 channel_id: ChannelId,
342 admin_id: UserId,
343 new_name: &str,
344 ) -> Result<(Channel, Vec<channel_member::Model>)> {
345 self.transaction(move |tx| async move {
346 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
347
348 let channel = self.get_channel_internal(channel_id, &*tx).await?;
349 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
350 .await?;
351
352 let mut model = channel.into_active_model();
353 model.name = ActiveValue::Set(new_name.clone());
354 let channel = model.update(&*tx).await?;
355
356 let channel_members = channel_member::Entity::find()
357 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
358 .all(&*tx)
359 .await?;
360
361 Ok((Channel::from_model(channel), channel_members))
362 })
363 .await
364 }
365
366 /// accept or decline an invite to join a channel
367 pub async fn respond_to_channel_invite(
368 &self,
369 channel_id: ChannelId,
370 user_id: UserId,
371 accept: bool,
372 ) -> Result<RespondToChannelInvite> {
373 self.transaction(move |tx| async move {
374 let channel = self.get_channel_internal(channel_id, &*tx).await?;
375
376 let membership_update = if accept {
377 let rows_affected = channel_member::Entity::update_many()
378 .set(channel_member::ActiveModel {
379 accepted: ActiveValue::Set(accept),
380 ..Default::default()
381 })
382 .filter(
383 channel_member::Column::ChannelId
384 .eq(channel_id)
385 .and(channel_member::Column::UserId.eq(user_id))
386 .and(channel_member::Column::Accepted.eq(false)),
387 )
388 .exec(&*tx)
389 .await?
390 .rows_affected;
391
392 if rows_affected == 0 {
393 Err(anyhow!("no such invitation"))?;
394 }
395
396 Some(
397 self.calculate_membership_updated(&channel, user_id, &*tx)
398 .await?,
399 )
400 } else {
401 let rows_affected = channel_member::Entity::delete_many()
402 .filter(
403 channel_member::Column::ChannelId
404 .eq(channel_id)
405 .and(channel_member::Column::UserId.eq(user_id))
406 .and(channel_member::Column::Accepted.eq(false)),
407 )
408 .exec(&*tx)
409 .await?
410 .rows_affected;
411 if rows_affected == 0 {
412 Err(anyhow!("no such invitation"))?;
413 }
414
415 None
416 };
417
418 Ok(RespondToChannelInvite {
419 membership_update,
420 notifications: self
421 .mark_notification_as_read_with_response(
422 user_id,
423 &rpc::Notification::ChannelInvitation {
424 channel_id: channel_id.to_proto(),
425 channel_name: Default::default(),
426 inviter_id: Default::default(),
427 },
428 accept,
429 &*tx,
430 )
431 .await?
432 .into_iter()
433 .collect(),
434 })
435 })
436 .await
437 }
438
439 async fn calculate_membership_updated(
440 &self,
441 channel: &channel::Model,
442 user_id: UserId,
443 tx: &DatabaseTransaction,
444 ) -> Result<MembershipUpdated> {
445 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
446 let removed_channels = self
447 .get_channel_descendants_excluding_self([channel], &*tx)
448 .await?
449 .into_iter()
450 .map(|channel| channel.id)
451 .chain([channel.id])
452 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
453 .collect::<Vec<_>>();
454
455 Ok(MembershipUpdated {
456 channel_id: channel.id,
457 new_channels,
458 removed_channels,
459 })
460 }
461
462 /// Removes a channel member.
463 pub async fn remove_channel_member(
464 &self,
465 channel_id: ChannelId,
466 member_id: UserId,
467 admin_id: UserId,
468 ) -> Result<RemoveChannelMemberResult> {
469 self.transaction(|tx| async move {
470 let channel = self.get_channel_internal(channel_id, &*tx).await?;
471
472 if member_id != admin_id {
473 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
474 .await?;
475 }
476
477 let result = channel_member::Entity::delete_many()
478 .filter(
479 channel_member::Column::ChannelId
480 .eq(channel_id)
481 .and(channel_member::Column::UserId.eq(member_id)),
482 )
483 .exec(&*tx)
484 .await?;
485
486 if result.rows_affected == 0 {
487 Err(anyhow!("no such member"))?;
488 }
489
490 Ok(RemoveChannelMemberResult {
491 membership_update: self
492 .calculate_membership_updated(&channel, member_id, &*tx)
493 .await?,
494 notification_id: self
495 .remove_notification(
496 member_id,
497 rpc::Notification::ChannelInvitation {
498 channel_id: channel_id.to_proto(),
499 channel_name: Default::default(),
500 inviter_id: Default::default(),
501 },
502 &*tx,
503 )
504 .await?,
505 })
506 })
507 .await
508 }
509
510 /// Returns all channel invites for the user with the given ID.
511 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
512 self.transaction(|tx| async move {
513 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
514
515 let channel_invites = channel_member::Entity::find()
516 .filter(
517 channel_member::Column::UserId
518 .eq(user_id)
519 .and(channel_member::Column::Accepted.eq(false)),
520 )
521 .all(&*tx)
522 .await?;
523
524 for invite in channel_invites {
525 role_for_channel.insert(invite.channel_id, invite.role);
526 }
527
528 let channels = channel::Entity::find()
529 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
530 .all(&*tx)
531 .await?;
532
533 let channels = channels
534 .into_iter()
535 .filter_map(|channel| Some(Channel::from_model(channel)))
536 .collect();
537
538 Ok(channels)
539 })
540 .await
541 }
542
543 /// Returns all channels for the user with the given ID.
544 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
545 self.transaction(|tx| async move {
546 let tx = tx;
547
548 self.get_user_channels(user_id, None, &tx).await
549 })
550 .await
551 }
552
553 /// Returns all channels for the user with the given ID that are descendants
554 /// of the specified ancestor channel.
555 pub async fn get_user_channels(
556 &self,
557 user_id: UserId,
558 ancestor_channel: Option<&channel::Model>,
559 tx: &DatabaseTransaction,
560 ) -> Result<ChannelsForUser> {
561 let mut filter = channel_member::Column::UserId
562 .eq(user_id)
563 .and(channel_member::Column::Accepted.eq(true));
564
565 if let Some(ancestor) = ancestor_channel {
566 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
567 }
568
569 let channel_memberships = channel_member::Entity::find()
570 .filter(filter)
571 .all(&*tx)
572 .await?;
573
574 let channels = channel::Entity::find()
575 .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
576 .all(&*tx)
577 .await?;
578
579 let mut descendants = self
580 .get_channel_descendants_excluding_self(channels.iter(), &*tx)
581 .await?;
582
583 for channel in channels {
584 if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
585 descendants.insert(ix, channel);
586 }
587 }
588
589 let roles_by_channel_id = channel_memberships
590 .iter()
591 .map(|membership| (membership.channel_id, membership.role))
592 .collect::<HashMap<_, _>>();
593
594 let channels: Vec<Channel> = descendants
595 .into_iter()
596 .filter_map(|channel| {
597 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
598 if parent_role.can_see_channel(channel.visibility) {
599 Some(Channel::from_model(channel))
600 } else {
601 None
602 }
603 })
604 .collect();
605
606 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
607 enum QueryUserIdsAndChannelIds {
608 ChannelId,
609 UserId,
610 }
611
612 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
613 {
614 let mut rows = room_participant::Entity::find()
615 .inner_join(room::Entity)
616 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
617 .select_only()
618 .column(room::Column::ChannelId)
619 .column(room_participant::Column::UserId)
620 .into_values::<_, QueryUserIdsAndChannelIds>()
621 .stream(&*tx)
622 .await?;
623 while let Some(row) = rows.next().await {
624 let row: (ChannelId, UserId) = row?;
625 channel_participants.entry(row.0).or_default().push(row.1)
626 }
627 }
628
629 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
630 let latest_buffer_versions = self
631 .latest_channel_buffer_changes(&channel_ids, &*tx)
632 .await?;
633
634 let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
635
636 Ok(ChannelsForUser {
637 channel_memberships,
638 channels,
639 channel_participants,
640 latest_buffer_versions,
641 latest_channel_messages: latest_messages,
642 })
643 }
644
645 /// Sets the role for the specified channel member.
646 pub async fn set_channel_member_role(
647 &self,
648 channel_id: ChannelId,
649 admin_id: UserId,
650 for_user: UserId,
651 role: ChannelRole,
652 ) -> Result<SetMemberRoleResult> {
653 self.transaction(|tx| async move {
654 let channel = self.get_channel_internal(channel_id, &*tx).await?;
655 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
656 .await?;
657
658 let membership = channel_member::Entity::find()
659 .filter(
660 channel_member::Column::ChannelId
661 .eq(channel_id)
662 .and(channel_member::Column::UserId.eq(for_user)),
663 )
664 .one(&*tx)
665 .await?;
666
667 let Some(membership) = membership else {
668 Err(anyhow!("no such member"))?
669 };
670
671 let mut update = membership.into_active_model();
672 update.role = ActiveValue::Set(role);
673 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
674
675 if updated.accepted {
676 Ok(SetMemberRoleResult::MembershipUpdated(
677 self.calculate_membership_updated(&channel, for_user, &*tx)
678 .await?,
679 ))
680 } else {
681 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
682 channel,
683 )))
684 }
685 })
686 .await
687 }
688
689 /// Returns the details for the specified channel member.
690 pub async fn get_channel_participant_details(
691 &self,
692 channel_id: ChannelId,
693 user_id: UserId,
694 ) -> Result<Vec<proto::ChannelMember>> {
695 let (role, members) = self
696 .transaction(move |tx| async move {
697 let channel = self.get_channel_internal(channel_id, &*tx).await?;
698 let role = self
699 .check_user_is_channel_participant(&channel, user_id, &*tx)
700 .await?;
701 Ok((
702 role,
703 self.get_channel_participant_details_internal(&channel, &*tx)
704 .await?,
705 ))
706 })
707 .await?;
708
709 if role == ChannelRole::Admin {
710 Ok(members
711 .into_iter()
712 .map(|channel_member| proto::ChannelMember {
713 role: channel_member.role.into(),
714 user_id: channel_member.user_id.to_proto(),
715 kind: if channel_member.accepted {
716 Kind::Member
717 } else {
718 Kind::Invitee
719 }
720 .into(),
721 })
722 .collect())
723 } else {
724 return Ok(members
725 .into_iter()
726 .filter_map(|member| {
727 if !member.accepted {
728 return None;
729 }
730 Some(proto::ChannelMember {
731 role: member.role.into(),
732 user_id: member.user_id.to_proto(),
733 kind: Kind::Member.into(),
734 })
735 })
736 .collect());
737 }
738 }
739
740 async fn get_channel_participant_details_internal(
741 &self,
742 channel: &channel::Model,
743 tx: &DatabaseTransaction,
744 ) -> Result<Vec<channel_member::Model>> {
745 Ok(channel_member::Entity::find()
746 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
747 .all(tx)
748 .await?)
749 }
750
751 /// Returns the participants in the given channel.
752 pub async fn get_channel_participants(
753 &self,
754 channel: &channel::Model,
755 tx: &DatabaseTransaction,
756 ) -> Result<Vec<UserId>> {
757 let participants = self
758 .get_channel_participant_details_internal(channel, &*tx)
759 .await?;
760 Ok(participants
761 .into_iter()
762 .map(|member| member.user_id)
763 .collect())
764 }
765
766 /// Returns whether the given user is an admin in the specified channel.
767 pub async fn check_user_is_channel_admin(
768 &self,
769 channel: &channel::Model,
770 user_id: UserId,
771 tx: &DatabaseTransaction,
772 ) -> Result<ChannelRole> {
773 let role = self.channel_role_for_user(channel, user_id, tx).await?;
774 match role {
775 Some(ChannelRole::Admin) => Ok(role.unwrap()),
776 Some(ChannelRole::Member)
777 | Some(ChannelRole::Banned)
778 | Some(ChannelRole::Guest)
779 | None => Err(anyhow!(
780 "user is not a channel admin or channel does not exist"
781 ))?,
782 }
783 }
784
785 /// Returns whether the given user is a member of the specified channel.
786 pub async fn check_user_is_channel_member(
787 &self,
788 channel: &channel::Model,
789 user_id: UserId,
790 tx: &DatabaseTransaction,
791 ) -> Result<ChannelRole> {
792 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
793 match channel_role {
794 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
795 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
796 "user is not a channel member or channel does not exist"
797 ))?,
798 }
799 }
800
801 /// Returns whether the given user is a participant in the specified channel.
802 pub async fn check_user_is_channel_participant(
803 &self,
804 channel: &channel::Model,
805 user_id: UserId,
806 tx: &DatabaseTransaction,
807 ) -> Result<ChannelRole> {
808 let role = self.channel_role_for_user(channel, user_id, tx).await?;
809 match role {
810 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
811 Ok(role.unwrap())
812 }
813 Some(ChannelRole::Banned) | None => Err(anyhow!(
814 "user is not a channel participant or channel does not exist"
815 ))?,
816 }
817 }
818
819 /// Returns a user's pending invite for the given channel, if one exists.
820 pub async fn pending_invite_for_channel(
821 &self,
822 channel: &channel::Model,
823 user_id: UserId,
824 tx: &DatabaseTransaction,
825 ) -> Result<Option<channel_member::Model>> {
826 let row = channel_member::Entity::find()
827 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
828 .filter(channel_member::Column::UserId.eq(user_id))
829 .filter(channel_member::Column::Accepted.eq(false))
830 .one(&*tx)
831 .await?;
832
833 Ok(row)
834 }
835
836 /// Returns the role for a user in the given channel.
837 pub async fn channel_role_for_user(
838 &self,
839 channel: &channel::Model,
840 user_id: UserId,
841 tx: &DatabaseTransaction,
842 ) -> Result<Option<ChannelRole>> {
843 let membership = channel_member::Entity::find()
844 .filter(
845 channel_member::Column::ChannelId
846 .eq(channel.root_id())
847 .and(channel_member::Column::UserId.eq(user_id))
848 .and(channel_member::Column::Accepted.eq(true)),
849 )
850 .one(&*tx)
851 .await?;
852
853 let Some(membership) = membership else {
854 return Ok(None);
855 };
856
857 if !membership.role.can_see_channel(channel.visibility) {
858 return Ok(None);
859 }
860
861 Ok(Some(membership.role))
862 }
863
864 // Get the descendants of the given set if channels, ordered by their
865 // path.
866 pub(crate) async fn get_channel_descendants_excluding_self(
867 &self,
868 channels: impl IntoIterator<Item = &channel::Model>,
869 tx: &DatabaseTransaction,
870 ) -> Result<Vec<channel::Model>> {
871 let mut filter = Condition::any();
872 for channel in channels.into_iter() {
873 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
874 }
875
876 if filter.is_empty() {
877 return Ok(vec![]);
878 }
879
880 Ok(channel::Entity::find()
881 .filter(filter)
882 .order_by_asc(Expr::cust("parent_path || id || '/'"))
883 .all(tx)
884 .await?)
885 }
886
887 /// Returns the channel with the given ID.
888 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
889 self.transaction(|tx| async move {
890 let channel = self.get_channel_internal(channel_id, &*tx).await?;
891 self.check_user_is_channel_participant(&channel, user_id, &*tx)
892 .await?;
893
894 Ok(Channel::from_model(channel))
895 })
896 .await
897 }
898
899 pub(crate) async fn get_channel_internal(
900 &self,
901 channel_id: ChannelId,
902 tx: &DatabaseTransaction,
903 ) -> Result<channel::Model> {
904 Ok(channel::Entity::find_by_id(channel_id)
905 .one(&*tx)
906 .await?
907 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
908 }
909
910 pub(crate) async fn get_or_create_channel_room(
911 &self,
912 channel_id: ChannelId,
913 live_kit_room: &str,
914 environment: &str,
915 tx: &DatabaseTransaction,
916 ) -> Result<RoomId> {
917 let room = room::Entity::find()
918 .filter(room::Column::ChannelId.eq(channel_id))
919 .one(&*tx)
920 .await?;
921
922 let room_id = if let Some(room) = room {
923 if let Some(env) = room.environment {
924 if &env != environment {
925 Err(ErrorCode::WrongReleaseChannel
926 .with_tag("required", &env)
927 .anyhow())?;
928 }
929 }
930 room.id
931 } else {
932 let result = room::Entity::insert(room::ActiveModel {
933 channel_id: ActiveValue::Set(Some(channel_id)),
934 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
935 environment: ActiveValue::Set(Some(environment.to_string())),
936 ..Default::default()
937 })
938 .exec(&*tx)
939 .await?;
940
941 result.last_insert_id
942 };
943
944 Ok(room_id)
945 }
946
947 /// Move a channel from one parent to another
948 pub async fn move_channel(
949 &self,
950 channel_id: ChannelId,
951 new_parent_id: ChannelId,
952 admin_id: UserId,
953 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
954 self.transaction(|tx| async move {
955 let channel = self.get_channel_internal(channel_id, &*tx).await?;
956 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
957 .await?;
958 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
959
960 if new_parent.root_id() != channel.root_id() {
961 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
962 }
963
964 if new_parent
965 .ancestors_including_self()
966 .any(|id| id == channel.id)
967 {
968 Err(anyhow!(ErrorCode::CircularNesting))?;
969 }
970
971 if channel.visibility == ChannelVisibility::Public
972 && new_parent.visibility != ChannelVisibility::Public
973 {
974 Err(anyhow!(ErrorCode::BadPublicNesting))?;
975 }
976
977 let root_id = channel.root_id();
978 let old_path = format!("{}{}/", channel.parent_path, channel.id);
979 let new_path = format!("{}{}/", new_parent.path(), channel.id);
980
981 let mut model = channel.into_active_model();
982 model.parent_path = ActiveValue::Set(new_parent.path());
983 let channel = model.update(&*tx).await?;
984
985 let descendent_ids =
986 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
987 self.pool.get_database_backend(),
988 "
989 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
990 WHERE parent_path LIKE $3 || '%'
991 RETURNING id
992 ",
993 [old_path.clone().into(), new_path.into(), old_path.into()],
994 ))
995 .all(&*tx)
996 .await?;
997
998 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
999
1000 let channels = channel::Entity::find()
1001 .filter(channel::Column::Id.is_in(all_moved_ids))
1002 .all(&*tx)
1003 .await?
1004 .into_iter()
1005 .map(|c| Channel::from_model(c))
1006 .collect::<Vec<_>>();
1007
1008 let channel_members = channel_member::Entity::find()
1009 .filter(channel_member::Column::ChannelId.eq(root_id))
1010 .all(&*tx)
1011 .await?;
1012
1013 Ok((channels, channel_members))
1014 })
1015 .await
1016 }
1017}
1018
/// Column selector for queries that return a single `id` column; used by
/// `move_channel` to read the IDs returned by its `UPDATE ... RETURNING id`
/// statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1023
/// Column selector for queries that return a single `user_id` column; used by
/// `delete_channel` to read the distinct user IDs of the members to notify.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}