1use super::*;
2use rpc::{
3 proto::{channel_member::Kind, ChannelBufferVersion, VectorClockEntry},
4 ErrorCode, ErrorCodeExt,
5};
6use sea_orm::TryGetableMany;
7
8impl Database {
9 #[cfg(test)]
10 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
11 self.transaction(move |tx| async move {
12 let mut channels = Vec::new();
13 let mut rows = channel::Entity::find().stream(&*tx).await?;
14 while let Some(row) = rows.next().await {
15 let row = row?;
16 channels.push((row.id, row.name));
17 }
18 Ok(channels)
19 })
20 .await
21 }
22
23 #[cfg(test)]
24 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
25 Ok(self.create_channel(name, None, creator_id).await?.0.id)
26 }
27
28 #[cfg(test)]
29 pub async fn create_sub_channel(
30 &self,
31 name: &str,
32 parent: ChannelId,
33 creator_id: UserId,
34 ) -> Result<ChannelId> {
35 Ok(self
36 .create_channel(name, Some(parent), creator_id)
37 .await?
38 .0
39 .id)
40 }
41
42 /// Creates a new channel.
43 pub async fn create_channel(
44 &self,
45 name: &str,
46 parent_channel_id: Option<ChannelId>,
47 admin_id: UserId,
48 ) -> Result<(
49 Channel,
50 Option<channel_member::Model>,
51 Vec<channel_member::Model>,
52 )> {
53 let name = Self::sanitize_channel_name(name)?;
54 self.transaction(move |tx| async move {
55 let mut parent = None;
56 let mut membership = None;
57
58 if let Some(parent_channel_id) = parent_channel_id {
59 let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
60 self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
61 .await?;
62 parent = Some(parent_channel);
63 }
64
65 let channel = channel::ActiveModel {
66 id: ActiveValue::NotSet,
67 name: ActiveValue::Set(name.to_string()),
68 visibility: ActiveValue::Set(ChannelVisibility::Members),
69 parent_path: ActiveValue::Set(
70 parent
71 .as_ref()
72 .map_or(String::new(), |parent| parent.path()),
73 ),
74 requires_zed_cla: ActiveValue::NotSet,
75 }
76 .insert(&*tx)
77 .await?;
78
79 if parent.is_none() {
80 membership = Some(
81 channel_member::ActiveModel {
82 id: ActiveValue::NotSet,
83 channel_id: ActiveValue::Set(channel.id),
84 user_id: ActiveValue::Set(admin_id),
85 accepted: ActiveValue::Set(true),
86 role: ActiveValue::Set(ChannelRole::Admin),
87 }
88 .insert(&*tx)
89 .await?,
90 );
91 }
92
93 let channel_members = channel_member::Entity::find()
94 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
95 .all(&*tx)
96 .await?;
97
98 Ok((Channel::from_model(channel), membership, channel_members))
99 })
100 .await
101 }
102
103 /// Adds a user to the specified channel.
104 pub async fn join_channel(
105 &self,
106 channel_id: ChannelId,
107 user_id: UserId,
108 connection: ConnectionId,
109 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
110 self.transaction(move |tx| async move {
111 let channel = self.get_channel_internal(channel_id, &tx).await?;
112 let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
113
114 let mut accept_invite_result = None;
115
116 if role.is_none() {
117 if let Some(invitation) = self
118 .pending_invite_for_channel(&channel, user_id, &tx)
119 .await?
120 {
121 // note, this may be a parent channel
122 role = Some(invitation.role);
123 channel_member::Entity::update(channel_member::ActiveModel {
124 accepted: ActiveValue::Set(true),
125 ..invitation.into_active_model()
126 })
127 .exec(&*tx)
128 .await?;
129
130 accept_invite_result = Some(
131 self.calculate_membership_updated(&channel, user_id, &tx)
132 .await?,
133 );
134
135 debug_assert!(
136 self.channel_role_for_user(&channel, user_id, &tx).await? == role
137 );
138 } else if channel.visibility == ChannelVisibility::Public {
139 role = Some(ChannelRole::Guest);
140 channel_member::Entity::insert(channel_member::ActiveModel {
141 id: ActiveValue::NotSet,
142 channel_id: ActiveValue::Set(channel.root_id()),
143 user_id: ActiveValue::Set(user_id),
144 accepted: ActiveValue::Set(true),
145 role: ActiveValue::Set(ChannelRole::Guest),
146 })
147 .exec(&*tx)
148 .await?;
149
150 accept_invite_result = Some(
151 self.calculate_membership_updated(&channel, user_id, &tx)
152 .await?,
153 );
154
155 debug_assert!(
156 self.channel_role_for_user(&channel, user_id, &tx).await? == role
157 );
158 }
159 }
160
161 if role.is_none() || role == Some(ChannelRole::Banned) {
162 Err(ErrorCode::Forbidden.anyhow())?
163 }
164 let role = role.unwrap();
165
166 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
167 let room_id = self
168 .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
169 .await?;
170
171 self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
172 .await
173 .map(|jr| (jr, accept_invite_result, role))
174 })
175 .await
176 }
177
178 /// Sets the visibility of the given channel.
179 pub async fn set_channel_visibility(
180 &self,
181 channel_id: ChannelId,
182 visibility: ChannelVisibility,
183 admin_id: UserId,
184 ) -> Result<(Channel, Vec<channel_member::Model>)> {
185 self.transaction(move |tx| async move {
186 let channel = self.get_channel_internal(channel_id, &tx).await?;
187 self.check_user_is_channel_admin(&channel, admin_id, &tx)
188 .await?;
189
190 if visibility == ChannelVisibility::Public {
191 if let Some(parent_id) = channel.parent_id() {
192 let parent = self.get_channel_internal(parent_id, &tx).await?;
193
194 if parent.visibility != ChannelVisibility::Public {
195 Err(ErrorCode::BadPublicNesting
196 .with_tag("direction", "parent")
197 .anyhow())?;
198 }
199 }
200 } else if visibility == ChannelVisibility::Members {
201 if self
202 .get_channel_descendants_excluding_self([&channel], &tx)
203 .await?
204 .into_iter()
205 .any(|channel| channel.visibility == ChannelVisibility::Public)
206 {
207 Err(ErrorCode::BadPublicNesting
208 .with_tag("direction", "children")
209 .anyhow())?;
210 }
211 }
212
213 let mut model = channel.into_active_model();
214 model.visibility = ActiveValue::Set(visibility);
215 let channel = model.update(&*tx).await?;
216
217 let channel_members = channel_member::Entity::find()
218 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
219 .all(&*tx)
220 .await?;
221
222 Ok((Channel::from_model(channel), channel_members))
223 })
224 .await
225 }
226
227 #[cfg(test)]
228 pub async fn set_channel_requires_zed_cla(
229 &self,
230 channel_id: ChannelId,
231 requires_zed_cla: bool,
232 ) -> Result<()> {
233 self.transaction(move |tx| async move {
234 let channel = self.get_channel_internal(channel_id, &tx).await?;
235 let mut model = channel.into_active_model();
236 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
237 model.update(&*tx).await?;
238 Ok(())
239 })
240 .await
241 }
242
243 /// Deletes the channel with the specified ID.
244 pub async fn delete_channel(
245 &self,
246 channel_id: ChannelId,
247 user_id: UserId,
248 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
249 self.transaction(move |tx| async move {
250 let channel = self.get_channel_internal(channel_id, &tx).await?;
251 self.check_user_is_channel_admin(&channel, user_id, &tx)
252 .await?;
253
254 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
255 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
256 .select_only()
257 .column(channel_member::Column::UserId)
258 .distinct()
259 .into_values::<_, QueryUserIds>()
260 .all(&*tx)
261 .await?;
262
263 let channels_to_remove = self
264 .get_channel_descendants_excluding_self([&channel], &tx)
265 .await?
266 .into_iter()
267 .map(|channel| channel.id)
268 .chain(Some(channel_id))
269 .collect::<Vec<_>>();
270
271 channel::Entity::delete_many()
272 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
273 .exec(&*tx)
274 .await?;
275
276 Ok((channels_to_remove, members_to_notify))
277 })
278 .await
279 }
280
281 /// Invites a user to a channel as a member.
282 pub async fn invite_channel_member(
283 &self,
284 channel_id: ChannelId,
285 invitee_id: UserId,
286 inviter_id: UserId,
287 role: ChannelRole,
288 ) -> Result<InviteMemberResult> {
289 self.transaction(move |tx| async move {
290 let channel = self.get_channel_internal(channel_id, &tx).await?;
291 self.check_user_is_channel_admin(&channel, inviter_id, &tx)
292 .await?;
293 if !channel.is_root() {
294 Err(ErrorCode::NotARootChannel.anyhow())?
295 }
296
297 channel_member::ActiveModel {
298 id: ActiveValue::NotSet,
299 channel_id: ActiveValue::Set(channel_id),
300 user_id: ActiveValue::Set(invitee_id),
301 accepted: ActiveValue::Set(false),
302 role: ActiveValue::Set(role),
303 }
304 .insert(&*tx)
305 .await?;
306
307 let channel = Channel::from_model(channel);
308
309 let notifications = self
310 .create_notification(
311 invitee_id,
312 rpc::Notification::ChannelInvitation {
313 channel_id: channel_id.to_proto(),
314 channel_name: channel.name.clone(),
315 inviter_id: inviter_id.to_proto(),
316 },
317 true,
318 &tx,
319 )
320 .await?
321 .into_iter()
322 .collect();
323
324 Ok(InviteMemberResult {
325 channel,
326 notifications,
327 })
328 })
329 .await
330 }
331
332 fn sanitize_channel_name(name: &str) -> Result<&str> {
333 let new_name = name.trim().trim_start_matches('#');
334 if new_name == "" {
335 Err(anyhow!("channel name can't be blank"))?;
336 }
337 Ok(new_name)
338 }
339
340 /// Renames the specified channel.
341 pub async fn rename_channel(
342 &self,
343 channel_id: ChannelId,
344 admin_id: UserId,
345 new_name: &str,
346 ) -> Result<(Channel, Vec<channel_member::Model>)> {
347 self.transaction(move |tx| async move {
348 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
349
350 let channel = self.get_channel_internal(channel_id, &tx).await?;
351 self.check_user_is_channel_admin(&channel, admin_id, &tx)
352 .await?;
353
354 let mut model = channel.into_active_model();
355 model.name = ActiveValue::Set(new_name.clone());
356 let channel = model.update(&*tx).await?;
357
358 let channel_members = channel_member::Entity::find()
359 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
360 .all(&*tx)
361 .await?;
362
363 Ok((Channel::from_model(channel), channel_members))
364 })
365 .await
366 }
367
368 /// accept or decline an invite to join a channel
369 pub async fn respond_to_channel_invite(
370 &self,
371 channel_id: ChannelId,
372 user_id: UserId,
373 accept: bool,
374 ) -> Result<RespondToChannelInvite> {
375 self.transaction(move |tx| async move {
376 let channel = self.get_channel_internal(channel_id, &tx).await?;
377
378 let membership_update = if accept {
379 let rows_affected = channel_member::Entity::update_many()
380 .set(channel_member::ActiveModel {
381 accepted: ActiveValue::Set(accept),
382 ..Default::default()
383 })
384 .filter(
385 channel_member::Column::ChannelId
386 .eq(channel_id)
387 .and(channel_member::Column::UserId.eq(user_id))
388 .and(channel_member::Column::Accepted.eq(false)),
389 )
390 .exec(&*tx)
391 .await?
392 .rows_affected;
393
394 if rows_affected == 0 {
395 Err(anyhow!("no such invitation"))?;
396 }
397
398 Some(
399 self.calculate_membership_updated(&channel, user_id, &tx)
400 .await?,
401 )
402 } else {
403 let rows_affected = channel_member::Entity::delete_many()
404 .filter(
405 channel_member::Column::ChannelId
406 .eq(channel_id)
407 .and(channel_member::Column::UserId.eq(user_id))
408 .and(channel_member::Column::Accepted.eq(false)),
409 )
410 .exec(&*tx)
411 .await?
412 .rows_affected;
413 if rows_affected == 0 {
414 Err(anyhow!("no such invitation"))?;
415 }
416
417 None
418 };
419
420 Ok(RespondToChannelInvite {
421 membership_update,
422 notifications: self
423 .mark_notification_as_read_with_response(
424 user_id,
425 &rpc::Notification::ChannelInvitation {
426 channel_id: channel_id.to_proto(),
427 channel_name: Default::default(),
428 inviter_id: Default::default(),
429 },
430 accept,
431 &tx,
432 )
433 .await?
434 .into_iter()
435 .collect(),
436 })
437 })
438 .await
439 }
440
441 async fn calculate_membership_updated(
442 &self,
443 channel: &channel::Model,
444 user_id: UserId,
445 tx: &DatabaseTransaction,
446 ) -> Result<MembershipUpdated> {
447 let new_channels = self.get_user_channels(user_id, Some(channel), tx).await?;
448 let removed_channels = self
449 .get_channel_descendants_excluding_self([channel], tx)
450 .await?
451 .into_iter()
452 .map(|channel| channel.id)
453 .chain([channel.id])
454 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
455 .collect::<Vec<_>>();
456
457 Ok(MembershipUpdated {
458 channel_id: channel.id,
459 new_channels,
460 removed_channels,
461 })
462 }
463
464 /// Removes a channel member.
465 pub async fn remove_channel_member(
466 &self,
467 channel_id: ChannelId,
468 member_id: UserId,
469 admin_id: UserId,
470 ) -> Result<RemoveChannelMemberResult> {
471 self.transaction(|tx| async move {
472 let channel = self.get_channel_internal(channel_id, &tx).await?;
473
474 if member_id != admin_id {
475 self.check_user_is_channel_admin(&channel, admin_id, &tx)
476 .await?;
477 }
478
479 let result = channel_member::Entity::delete_many()
480 .filter(
481 channel_member::Column::ChannelId
482 .eq(channel_id)
483 .and(channel_member::Column::UserId.eq(member_id)),
484 )
485 .exec(&*tx)
486 .await?;
487
488 if result.rows_affected == 0 {
489 Err(anyhow!("no such member"))?;
490 }
491
492 Ok(RemoveChannelMemberResult {
493 membership_update: self
494 .calculate_membership_updated(&channel, member_id, &tx)
495 .await?,
496 notification_id: self
497 .remove_notification(
498 member_id,
499 rpc::Notification::ChannelInvitation {
500 channel_id: channel_id.to_proto(),
501 channel_name: Default::default(),
502 inviter_id: Default::default(),
503 },
504 &tx,
505 )
506 .await?,
507 })
508 })
509 .await
510 }
511
512 /// Returns all channel invites for the user with the given ID.
513 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
514 self.transaction(|tx| async move {
515 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
516
517 let channel_invites = channel_member::Entity::find()
518 .filter(
519 channel_member::Column::UserId
520 .eq(user_id)
521 .and(channel_member::Column::Accepted.eq(false)),
522 )
523 .all(&*tx)
524 .await?;
525
526 for invite in channel_invites {
527 role_for_channel.insert(invite.channel_id, invite.role);
528 }
529
530 let channels = channel::Entity::find()
531 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
532 .all(&*tx)
533 .await?;
534
535 let channels = channels.into_iter().map(Channel::from_model).collect();
536
537 Ok(channels)
538 })
539 .await
540 }
541
542 /// Returns all channels for the user with the given ID.
543 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
544 self.transaction(|tx| async move {
545 let tx = tx;
546
547 self.get_user_channels(user_id, None, &tx).await
548 })
549 .await
550 }
551
552 /// Returns all channels for the user with the given ID that are descendants
553 /// of the specified ancestor channel.
554 pub async fn get_user_channels(
555 &self,
556 user_id: UserId,
557 ancestor_channel: Option<&channel::Model>,
558 tx: &DatabaseTransaction,
559 ) -> Result<ChannelsForUser> {
560 let mut filter = channel_member::Column::UserId
561 .eq(user_id)
562 .and(channel_member::Column::Accepted.eq(true));
563
564 if let Some(ancestor) = ancestor_channel {
565 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
566 }
567
568 let channel_memberships = channel_member::Entity::find()
569 .filter(filter)
570 .all(tx)
571 .await?;
572
573 let channels = channel::Entity::find()
574 .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
575 .all(tx)
576 .await?;
577
578 let mut descendants = self
579 .get_channel_descendants_excluding_self(channels.iter(), tx)
580 .await?;
581
582 for channel in channels {
583 if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
584 descendants.insert(ix, channel);
585 }
586 }
587
588 let roles_by_channel_id = channel_memberships
589 .iter()
590 .map(|membership| (membership.channel_id, membership.role))
591 .collect::<HashMap<_, _>>();
592
593 let channels: Vec<Channel> = descendants
594 .into_iter()
595 .filter_map(|channel| {
596 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
597 if parent_role.can_see_channel(channel.visibility) {
598 Some(Channel::from_model(channel))
599 } else {
600 None
601 }
602 })
603 .collect();
604
605 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
606 enum QueryUserIdsAndChannelIds {
607 ChannelId,
608 UserId,
609 }
610
611 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
612 {
613 let mut rows = room_participant::Entity::find()
614 .inner_join(room::Entity)
615 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
616 .select_only()
617 .column(room::Column::ChannelId)
618 .column(room_participant::Column::UserId)
619 .into_values::<_, QueryUserIdsAndChannelIds>()
620 .stream(tx)
621 .await?;
622 while let Some(row) = rows.next().await {
623 let row: (ChannelId, UserId) = row?;
624 channel_participants.entry(row.0).or_default().push(row.1)
625 }
626 }
627
628 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
629
630 let mut channel_ids_by_buffer_id = HashMap::default();
631 let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
632 let mut rows = buffer::Entity::find()
633 .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
634 .stream(tx)
635 .await?;
636 while let Some(row) = rows.next().await {
637 let row = row?;
638 channel_ids_by_buffer_id.insert(row.id, row.channel_id);
639 latest_buffer_versions.push(ChannelBufferVersion {
640 channel_id: row.channel_id.0 as u64,
641 epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
642 version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
643 .latest_operation_lamport_timestamp
644 .zip(row.latest_operation_replica_id)
645 {
646 vec![VectorClockEntry {
647 timestamp: latest_lamport_timestamp as u32,
648 replica_id: latest_replica_id as u32,
649 }]
650 } else {
651 vec![]
652 },
653 });
654 }
655 drop(rows);
656
657 let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
658
659 let observed_buffer_versions = self
660 .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
661 .await?;
662
663 let observed_channel_messages = self
664 .observed_channel_messages(&channel_ids, user_id, tx)
665 .await?;
666
667 let hosted_projects = self
668 .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
669 .await?;
670
671 Ok(ChannelsForUser {
672 channel_memberships,
673 channels,
674 hosted_projects,
675 channel_participants,
676 latest_buffer_versions,
677 latest_channel_messages,
678 observed_buffer_versions,
679 observed_channel_messages,
680 })
681 }
682
683 /// Sets the role for the specified channel member.
684 pub async fn set_channel_member_role(
685 &self,
686 channel_id: ChannelId,
687 admin_id: UserId,
688 for_user: UserId,
689 role: ChannelRole,
690 ) -> Result<SetMemberRoleResult> {
691 self.transaction(|tx| async move {
692 let channel = self.get_channel_internal(channel_id, &tx).await?;
693 self.check_user_is_channel_admin(&channel, admin_id, &tx)
694 .await?;
695
696 let membership = channel_member::Entity::find()
697 .filter(
698 channel_member::Column::ChannelId
699 .eq(channel_id)
700 .and(channel_member::Column::UserId.eq(for_user)),
701 )
702 .one(&*tx)
703 .await?;
704
705 let Some(membership) = membership else {
706 Err(anyhow!("no such member"))?
707 };
708
709 let mut update = membership.into_active_model();
710 update.role = ActiveValue::Set(role);
711 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
712
713 if updated.accepted {
714 Ok(SetMemberRoleResult::MembershipUpdated(
715 self.calculate_membership_updated(&channel, for_user, &tx)
716 .await?,
717 ))
718 } else {
719 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
720 channel,
721 )))
722 }
723 })
724 .await
725 }
726
727 /// Returns the details for the specified channel member.
728 pub async fn get_channel_participant_details(
729 &self,
730 channel_id: ChannelId,
731 user_id: UserId,
732 ) -> Result<Vec<proto::ChannelMember>> {
733 let (role, members) = self
734 .transaction(move |tx| async move {
735 let channel = self.get_channel_internal(channel_id, &tx).await?;
736 let role = self
737 .check_user_is_channel_participant(&channel, user_id, &tx)
738 .await?;
739 Ok((
740 role,
741 self.get_channel_participant_details_internal(&channel, &tx)
742 .await?,
743 ))
744 })
745 .await?;
746
747 if role == ChannelRole::Admin {
748 Ok(members
749 .into_iter()
750 .map(|channel_member| proto::ChannelMember {
751 role: channel_member.role.into(),
752 user_id: channel_member.user_id.to_proto(),
753 kind: if channel_member.accepted {
754 Kind::Member
755 } else {
756 Kind::Invitee
757 }
758 .into(),
759 })
760 .collect())
761 } else {
762 return Ok(members
763 .into_iter()
764 .filter_map(|member| {
765 if !member.accepted {
766 return None;
767 }
768 Some(proto::ChannelMember {
769 role: member.role.into(),
770 user_id: member.user_id.to_proto(),
771 kind: Kind::Member.into(),
772 })
773 })
774 .collect());
775 }
776 }
777
778 async fn get_channel_participant_details_internal(
779 &self,
780 channel: &channel::Model,
781 tx: &DatabaseTransaction,
782 ) -> Result<Vec<channel_member::Model>> {
783 Ok(channel_member::Entity::find()
784 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
785 .all(tx)
786 .await?)
787 }
788
789 /// Returns the participants in the given channel.
790 pub async fn get_channel_participants(
791 &self,
792 channel: &channel::Model,
793 tx: &DatabaseTransaction,
794 ) -> Result<Vec<UserId>> {
795 let participants = self
796 .get_channel_participant_details_internal(channel, tx)
797 .await?;
798 Ok(participants
799 .into_iter()
800 .map(|member| member.user_id)
801 .collect())
802 }
803
804 /// Returns whether the given user is an admin in the specified channel.
805 pub async fn check_user_is_channel_admin(
806 &self,
807 channel: &channel::Model,
808 user_id: UserId,
809 tx: &DatabaseTransaction,
810 ) -> Result<ChannelRole> {
811 let role = self.channel_role_for_user(channel, user_id, tx).await?;
812 match role {
813 Some(ChannelRole::Admin) => Ok(role.unwrap()),
814 Some(ChannelRole::Member)
815 | Some(ChannelRole::Talker)
816 | Some(ChannelRole::Banned)
817 | Some(ChannelRole::Guest)
818 | None => Err(anyhow!(
819 "user is not a channel admin or channel does not exist"
820 ))?,
821 }
822 }
823
824 /// Returns whether the given user is a member of the specified channel.
825 pub async fn check_user_is_channel_member(
826 &self,
827 channel: &channel::Model,
828 user_id: UserId,
829 tx: &DatabaseTransaction,
830 ) -> Result<ChannelRole> {
831 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
832 match channel_role {
833 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
834 Some(ChannelRole::Banned)
835 | Some(ChannelRole::Guest)
836 | Some(ChannelRole::Talker)
837 | None => Err(anyhow!(
838 "user is not a channel member or channel does not exist"
839 ))?,
840 }
841 }
842
843 /// Returns whether the given user is a participant in the specified channel.
844 pub async fn check_user_is_channel_participant(
845 &self,
846 channel: &channel::Model,
847 user_id: UserId,
848 tx: &DatabaseTransaction,
849 ) -> Result<ChannelRole> {
850 let role = self.channel_role_for_user(channel, user_id, tx).await?;
851 match role {
852 Some(ChannelRole::Admin)
853 | Some(ChannelRole::Member)
854 | Some(ChannelRole::Guest)
855 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
856 Some(ChannelRole::Banned) | None => Err(anyhow!(
857 "user is not a channel participant or channel does not exist"
858 ))?,
859 }
860 }
861
862 /// Returns a user's pending invite for the given channel, if one exists.
863 pub async fn pending_invite_for_channel(
864 &self,
865 channel: &channel::Model,
866 user_id: UserId,
867 tx: &DatabaseTransaction,
868 ) -> Result<Option<channel_member::Model>> {
869 let row = channel_member::Entity::find()
870 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
871 .filter(channel_member::Column::UserId.eq(user_id))
872 .filter(channel_member::Column::Accepted.eq(false))
873 .one(tx)
874 .await?;
875
876 Ok(row)
877 }
878
879 /// Returns the role for a user in the given channel.
880 pub async fn channel_role_for_user(
881 &self,
882 channel: &channel::Model,
883 user_id: UserId,
884 tx: &DatabaseTransaction,
885 ) -> Result<Option<ChannelRole>> {
886 let membership = channel_member::Entity::find()
887 .filter(
888 channel_member::Column::ChannelId
889 .eq(channel.root_id())
890 .and(channel_member::Column::UserId.eq(user_id))
891 .and(channel_member::Column::Accepted.eq(true)),
892 )
893 .one(tx)
894 .await?;
895
896 let Some(membership) = membership else {
897 return Ok(None);
898 };
899
900 if !membership.role.can_see_channel(channel.visibility) {
901 return Ok(None);
902 }
903
904 Ok(Some(membership.role))
905 }
906
907 // Get the descendants of the given set if channels, ordered by their
908 // path.
909 pub(crate) async fn get_channel_descendants_excluding_self(
910 &self,
911 channels: impl IntoIterator<Item = &channel::Model>,
912 tx: &DatabaseTransaction,
913 ) -> Result<Vec<channel::Model>> {
914 let mut filter = Condition::any();
915 for channel in channels.into_iter() {
916 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
917 }
918
919 if filter.is_empty() {
920 return Ok(vec![]);
921 }
922
923 Ok(channel::Entity::find()
924 .filter(filter)
925 .order_by_asc(Expr::cust("parent_path || id || '/'"))
926 .all(tx)
927 .await?)
928 }
929
930 /// Returns the channel with the given ID.
931 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
932 self.transaction(|tx| async move {
933 let channel = self.get_channel_internal(channel_id, &tx).await?;
934 self.check_user_is_channel_participant(&channel, user_id, &tx)
935 .await?;
936
937 Ok(Channel::from_model(channel))
938 })
939 .await
940 }
941
942 pub(crate) async fn get_channel_internal(
943 &self,
944 channel_id: ChannelId,
945 tx: &DatabaseTransaction,
946 ) -> Result<channel::Model> {
947 Ok(channel::Entity::find_by_id(channel_id)
948 .one(tx)
949 .await?
950 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
951 }
952
953 pub(crate) async fn get_or_create_channel_room(
954 &self,
955 channel_id: ChannelId,
956 live_kit_room: &str,
957 tx: &DatabaseTransaction,
958 ) -> Result<RoomId> {
959 let room = room::Entity::find()
960 .filter(room::Column::ChannelId.eq(channel_id))
961 .one(tx)
962 .await?;
963
964 let room_id = if let Some(room) = room {
965 room.id
966 } else {
967 let result = room::Entity::insert(room::ActiveModel {
968 channel_id: ActiveValue::Set(Some(channel_id)),
969 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
970 ..Default::default()
971 })
972 .exec(tx)
973 .await?;
974
975 result.last_insert_id
976 };
977
978 Ok(room_id)
979 }
980
981 /// Move a channel from one parent to another
982 pub async fn move_channel(
983 &self,
984 channel_id: ChannelId,
985 new_parent_id: ChannelId,
986 admin_id: UserId,
987 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
988 self.transaction(|tx| async move {
989 let channel = self.get_channel_internal(channel_id, &tx).await?;
990 self.check_user_is_channel_admin(&channel, admin_id, &tx)
991 .await?;
992 let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
993
994 if new_parent.root_id() != channel.root_id() {
995 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
996 }
997
998 if new_parent
999 .ancestors_including_self()
1000 .any(|id| id == channel.id)
1001 {
1002 Err(anyhow!(ErrorCode::CircularNesting))?;
1003 }
1004
1005 if channel.visibility == ChannelVisibility::Public
1006 && new_parent.visibility != ChannelVisibility::Public
1007 {
1008 Err(anyhow!(ErrorCode::BadPublicNesting))?;
1009 }
1010
1011 let root_id = channel.root_id();
1012 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1013 let new_path = format!("{}{}/", new_parent.path(), channel.id);
1014
1015 let mut model = channel.into_active_model();
1016 model.parent_path = ActiveValue::Set(new_parent.path());
1017 let channel = model.update(&*tx).await?;
1018
1019 let descendent_ids =
1020 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1021 self.pool.get_database_backend(),
1022 "
1023 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1024 WHERE parent_path LIKE $3 || '%'
1025 RETURNING id
1026 ",
1027 [old_path.clone().into(), new_path.into(), old_path.into()],
1028 ))
1029 .all(&*tx)
1030 .await?;
1031
1032 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1033
1034 let channels = channel::Entity::find()
1035 .filter(channel::Column::Id.is_in(all_moved_ids))
1036 .all(&*tx)
1037 .await?
1038 .into_iter()
1039 .map(|c| Channel::from_model(c))
1040 .collect::<Vec<_>>();
1041
1042 let channel_members = channel_member::Entity::find()
1043 .filter(channel_member::Column::ChannelId.eq(root_id))
1044 .all(&*tx)
1045 .await?;
1046
1047 Ok((channels, channel_members))
1048 })
1049 .await
1050 }
1051}
1052
1053#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1054enum QueryIds {
1055 Id,
1056}
1057
1058#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1059enum QueryUserIds {
1060 UserId,
1061}