1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self.create_channel(name, None, creator_id).await?.0.id)
23 }
24
25 #[cfg(test)]
26 pub async fn create_sub_channel(
27 &self,
28 name: &str,
29 parent: ChannelId,
30 creator_id: UserId,
31 ) -> Result<ChannelId> {
32 Ok(self
33 .create_channel(name, Some(parent), creator_id)
34 .await?
35 .0
36 .id)
37 }
38
39 /// Creates a new channel.
40 pub async fn create_channel(
41 &self,
42 name: &str,
43 parent_channel_id: Option<ChannelId>,
44 admin_id: UserId,
45 ) -> Result<(
46 Channel,
47 Option<channel_member::Model>,
48 Vec<channel_member::Model>,
49 )> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 let channel_members = channel_member::Entity::find()
91 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
92 .all(&*tx)
93 .await?;
94
95 Ok((Channel::from_model(channel), membership, channel_members))
96 })
97 .await
98 }
99
100 /// Adds a user to the specified channel.
101 pub async fn join_channel(
102 &self,
103 channel_id: ChannelId,
104 user_id: UserId,
105 connection: ConnectionId,
106 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
107 self.transaction(move |tx| async move {
108 let channel = self.get_channel_internal(channel_id, &*tx).await?;
109 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
110
111 let mut accept_invite_result = None;
112
113 if role.is_none() {
114 if let Some(invitation) = self
115 .pending_invite_for_channel(&channel, user_id, &*tx)
116 .await?
117 {
118 // note, this may be a parent channel
119 role = Some(invitation.role);
120 channel_member::Entity::update(channel_member::ActiveModel {
121 accepted: ActiveValue::Set(true),
122 ..invitation.into_active_model()
123 })
124 .exec(&*tx)
125 .await?;
126
127 accept_invite_result = Some(
128 self.calculate_membership_updated(&channel, user_id, &*tx)
129 .await?,
130 );
131
132 debug_assert!(
133 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
134 );
135 } else if channel.visibility == ChannelVisibility::Public {
136 role = Some(ChannelRole::Guest);
137 channel_member::Entity::insert(channel_member::ActiveModel {
138 id: ActiveValue::NotSet,
139 channel_id: ActiveValue::Set(channel.root_id()),
140 user_id: ActiveValue::Set(user_id),
141 accepted: ActiveValue::Set(true),
142 role: ActiveValue::Set(ChannelRole::Guest),
143 })
144 .exec(&*tx)
145 .await?;
146
147 accept_invite_result = Some(
148 self.calculate_membership_updated(&channel, user_id, &*tx)
149 .await?,
150 );
151
152 debug_assert!(
153 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
154 );
155 }
156 }
157
158 if role.is_none() || role == Some(ChannelRole::Banned) {
159 Err(ErrorCode::Forbidden.anyhow())?
160 }
161 let role = role.unwrap();
162
163 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
164 let room_id = self
165 .get_or_create_channel_room(channel_id, &live_kit_room, &*tx)
166 .await?;
167
168 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
169 .await
170 .map(|jr| (jr, accept_invite_result, role))
171 })
172 .await
173 }
174
175 /// Sets the visibility of the given channel.
176 pub async fn set_channel_visibility(
177 &self,
178 channel_id: ChannelId,
179 visibility: ChannelVisibility,
180 admin_id: UserId,
181 ) -> Result<(Channel, Vec<channel_member::Model>)> {
182 self.transaction(move |tx| async move {
183 let channel = self.get_channel_internal(channel_id, &*tx).await?;
184 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
185 .await?;
186
187 if visibility == ChannelVisibility::Public {
188 if let Some(parent_id) = channel.parent_id() {
189 let parent = self.get_channel_internal(parent_id, &*tx).await?;
190
191 if parent.visibility != ChannelVisibility::Public {
192 Err(ErrorCode::BadPublicNesting
193 .with_tag("direction", "parent")
194 .anyhow())?;
195 }
196 }
197 } else if visibility == ChannelVisibility::Members {
198 if self
199 .get_channel_descendants_excluding_self([&channel], &*tx)
200 .await?
201 .into_iter()
202 .any(|channel| channel.visibility == ChannelVisibility::Public)
203 {
204 Err(ErrorCode::BadPublicNesting
205 .with_tag("direction", "children")
206 .anyhow())?;
207 }
208 }
209
210 let mut model = channel.into_active_model();
211 model.visibility = ActiveValue::Set(visibility);
212 let channel = model.update(&*tx).await?;
213
214 let channel_members = channel_member::Entity::find()
215 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
216 .all(&*tx)
217 .await?;
218
219 Ok((Channel::from_model(channel), channel_members))
220 })
221 .await
222 }
223
224 #[cfg(test)]
225 pub async fn set_channel_requires_zed_cla(
226 &self,
227 channel_id: ChannelId,
228 requires_zed_cla: bool,
229 ) -> Result<()> {
230 self.transaction(move |tx| async move {
231 let channel = self.get_channel_internal(channel_id, &*tx).await?;
232 let mut model = channel.into_active_model();
233 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
234 model.update(&*tx).await?;
235 Ok(())
236 })
237 .await
238 }
239
240 /// Deletes the channel with the specified ID.
241 pub async fn delete_channel(
242 &self,
243 channel_id: ChannelId,
244 user_id: UserId,
245 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
246 self.transaction(move |tx| async move {
247 let channel = self.get_channel_internal(channel_id, &*tx).await?;
248 self.check_user_is_channel_admin(&channel, user_id, &*tx)
249 .await?;
250
251 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
252 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
253 .select_only()
254 .column(channel_member::Column::UserId)
255 .distinct()
256 .into_values::<_, QueryUserIds>()
257 .all(&*tx)
258 .await?;
259
260 let channels_to_remove = self
261 .get_channel_descendants_excluding_self([&channel], &*tx)
262 .await?
263 .into_iter()
264 .map(|channel| channel.id)
265 .chain(Some(channel_id))
266 .collect::<Vec<_>>();
267
268 channel::Entity::delete_many()
269 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
270 .exec(&*tx)
271 .await?;
272
273 Ok((channels_to_remove, members_to_notify))
274 })
275 .await
276 }
277
278 /// Invites a user to a channel as a member.
279 pub async fn invite_channel_member(
280 &self,
281 channel_id: ChannelId,
282 invitee_id: UserId,
283 inviter_id: UserId,
284 role: ChannelRole,
285 ) -> Result<InviteMemberResult> {
286 self.transaction(move |tx| async move {
287 let channel = self.get_channel_internal(channel_id, &*tx).await?;
288 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
289 .await?;
290 if !channel.is_root() {
291 Err(ErrorCode::NotARootChannel.anyhow())?
292 }
293
294 channel_member::ActiveModel {
295 id: ActiveValue::NotSet,
296 channel_id: ActiveValue::Set(channel_id),
297 user_id: ActiveValue::Set(invitee_id),
298 accepted: ActiveValue::Set(false),
299 role: ActiveValue::Set(role),
300 }
301 .insert(&*tx)
302 .await?;
303
304 let channel = Channel::from_model(channel);
305
306 let notifications = self
307 .create_notification(
308 invitee_id,
309 rpc::Notification::ChannelInvitation {
310 channel_id: channel_id.to_proto(),
311 channel_name: channel.name.clone(),
312 inviter_id: inviter_id.to_proto(),
313 },
314 true,
315 &*tx,
316 )
317 .await?
318 .into_iter()
319 .collect();
320
321 Ok(InviteMemberResult {
322 channel,
323 notifications,
324 })
325 })
326 .await
327 }
328
329 fn sanitize_channel_name(name: &str) -> Result<&str> {
330 let new_name = name.trim().trim_start_matches('#');
331 if new_name == "" {
332 Err(anyhow!("channel name can't be blank"))?;
333 }
334 Ok(new_name)
335 }
336
337 /// Renames the specified channel.
338 pub async fn rename_channel(
339 &self,
340 channel_id: ChannelId,
341 admin_id: UserId,
342 new_name: &str,
343 ) -> Result<(Channel, Vec<channel_member::Model>)> {
344 self.transaction(move |tx| async move {
345 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
346
347 let channel = self.get_channel_internal(channel_id, &*tx).await?;
348 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
349 .await?;
350
351 let mut model = channel.into_active_model();
352 model.name = ActiveValue::Set(new_name.clone());
353 let channel = model.update(&*tx).await?;
354
355 let channel_members = channel_member::Entity::find()
356 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
357 .all(&*tx)
358 .await?;
359
360 Ok((Channel::from_model(channel), channel_members))
361 })
362 .await
363 }
364
365 /// accept or decline an invite to join a channel
366 pub async fn respond_to_channel_invite(
367 &self,
368 channel_id: ChannelId,
369 user_id: UserId,
370 accept: bool,
371 ) -> Result<RespondToChannelInvite> {
372 self.transaction(move |tx| async move {
373 let channel = self.get_channel_internal(channel_id, &*tx).await?;
374
375 let membership_update = if accept {
376 let rows_affected = channel_member::Entity::update_many()
377 .set(channel_member::ActiveModel {
378 accepted: ActiveValue::Set(accept),
379 ..Default::default()
380 })
381 .filter(
382 channel_member::Column::ChannelId
383 .eq(channel_id)
384 .and(channel_member::Column::UserId.eq(user_id))
385 .and(channel_member::Column::Accepted.eq(false)),
386 )
387 .exec(&*tx)
388 .await?
389 .rows_affected;
390
391 if rows_affected == 0 {
392 Err(anyhow!("no such invitation"))?;
393 }
394
395 Some(
396 self.calculate_membership_updated(&channel, user_id, &*tx)
397 .await?,
398 )
399 } else {
400 let rows_affected = channel_member::Entity::delete_many()
401 .filter(
402 channel_member::Column::ChannelId
403 .eq(channel_id)
404 .and(channel_member::Column::UserId.eq(user_id))
405 .and(channel_member::Column::Accepted.eq(false)),
406 )
407 .exec(&*tx)
408 .await?
409 .rows_affected;
410 if rows_affected == 0 {
411 Err(anyhow!("no such invitation"))?;
412 }
413
414 None
415 };
416
417 Ok(RespondToChannelInvite {
418 membership_update,
419 notifications: self
420 .mark_notification_as_read_with_response(
421 user_id,
422 &rpc::Notification::ChannelInvitation {
423 channel_id: channel_id.to_proto(),
424 channel_name: Default::default(),
425 inviter_id: Default::default(),
426 },
427 accept,
428 &*tx,
429 )
430 .await?
431 .into_iter()
432 .collect(),
433 })
434 })
435 .await
436 }
437
438 async fn calculate_membership_updated(
439 &self,
440 channel: &channel::Model,
441 user_id: UserId,
442 tx: &DatabaseTransaction,
443 ) -> Result<MembershipUpdated> {
444 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
445 let removed_channels = self
446 .get_channel_descendants_excluding_self([channel], &*tx)
447 .await?
448 .into_iter()
449 .map(|channel| channel.id)
450 .chain([channel.id])
451 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
452 .collect::<Vec<_>>();
453
454 Ok(MembershipUpdated {
455 channel_id: channel.id,
456 new_channels,
457 removed_channels,
458 })
459 }
460
461 /// Removes a channel member.
462 pub async fn remove_channel_member(
463 &self,
464 channel_id: ChannelId,
465 member_id: UserId,
466 admin_id: UserId,
467 ) -> Result<RemoveChannelMemberResult> {
468 self.transaction(|tx| async move {
469 let channel = self.get_channel_internal(channel_id, &*tx).await?;
470
471 if member_id != admin_id {
472 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
473 .await?;
474 }
475
476 let result = channel_member::Entity::delete_many()
477 .filter(
478 channel_member::Column::ChannelId
479 .eq(channel_id)
480 .and(channel_member::Column::UserId.eq(member_id)),
481 )
482 .exec(&*tx)
483 .await?;
484
485 if result.rows_affected == 0 {
486 Err(anyhow!("no such member"))?;
487 }
488
489 Ok(RemoveChannelMemberResult {
490 membership_update: self
491 .calculate_membership_updated(&channel, member_id, &*tx)
492 .await?,
493 notification_id: self
494 .remove_notification(
495 member_id,
496 rpc::Notification::ChannelInvitation {
497 channel_id: channel_id.to_proto(),
498 channel_name: Default::default(),
499 inviter_id: Default::default(),
500 },
501 &*tx,
502 )
503 .await?,
504 })
505 })
506 .await
507 }
508
509 /// Returns all channel invites for the user with the given ID.
510 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
511 self.transaction(|tx| async move {
512 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
513
514 let channel_invites = channel_member::Entity::find()
515 .filter(
516 channel_member::Column::UserId
517 .eq(user_id)
518 .and(channel_member::Column::Accepted.eq(false)),
519 )
520 .all(&*tx)
521 .await?;
522
523 for invite in channel_invites {
524 role_for_channel.insert(invite.channel_id, invite.role);
525 }
526
527 let channels = channel::Entity::find()
528 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
529 .all(&*tx)
530 .await?;
531
532 let channels = channels
533 .into_iter()
534 .filter_map(|channel| Some(Channel::from_model(channel)))
535 .collect();
536
537 Ok(channels)
538 })
539 .await
540 }
541
542 /// Returns all channels for the user with the given ID.
543 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
544 self.transaction(|tx| async move {
545 let tx = tx;
546
547 self.get_user_channels(user_id, None, &tx).await
548 })
549 .await
550 }
551
552 /// Returns all channels for the user with the given ID that are descendants
553 /// of the specified ancestor channel.
554 pub async fn get_user_channels(
555 &self,
556 user_id: UserId,
557 ancestor_channel: Option<&channel::Model>,
558 tx: &DatabaseTransaction,
559 ) -> Result<ChannelsForUser> {
560 let mut filter = channel_member::Column::UserId
561 .eq(user_id)
562 .and(channel_member::Column::Accepted.eq(true));
563
564 if let Some(ancestor) = ancestor_channel {
565 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
566 }
567
568 let channel_memberships = channel_member::Entity::find()
569 .filter(filter)
570 .all(&*tx)
571 .await?;
572
573 let channels = channel::Entity::find()
574 .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
575 .all(&*tx)
576 .await?;
577
578 let mut descendants = self
579 .get_channel_descendants_excluding_self(channels.iter(), &*tx)
580 .await?;
581
582 for channel in channels {
583 if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
584 descendants.insert(ix, channel);
585 }
586 }
587
588 let roles_by_channel_id = channel_memberships
589 .iter()
590 .map(|membership| (membership.channel_id, membership.role))
591 .collect::<HashMap<_, _>>();
592
593 let channels: Vec<Channel> = descendants
594 .into_iter()
595 .filter_map(|channel| {
596 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
597 if parent_role.can_see_channel(channel.visibility) {
598 Some(Channel::from_model(channel))
599 } else {
600 None
601 }
602 })
603 .collect();
604
605 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
606 enum QueryUserIdsAndChannelIds {
607 ChannelId,
608 UserId,
609 }
610
611 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
612 {
613 let mut rows = room_participant::Entity::find()
614 .inner_join(room::Entity)
615 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
616 .select_only()
617 .column(room::Column::ChannelId)
618 .column(room_participant::Column::UserId)
619 .into_values::<_, QueryUserIdsAndChannelIds>()
620 .stream(&*tx)
621 .await?;
622 while let Some(row) = rows.next().await {
623 let row: (ChannelId, UserId) = row?;
624 channel_participants.entry(row.0).or_default().push(row.1)
625 }
626 }
627
628 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
629
630 let mut channel_ids_by_buffer_id = HashMap::default();
631 let mut rows = buffer::Entity::find()
632 .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
633 .stream(&*tx)
634 .await?;
635 while let Some(row) = rows.next().await {
636 let row = row?;
637 channel_ids_by_buffer_id.insert(row.id, row.channel_id);
638 }
639 drop(rows);
640
641 let latest_buffer_versions = self
642 .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx)
643 .await?;
644
645 let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
646
647 let observed_buffer_versions = self
648 .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx)
649 .await?;
650
651 let observed_channel_messages = self
652 .observed_channel_messages(&channel_ids, user_id, &*tx)
653 .await?;
654
655 Ok(ChannelsForUser {
656 channel_memberships,
657 channels,
658 channel_participants,
659 latest_buffer_versions,
660 latest_channel_messages,
661 observed_buffer_versions,
662 observed_channel_messages,
663 })
664 }
665
666 /// Sets the role for the specified channel member.
667 pub async fn set_channel_member_role(
668 &self,
669 channel_id: ChannelId,
670 admin_id: UserId,
671 for_user: UserId,
672 role: ChannelRole,
673 ) -> Result<SetMemberRoleResult> {
674 self.transaction(|tx| async move {
675 let channel = self.get_channel_internal(channel_id, &*tx).await?;
676 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
677 .await?;
678
679 let membership = channel_member::Entity::find()
680 .filter(
681 channel_member::Column::ChannelId
682 .eq(channel_id)
683 .and(channel_member::Column::UserId.eq(for_user)),
684 )
685 .one(&*tx)
686 .await?;
687
688 let Some(membership) = membership else {
689 Err(anyhow!("no such member"))?
690 };
691
692 let mut update = membership.into_active_model();
693 update.role = ActiveValue::Set(role);
694 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
695
696 if updated.accepted {
697 Ok(SetMemberRoleResult::MembershipUpdated(
698 self.calculate_membership_updated(&channel, for_user, &*tx)
699 .await?,
700 ))
701 } else {
702 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
703 channel,
704 )))
705 }
706 })
707 .await
708 }
709
710 /// Returns the details for the specified channel member.
711 pub async fn get_channel_participant_details(
712 &self,
713 channel_id: ChannelId,
714 user_id: UserId,
715 ) -> Result<Vec<proto::ChannelMember>> {
716 let (role, members) = self
717 .transaction(move |tx| async move {
718 let channel = self.get_channel_internal(channel_id, &*tx).await?;
719 let role = self
720 .check_user_is_channel_participant(&channel, user_id, &*tx)
721 .await?;
722 Ok((
723 role,
724 self.get_channel_participant_details_internal(&channel, &*tx)
725 .await?,
726 ))
727 })
728 .await?;
729
730 if role == ChannelRole::Admin {
731 Ok(members
732 .into_iter()
733 .map(|channel_member| proto::ChannelMember {
734 role: channel_member.role.into(),
735 user_id: channel_member.user_id.to_proto(),
736 kind: if channel_member.accepted {
737 Kind::Member
738 } else {
739 Kind::Invitee
740 }
741 .into(),
742 })
743 .collect())
744 } else {
745 return Ok(members
746 .into_iter()
747 .filter_map(|member| {
748 if !member.accepted {
749 return None;
750 }
751 Some(proto::ChannelMember {
752 role: member.role.into(),
753 user_id: member.user_id.to_proto(),
754 kind: Kind::Member.into(),
755 })
756 })
757 .collect());
758 }
759 }
760
761 async fn get_channel_participant_details_internal(
762 &self,
763 channel: &channel::Model,
764 tx: &DatabaseTransaction,
765 ) -> Result<Vec<channel_member::Model>> {
766 Ok(channel_member::Entity::find()
767 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
768 .all(tx)
769 .await?)
770 }
771
772 /// Returns the participants in the given channel.
773 pub async fn get_channel_participants(
774 &self,
775 channel: &channel::Model,
776 tx: &DatabaseTransaction,
777 ) -> Result<Vec<UserId>> {
778 let participants = self
779 .get_channel_participant_details_internal(channel, &*tx)
780 .await?;
781 Ok(participants
782 .into_iter()
783 .map(|member| member.user_id)
784 .collect())
785 }
786
787 /// Returns whether the given user is an admin in the specified channel.
788 pub async fn check_user_is_channel_admin(
789 &self,
790 channel: &channel::Model,
791 user_id: UserId,
792 tx: &DatabaseTransaction,
793 ) -> Result<ChannelRole> {
794 let role = self.channel_role_for_user(channel, user_id, tx).await?;
795 match role {
796 Some(ChannelRole::Admin) => Ok(role.unwrap()),
797 Some(ChannelRole::Member)
798 | Some(ChannelRole::Banned)
799 | Some(ChannelRole::Guest)
800 | None => Err(anyhow!(
801 "user is not a channel admin or channel does not exist"
802 ))?,
803 }
804 }
805
806 /// Returns whether the given user is a member of the specified channel.
807 pub async fn check_user_is_channel_member(
808 &self,
809 channel: &channel::Model,
810 user_id: UserId,
811 tx: &DatabaseTransaction,
812 ) -> Result<ChannelRole> {
813 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
814 match channel_role {
815 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
816 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
817 "user is not a channel member or channel does not exist"
818 ))?,
819 }
820 }
821
822 /// Returns whether the given user is a participant in the specified channel.
823 pub async fn check_user_is_channel_participant(
824 &self,
825 channel: &channel::Model,
826 user_id: UserId,
827 tx: &DatabaseTransaction,
828 ) -> Result<ChannelRole> {
829 let role = self.channel_role_for_user(channel, user_id, tx).await?;
830 match role {
831 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
832 Ok(role.unwrap())
833 }
834 Some(ChannelRole::Banned) | None => Err(anyhow!(
835 "user is not a channel participant or channel does not exist"
836 ))?,
837 }
838 }
839
840 /// Returns a user's pending invite for the given channel, if one exists.
841 pub async fn pending_invite_for_channel(
842 &self,
843 channel: &channel::Model,
844 user_id: UserId,
845 tx: &DatabaseTransaction,
846 ) -> Result<Option<channel_member::Model>> {
847 let row = channel_member::Entity::find()
848 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
849 .filter(channel_member::Column::UserId.eq(user_id))
850 .filter(channel_member::Column::Accepted.eq(false))
851 .one(&*tx)
852 .await?;
853
854 Ok(row)
855 }
856
857 /// Returns the role for a user in the given channel.
858 pub async fn channel_role_for_user(
859 &self,
860 channel: &channel::Model,
861 user_id: UserId,
862 tx: &DatabaseTransaction,
863 ) -> Result<Option<ChannelRole>> {
864 let membership = channel_member::Entity::find()
865 .filter(
866 channel_member::Column::ChannelId
867 .eq(channel.root_id())
868 .and(channel_member::Column::UserId.eq(user_id))
869 .and(channel_member::Column::Accepted.eq(true)),
870 )
871 .one(&*tx)
872 .await?;
873
874 let Some(membership) = membership else {
875 return Ok(None);
876 };
877
878 if !membership.role.can_see_channel(channel.visibility) {
879 return Ok(None);
880 }
881
882 Ok(Some(membership.role))
883 }
884
885 // Get the descendants of the given set if channels, ordered by their
886 // path.
887 pub(crate) async fn get_channel_descendants_excluding_self(
888 &self,
889 channels: impl IntoIterator<Item = &channel::Model>,
890 tx: &DatabaseTransaction,
891 ) -> Result<Vec<channel::Model>> {
892 let mut filter = Condition::any();
893 for channel in channels.into_iter() {
894 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
895 }
896
897 if filter.is_empty() {
898 return Ok(vec![]);
899 }
900
901 Ok(channel::Entity::find()
902 .filter(filter)
903 .order_by_asc(Expr::cust("parent_path || id || '/'"))
904 .all(tx)
905 .await?)
906 }
907
908 /// Returns the channel with the given ID.
909 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
910 self.transaction(|tx| async move {
911 let channel = self.get_channel_internal(channel_id, &*tx).await?;
912 self.check_user_is_channel_participant(&channel, user_id, &*tx)
913 .await?;
914
915 Ok(Channel::from_model(channel))
916 })
917 .await
918 }
919
920 pub(crate) async fn get_channel_internal(
921 &self,
922 channel_id: ChannelId,
923 tx: &DatabaseTransaction,
924 ) -> Result<channel::Model> {
925 Ok(channel::Entity::find_by_id(channel_id)
926 .one(&*tx)
927 .await?
928 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
929 }
930
931 pub(crate) async fn get_or_create_channel_room(
932 &self,
933 channel_id: ChannelId,
934 live_kit_room: &str,
935 tx: &DatabaseTransaction,
936 ) -> Result<RoomId> {
937 let room = room::Entity::find()
938 .filter(room::Column::ChannelId.eq(channel_id))
939 .one(&*tx)
940 .await?;
941
942 let room_id = if let Some(room) = room {
943 room.id
944 } else {
945 let result = room::Entity::insert(room::ActiveModel {
946 channel_id: ActiveValue::Set(Some(channel_id)),
947 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
948 ..Default::default()
949 })
950 .exec(&*tx)
951 .await?;
952
953 result.last_insert_id
954 };
955
956 Ok(room_id)
957 }
958
959 /// Move a channel from one parent to another
960 pub async fn move_channel(
961 &self,
962 channel_id: ChannelId,
963 new_parent_id: ChannelId,
964 admin_id: UserId,
965 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
966 self.transaction(|tx| async move {
967 let channel = self.get_channel_internal(channel_id, &*tx).await?;
968 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
969 .await?;
970 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
971
972 if new_parent.root_id() != channel.root_id() {
973 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
974 }
975
976 if new_parent
977 .ancestors_including_self()
978 .any(|id| id == channel.id)
979 {
980 Err(anyhow!(ErrorCode::CircularNesting))?;
981 }
982
983 if channel.visibility == ChannelVisibility::Public
984 && new_parent.visibility != ChannelVisibility::Public
985 {
986 Err(anyhow!(ErrorCode::BadPublicNesting))?;
987 }
988
989 let root_id = channel.root_id();
990 let old_path = format!("{}{}/", channel.parent_path, channel.id);
991 let new_path = format!("{}{}/", new_parent.path(), channel.id);
992
993 let mut model = channel.into_active_model();
994 model.parent_path = ActiveValue::Set(new_parent.path());
995 let channel = model.update(&*tx).await?;
996
997 let descendent_ids =
998 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
999 self.pool.get_database_backend(),
1000 "
1001 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1002 WHERE parent_path LIKE $3 || '%'
1003 RETURNING id
1004 ",
1005 [old_path.clone().into(), new_path.into(), old_path.into()],
1006 ))
1007 .all(&*tx)
1008 .await?;
1009
1010 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1011
1012 let channels = channel::Entity::find()
1013 .filter(channel::Column::Id.is_in(all_moved_ids))
1014 .all(&*tx)
1015 .await?
1016 .into_iter()
1017 .map(|c| Channel::from_model(c))
1018 .collect::<Vec<_>>();
1019
1020 let channel_members = channel_member::Entity::find()
1021 .filter(channel_member::Column::ChannelId.eq(root_id))
1022 .all(&*tx)
1023 .await?;
1024
1025 Ok((channels, channel_members))
1026 })
1027 .await
1028 }
1029}
1030
1031#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1032enum QueryIds {
1033 Id,
1034}
1035
1036#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1037enum QueryUserIds {
1038 UserId,
1039}