1use super::*;
2use rpc::{
3 proto::{channel_member::Kind, ChannelBufferVersion, VectorClockEntry},
4 ErrorCode, ErrorCodeExt,
5};
6use sea_orm::TryGetableMany;
7
8impl Database {
9 #[cfg(test)]
10 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
11 self.transaction(move |tx| async move {
12 let mut channels = Vec::new();
13 let mut rows = channel::Entity::find().stream(&*tx).await?;
14 while let Some(row) = rows.next().await {
15 let row = row?;
16 channels.push((row.id, row.name));
17 }
18 Ok(channels)
19 })
20 .await
21 }
22
23 #[cfg(test)]
24 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
25 Ok(self.create_channel(name, None, creator_id).await?.0.id)
26 }
27
28 #[cfg(test)]
29 pub async fn create_sub_channel(
30 &self,
31 name: &str,
32 parent: ChannelId,
33 creator_id: UserId,
34 ) -> Result<ChannelId> {
35 Ok(self
36 .create_channel(name, Some(parent), creator_id)
37 .await?
38 .0
39 .id)
40 }
41
42 /// Creates a new channel.
43 pub async fn create_channel(
44 &self,
45 name: &str,
46 parent_channel_id: Option<ChannelId>,
47 admin_id: UserId,
48 ) -> Result<(channel::Model, Option<channel_member::Model>)> {
49 let name = Self::sanitize_channel_name(name)?;
50 self.transaction(move |tx| async move {
51 let mut parent = None;
52 let mut membership = None;
53
54 if let Some(parent_channel_id) = parent_channel_id {
55 let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
56 self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
57 .await?;
58 parent = Some(parent_channel);
59 }
60
61 let channel = channel::ActiveModel {
62 id: ActiveValue::NotSet,
63 name: ActiveValue::Set(name.to_string()),
64 visibility: ActiveValue::Set(ChannelVisibility::Members),
65 parent_path: ActiveValue::Set(
66 parent
67 .as_ref()
68 .map_or(String::new(), |parent| parent.path()),
69 ),
70 requires_zed_cla: ActiveValue::NotSet,
71 }
72 .insert(&*tx)
73 .await?;
74
75 if parent.is_none() {
76 membership = Some(
77 channel_member::ActiveModel {
78 id: ActiveValue::NotSet,
79 channel_id: ActiveValue::Set(channel.id),
80 user_id: ActiveValue::Set(admin_id),
81 accepted: ActiveValue::Set(true),
82 role: ActiveValue::Set(ChannelRole::Admin),
83 }
84 .insert(&*tx)
85 .await?,
86 );
87 }
88
89 Ok((channel, membership))
90 })
91 .await
92 }
93
94 /// Adds a user to the specified channel.
95 pub async fn join_channel(
96 &self,
97 channel_id: ChannelId,
98 user_id: UserId,
99 connection: ConnectionId,
100 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
101 self.transaction(move |tx| async move {
102 let channel = self.get_channel_internal(channel_id, &tx).await?;
103 let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
104
105 let mut accept_invite_result = None;
106
107 if role.is_none() {
108 if let Some(invitation) = self
109 .pending_invite_for_channel(&channel, user_id, &tx)
110 .await?
111 {
112 // note, this may be a parent channel
113 role = Some(invitation.role);
114 channel_member::Entity::update(channel_member::ActiveModel {
115 accepted: ActiveValue::Set(true),
116 ..invitation.into_active_model()
117 })
118 .exec(&*tx)
119 .await?;
120
121 accept_invite_result = Some(
122 self.calculate_membership_updated(&channel, user_id, &tx)
123 .await?,
124 );
125
126 debug_assert!(
127 self.channel_role_for_user(&channel, user_id, &tx).await? == role
128 );
129 } else if channel.visibility == ChannelVisibility::Public {
130 role = Some(ChannelRole::Guest);
131 channel_member::Entity::insert(channel_member::ActiveModel {
132 id: ActiveValue::NotSet,
133 channel_id: ActiveValue::Set(channel.root_id()),
134 user_id: ActiveValue::Set(user_id),
135 accepted: ActiveValue::Set(true),
136 role: ActiveValue::Set(ChannelRole::Guest),
137 })
138 .exec(&*tx)
139 .await?;
140
141 accept_invite_result = Some(
142 self.calculate_membership_updated(&channel, user_id, &tx)
143 .await?,
144 );
145
146 debug_assert!(
147 self.channel_role_for_user(&channel, user_id, &tx).await? == role
148 );
149 }
150 }
151
152 if role.is_none() || role == Some(ChannelRole::Banned) {
153 Err(ErrorCode::Forbidden.anyhow())?
154 }
155 let role = role.unwrap();
156
157 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
158 let room_id = self
159 .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
160 .await?;
161
162 self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
163 .await
164 .map(|jr| (jr, accept_invite_result, role))
165 })
166 .await
167 }
168
169 /// Sets the visibility of the given channel.
170 pub async fn set_channel_visibility(
171 &self,
172 channel_id: ChannelId,
173 visibility: ChannelVisibility,
174 admin_id: UserId,
175 ) -> Result<channel::Model> {
176 self.transaction(move |tx| async move {
177 let channel = self.get_channel_internal(channel_id, &tx).await?;
178 self.check_user_is_channel_admin(&channel, admin_id, &tx)
179 .await?;
180
181 if visibility == ChannelVisibility::Public {
182 if let Some(parent_id) = channel.parent_id() {
183 let parent = self.get_channel_internal(parent_id, &tx).await?;
184
185 if parent.visibility != ChannelVisibility::Public {
186 Err(ErrorCode::BadPublicNesting
187 .with_tag("direction", "parent")
188 .anyhow())?;
189 }
190 }
191 } else if visibility == ChannelVisibility::Members {
192 if self
193 .get_channel_descendants_excluding_self([&channel], &tx)
194 .await?
195 .into_iter()
196 .any(|channel| channel.visibility == ChannelVisibility::Public)
197 {
198 Err(ErrorCode::BadPublicNesting
199 .with_tag("direction", "children")
200 .anyhow())?;
201 }
202 }
203
204 let mut model = channel.into_active_model();
205 model.visibility = ActiveValue::Set(visibility);
206 let channel = model.update(&*tx).await?;
207
208 Ok(channel)
209 })
210 .await
211 }
212
213 #[cfg(test)]
214 pub async fn set_channel_requires_zed_cla(
215 &self,
216 channel_id: ChannelId,
217 requires_zed_cla: bool,
218 ) -> Result<()> {
219 self.transaction(move |tx| async move {
220 let channel = self.get_channel_internal(channel_id, &tx).await?;
221 let mut model = channel.into_active_model();
222 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
223 model.update(&*tx).await?;
224 Ok(())
225 })
226 .await
227 }
228
229 /// Deletes the channel with the specified ID.
230 pub async fn delete_channel(
231 &self,
232 channel_id: ChannelId,
233 user_id: UserId,
234 ) -> Result<(ChannelId, Vec<ChannelId>)> {
235 self.transaction(move |tx| async move {
236 let channel = self.get_channel_internal(channel_id, &tx).await?;
237 self.check_user_is_channel_admin(&channel, user_id, &tx)
238 .await?;
239
240 let channels_to_remove = self
241 .get_channel_descendants_excluding_self([&channel], &tx)
242 .await?
243 .into_iter()
244 .map(|channel| channel.id)
245 .chain(Some(channel_id))
246 .collect::<Vec<_>>();
247
248 channel::Entity::delete_many()
249 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
250 .exec(&*tx)
251 .await?;
252
253 Ok((channel.root_id(), channels_to_remove))
254 })
255 .await
256 }
257
258 /// Invites a user to a channel as a member.
259 pub async fn invite_channel_member(
260 &self,
261 channel_id: ChannelId,
262 invitee_id: UserId,
263 inviter_id: UserId,
264 role: ChannelRole,
265 ) -> Result<InviteMemberResult> {
266 self.transaction(move |tx| async move {
267 let channel = self.get_channel_internal(channel_id, &tx).await?;
268 self.check_user_is_channel_admin(&channel, inviter_id, &tx)
269 .await?;
270 if !channel.is_root() {
271 Err(ErrorCode::NotARootChannel.anyhow())?
272 }
273
274 channel_member::ActiveModel {
275 id: ActiveValue::NotSet,
276 channel_id: ActiveValue::Set(channel_id),
277 user_id: ActiveValue::Set(invitee_id),
278 accepted: ActiveValue::Set(false),
279 role: ActiveValue::Set(role),
280 }
281 .insert(&*tx)
282 .await?;
283
284 let channel = Channel::from_model(channel);
285
286 let notifications = self
287 .create_notification(
288 invitee_id,
289 rpc::Notification::ChannelInvitation {
290 channel_id: channel_id.to_proto(),
291 channel_name: channel.name.clone(),
292 inviter_id: inviter_id.to_proto(),
293 },
294 true,
295 &tx,
296 )
297 .await?
298 .into_iter()
299 .collect();
300
301 Ok(InviteMemberResult {
302 channel,
303 notifications,
304 })
305 })
306 .await
307 }
308
309 fn sanitize_channel_name(name: &str) -> Result<&str> {
310 let new_name = name.trim().trim_start_matches('#');
311 if new_name == "" {
312 Err(anyhow!("channel name can't be blank"))?;
313 }
314 Ok(new_name)
315 }
316
317 /// Renames the specified channel.
318 pub async fn rename_channel(
319 &self,
320 channel_id: ChannelId,
321 admin_id: UserId,
322 new_name: &str,
323 ) -> Result<channel::Model> {
324 self.transaction(move |tx| async move {
325 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
326
327 let channel = self.get_channel_internal(channel_id, &tx).await?;
328 self.check_user_is_channel_admin(&channel, admin_id, &tx)
329 .await?;
330
331 let mut model = channel.into_active_model();
332 model.name = ActiveValue::Set(new_name.clone());
333 let channel = model.update(&*tx).await?;
334
335 Ok(channel)
336 })
337 .await
338 }
339
340 /// accept or decline an invite to join a channel
341 pub async fn respond_to_channel_invite(
342 &self,
343 channel_id: ChannelId,
344 user_id: UserId,
345 accept: bool,
346 ) -> Result<RespondToChannelInvite> {
347 self.transaction(move |tx| async move {
348 let channel = self.get_channel_internal(channel_id, &tx).await?;
349
350 let membership_update = if accept {
351 let rows_affected = channel_member::Entity::update_many()
352 .set(channel_member::ActiveModel {
353 accepted: ActiveValue::Set(accept),
354 ..Default::default()
355 })
356 .filter(
357 channel_member::Column::ChannelId
358 .eq(channel_id)
359 .and(channel_member::Column::UserId.eq(user_id))
360 .and(channel_member::Column::Accepted.eq(false)),
361 )
362 .exec(&*tx)
363 .await?
364 .rows_affected;
365
366 if rows_affected == 0 {
367 Err(anyhow!("no such invitation"))?;
368 }
369
370 Some(
371 self.calculate_membership_updated(&channel, user_id, &tx)
372 .await?,
373 )
374 } else {
375 let rows_affected = channel_member::Entity::delete_many()
376 .filter(
377 channel_member::Column::ChannelId
378 .eq(channel_id)
379 .and(channel_member::Column::UserId.eq(user_id))
380 .and(channel_member::Column::Accepted.eq(false)),
381 )
382 .exec(&*tx)
383 .await?
384 .rows_affected;
385 if rows_affected == 0 {
386 Err(anyhow!("no such invitation"))?;
387 }
388
389 None
390 };
391
392 Ok(RespondToChannelInvite {
393 membership_update,
394 notifications: self
395 .mark_notification_as_read_with_response(
396 user_id,
397 &rpc::Notification::ChannelInvitation {
398 channel_id: channel_id.to_proto(),
399 channel_name: Default::default(),
400 inviter_id: Default::default(),
401 },
402 accept,
403 &tx,
404 )
405 .await?
406 .into_iter()
407 .collect(),
408 })
409 })
410 .await
411 }
412
413 async fn calculate_membership_updated(
414 &self,
415 channel: &channel::Model,
416 user_id: UserId,
417 tx: &DatabaseTransaction,
418 ) -> Result<MembershipUpdated> {
419 let new_channels = self.get_user_channels(user_id, Some(channel), tx).await?;
420 let removed_channels = self
421 .get_channel_descendants_excluding_self([channel], tx)
422 .await?
423 .into_iter()
424 .map(|channel| channel.id)
425 .chain([channel.id])
426 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
427 .collect::<Vec<_>>();
428
429 Ok(MembershipUpdated {
430 channel_id: channel.id,
431 new_channels,
432 removed_channels,
433 })
434 }
435
436 /// Removes a channel member.
437 pub async fn remove_channel_member(
438 &self,
439 channel_id: ChannelId,
440 member_id: UserId,
441 admin_id: UserId,
442 ) -> Result<RemoveChannelMemberResult> {
443 self.transaction(|tx| async move {
444 let channel = self.get_channel_internal(channel_id, &tx).await?;
445
446 if member_id != admin_id {
447 self.check_user_is_channel_admin(&channel, admin_id, &tx)
448 .await?;
449 }
450
451 let result = channel_member::Entity::delete_many()
452 .filter(
453 channel_member::Column::ChannelId
454 .eq(channel_id)
455 .and(channel_member::Column::UserId.eq(member_id)),
456 )
457 .exec(&*tx)
458 .await?;
459
460 if result.rows_affected == 0 {
461 Err(anyhow!("no such member"))?;
462 }
463
464 Ok(RemoveChannelMemberResult {
465 membership_update: self
466 .calculate_membership_updated(&channel, member_id, &tx)
467 .await?,
468 notification_id: self
469 .remove_notification(
470 member_id,
471 rpc::Notification::ChannelInvitation {
472 channel_id: channel_id.to_proto(),
473 channel_name: Default::default(),
474 inviter_id: Default::default(),
475 },
476 &tx,
477 )
478 .await?,
479 })
480 })
481 .await
482 }
483
484 /// Returns all channel invites for the user with the given ID.
485 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
486 self.transaction(|tx| async move {
487 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
488
489 let channel_invites = channel_member::Entity::find()
490 .filter(
491 channel_member::Column::UserId
492 .eq(user_id)
493 .and(channel_member::Column::Accepted.eq(false)),
494 )
495 .all(&*tx)
496 .await?;
497
498 for invite in channel_invites {
499 role_for_channel.insert(invite.channel_id, invite.role);
500 }
501
502 let channels = channel::Entity::find()
503 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
504 .all(&*tx)
505 .await?;
506
507 let channels = channels.into_iter().map(Channel::from_model).collect();
508
509 Ok(channels)
510 })
511 .await
512 }
513
514 /// Returns all channels for the user with the given ID.
515 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
516 self.transaction(|tx| async move {
517 let tx = tx;
518
519 self.get_user_channels(user_id, None, &tx).await
520 })
521 .await
522 }
523
    /// Returns all channels for the user with the given ID that are descendants
    /// of the specified ancestor channel.
    ///
    /// Only accepted memberships are considered. When `ancestor_channel` is
    /// given, memberships are further restricted to that channel's root.
    /// Besides the visible channels, the result carries per-channel room
    /// participants, latest buffer versions and channel messages, what the
    /// user has already observed, and the projects/dev servers attached to
    /// those channels.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        let mut filter = channel_member::Column::UserId
            .eq(user_id)
            .and(channel_member::Column::Accepted.eq(true));

        if let Some(ancestor) = ancestor_channel {
            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
        }

        let channel_memberships = channel_member::Entity::find()
            .filter(filter)
            .all(tx)
            .await?;

        // The channels the user is directly a member of.
        let channels = channel::Entity::find()
            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
            .all(tx)
            .await?;

        let mut descendants = self
            .get_channel_descendants_excluding_self(channels.iter(), tx)
            .await?;

        // Merge the membership channels into the descendant list at the position
        // given by their path, skipping any that are already present.
        // NOTE(review): this assumes `descendants` is sorted by `path()`; the
        // helper orders by `parent_path || id || '/'` — confirm the two agree.
        for channel in channels {
            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
                descendants.insert(ix, channel);
            }
        }

        let roles_by_channel_id = channel_memberships
            .iter()
            .map(|membership| (membership.channel_id, membership.role))
            .collect::<HashMap<_, _>>();

        // Keep only channels whose root the user has a membership on and whose
        // visibility that membership's role is allowed to see.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
                if parent_role.can_see_channel(channel.visibility) {
                    Some(Channel::from_model(channel))
                } else {
                    None
                }
            })
            .collect();

        // Column selector for the (channel id, user id) pairs read below; the
        // variant order matches the order in which the columns are selected.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Users currently in each channel's room, grouped by channel.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            // Scoped so the row stream is dropped before later queries on `tx`.
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();

        // For every buffer belonging to a visible channel, record which channel
        // it belongs to and the latest operation recorded on the buffer row.
        let mut channel_ids_by_buffer_id = HashMap::default();
        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
        let mut rows = buffer::Entity::find()
            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
            .stream(tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row = row?;
            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
            latest_buffer_versions.push(ChannelBufferVersion {
                channel_id: row.channel_id.0 as u64,
                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
                // A single-entry version is emitted only when both the lamport
                // timestamp and the replica id are present; otherwise it is empty.
                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
                    .latest_operation_lamport_timestamp
                    .zip(row.latest_operation_replica_id)
                {
                    vec![VectorClockEntry {
                        timestamp: latest_lamport_timestamp as u32,
                        replica_id: latest_replica_id as u32,
                    }]
                } else {
                    vec![]
                },
            });
        }
        // Release the stream explicitly before the queries that follow.
        // NOTE(review): presumably required so the stream no longer holds the
        // transaction while other statements run — confirm.
        drop(rows);

        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;

        let observed_buffer_versions = self
            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
            .await?;

        let observed_channel_messages = self
            .observed_channel_messages(&channel_ids, user_id, tx)
            .await?;

        let hosted_projects = self
            .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
            .await?;

        let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
        let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;

        Ok(ChannelsForUser {
            channel_memberships,
            channels,
            hosted_projects,
            dev_servers,
            remote_projects,
            channel_participants,
            latest_buffer_versions,
            latest_channel_messages,
            observed_buffer_versions,
            observed_channel_messages,
        })
    }
659
660 /// Sets the role for the specified channel member.
661 pub async fn set_channel_member_role(
662 &self,
663 channel_id: ChannelId,
664 admin_id: UserId,
665 for_user: UserId,
666 role: ChannelRole,
667 ) -> Result<SetMemberRoleResult> {
668 self.transaction(|tx| async move {
669 let channel = self.get_channel_internal(channel_id, &tx).await?;
670 self.check_user_is_channel_admin(&channel, admin_id, &tx)
671 .await?;
672
673 let membership = channel_member::Entity::find()
674 .filter(
675 channel_member::Column::ChannelId
676 .eq(channel_id)
677 .and(channel_member::Column::UserId.eq(for_user)),
678 )
679 .one(&*tx)
680 .await?;
681
682 let Some(membership) = membership else {
683 Err(anyhow!("no such member"))?
684 };
685
686 let mut update = membership.into_active_model();
687 update.role = ActiveValue::Set(role);
688 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
689
690 if updated.accepted {
691 Ok(SetMemberRoleResult::MembershipUpdated(
692 self.calculate_membership_updated(&channel, for_user, &tx)
693 .await?,
694 ))
695 } else {
696 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
697 channel,
698 )))
699 }
700 })
701 .await
702 }
703
704 /// Returns the details for the specified channel member.
705 pub async fn get_channel_participant_details(
706 &self,
707 channel_id: ChannelId,
708 user_id: UserId,
709 ) -> Result<Vec<proto::ChannelMember>> {
710 let (role, members) = self
711 .transaction(move |tx| async move {
712 let channel = self.get_channel_internal(channel_id, &tx).await?;
713 let role = self
714 .check_user_is_channel_participant(&channel, user_id, &tx)
715 .await?;
716 Ok((
717 role,
718 self.get_channel_participant_details_internal(&channel, &tx)
719 .await?,
720 ))
721 })
722 .await?;
723
724 if role == ChannelRole::Admin {
725 Ok(members
726 .into_iter()
727 .map(|channel_member| proto::ChannelMember {
728 role: channel_member.role.into(),
729 user_id: channel_member.user_id.to_proto(),
730 kind: if channel_member.accepted {
731 Kind::Member
732 } else {
733 Kind::Invitee
734 }
735 .into(),
736 })
737 .collect())
738 } else {
739 return Ok(members
740 .into_iter()
741 .filter_map(|member| {
742 if !member.accepted {
743 return None;
744 }
745 Some(proto::ChannelMember {
746 role: member.role.into(),
747 user_id: member.user_id.to_proto(),
748 kind: Kind::Member.into(),
749 })
750 })
751 .collect());
752 }
753 }
754
755 async fn get_channel_participant_details_internal(
756 &self,
757 channel: &channel::Model,
758 tx: &DatabaseTransaction,
759 ) -> Result<Vec<channel_member::Model>> {
760 Ok(channel_member::Entity::find()
761 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
762 .all(tx)
763 .await?)
764 }
765
766 /// Returns the participants in the given channel.
767 pub async fn get_channel_participants(
768 &self,
769 channel: &channel::Model,
770 tx: &DatabaseTransaction,
771 ) -> Result<Vec<UserId>> {
772 let participants = self
773 .get_channel_participant_details_internal(channel, tx)
774 .await?;
775 Ok(participants
776 .into_iter()
777 .map(|member| member.user_id)
778 .collect())
779 }
780
781 /// Returns whether the given user is an admin in the specified channel.
782 pub async fn check_user_is_channel_admin(
783 &self,
784 channel: &channel::Model,
785 user_id: UserId,
786 tx: &DatabaseTransaction,
787 ) -> Result<ChannelRole> {
788 let role = self.channel_role_for_user(channel, user_id, tx).await?;
789 match role {
790 Some(ChannelRole::Admin) => Ok(role.unwrap()),
791 Some(ChannelRole::Member)
792 | Some(ChannelRole::Talker)
793 | Some(ChannelRole::Banned)
794 | Some(ChannelRole::Guest)
795 | None => Err(anyhow!(
796 "user is not a channel admin or channel does not exist"
797 ))?,
798 }
799 }
800
801 /// Returns whether the given user is a member of the specified channel.
802 pub async fn check_user_is_channel_member(
803 &self,
804 channel: &channel::Model,
805 user_id: UserId,
806 tx: &DatabaseTransaction,
807 ) -> Result<ChannelRole> {
808 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
809 match channel_role {
810 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
811 Some(ChannelRole::Banned)
812 | Some(ChannelRole::Guest)
813 | Some(ChannelRole::Talker)
814 | None => Err(anyhow!(
815 "user is not a channel member or channel does not exist"
816 ))?,
817 }
818 }
819
820 /// Returns whether the given user is a participant in the specified channel.
821 pub async fn check_user_is_channel_participant(
822 &self,
823 channel: &channel::Model,
824 user_id: UserId,
825 tx: &DatabaseTransaction,
826 ) -> Result<ChannelRole> {
827 let role = self.channel_role_for_user(channel, user_id, tx).await?;
828 match role {
829 Some(ChannelRole::Admin)
830 | Some(ChannelRole::Member)
831 | Some(ChannelRole::Guest)
832 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
833 Some(ChannelRole::Banned) | None => Err(anyhow!(
834 "user is not a channel participant or channel does not exist"
835 ))?,
836 }
837 }
838
839 /// Returns a user's pending invite for the given channel, if one exists.
840 pub async fn pending_invite_for_channel(
841 &self,
842 channel: &channel::Model,
843 user_id: UserId,
844 tx: &DatabaseTransaction,
845 ) -> Result<Option<channel_member::Model>> {
846 let row = channel_member::Entity::find()
847 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
848 .filter(channel_member::Column::UserId.eq(user_id))
849 .filter(channel_member::Column::Accepted.eq(false))
850 .one(tx)
851 .await?;
852
853 Ok(row)
854 }
855
856 /// Returns the role for a user in the given channel.
857 pub async fn channel_role_for_user(
858 &self,
859 channel: &channel::Model,
860 user_id: UserId,
861 tx: &DatabaseTransaction,
862 ) -> Result<Option<ChannelRole>> {
863 let membership = channel_member::Entity::find()
864 .filter(
865 channel_member::Column::ChannelId
866 .eq(channel.root_id())
867 .and(channel_member::Column::UserId.eq(user_id))
868 .and(channel_member::Column::Accepted.eq(true)),
869 )
870 .one(tx)
871 .await?;
872
873 let Some(membership) = membership else {
874 return Ok(None);
875 };
876
877 if !membership.role.can_see_channel(channel.visibility) {
878 return Ok(None);
879 }
880
881 Ok(Some(membership.role))
882 }
883
884 // Get the descendants of the given set if channels, ordered by their
885 // path.
886 pub(crate) async fn get_channel_descendants_excluding_self(
887 &self,
888 channels: impl IntoIterator<Item = &channel::Model>,
889 tx: &DatabaseTransaction,
890 ) -> Result<Vec<channel::Model>> {
891 let mut filter = Condition::any();
892 for channel in channels.into_iter() {
893 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
894 }
895
896 if filter.is_empty() {
897 return Ok(vec![]);
898 }
899
900 Ok(channel::Entity::find()
901 .filter(filter)
902 .order_by_asc(Expr::cust("parent_path || id || '/'"))
903 .all(tx)
904 .await?)
905 }
906
907 /// Returns the channel with the given ID.
908 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
909 self.transaction(|tx| async move {
910 let channel = self.get_channel_internal(channel_id, &tx).await?;
911 self.check_user_is_channel_participant(&channel, user_id, &tx)
912 .await?;
913
914 Ok(Channel::from_model(channel))
915 })
916 .await
917 }
918
919 pub(crate) async fn get_channel_internal(
920 &self,
921 channel_id: ChannelId,
922 tx: &DatabaseTransaction,
923 ) -> Result<channel::Model> {
924 Ok(channel::Entity::find_by_id(channel_id)
925 .one(tx)
926 .await?
927 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
928 }
929
930 pub(crate) async fn get_or_create_channel_room(
931 &self,
932 channel_id: ChannelId,
933 live_kit_room: &str,
934 tx: &DatabaseTransaction,
935 ) -> Result<RoomId> {
936 let room = room::Entity::find()
937 .filter(room::Column::ChannelId.eq(channel_id))
938 .one(tx)
939 .await?;
940
941 let room_id = if let Some(room) = room {
942 room.id
943 } else {
944 let result = room::Entity::insert(room::ActiveModel {
945 channel_id: ActiveValue::Set(Some(channel_id)),
946 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
947 ..Default::default()
948 })
949 .exec(tx)
950 .await?;
951
952 result.last_insert_id
953 };
954
955 Ok(room_id)
956 }
957
958 /// Move a channel from one parent to another
959 pub async fn move_channel(
960 &self,
961 channel_id: ChannelId,
962 new_parent_id: ChannelId,
963 admin_id: UserId,
964 ) -> Result<(ChannelId, Vec<Channel>)> {
965 self.transaction(|tx| async move {
966 let channel = self.get_channel_internal(channel_id, &tx).await?;
967 self.check_user_is_channel_admin(&channel, admin_id, &tx)
968 .await?;
969 let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
970
971 if new_parent.root_id() != channel.root_id() {
972 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
973 }
974
975 if new_parent
976 .ancestors_including_self()
977 .any(|id| id == channel.id)
978 {
979 Err(anyhow!(ErrorCode::CircularNesting))?;
980 }
981
982 if channel.visibility == ChannelVisibility::Public
983 && new_parent.visibility != ChannelVisibility::Public
984 {
985 Err(anyhow!(ErrorCode::BadPublicNesting))?;
986 }
987
988 let root_id = channel.root_id();
989 let old_path = format!("{}{}/", channel.parent_path, channel.id);
990 let new_path = format!("{}{}/", new_parent.path(), channel.id);
991
992 let mut model = channel.into_active_model();
993 model.parent_path = ActiveValue::Set(new_parent.path());
994 let channel = model.update(&*tx).await?;
995
996 let descendent_ids =
997 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
998 self.pool.get_database_backend(),
999 "
1000 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1001 WHERE parent_path LIKE $3 || '%'
1002 RETURNING id
1003 ",
1004 [old_path.clone().into(), new_path.into(), old_path.into()],
1005 ))
1006 .all(&*tx)
1007 .await?;
1008
1009 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1010
1011 let channels = channel::Entity::find()
1012 .filter(channel::Column::Id.is_in(all_moved_ids))
1013 .all(&*tx)
1014 .await?
1015 .into_iter()
1016 .map(|c| Channel::from_model(c))
1017 .collect::<Vec<_>>();
1018
1019 Ok((root_id, channels))
1020 })
1021 .await
1022 }
1023}
1024
/// Column selector with a single `Id` variant. `move_channel` passes it to
/// `find_by_statement` to read the ids produced by its `RETURNING id` statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1029
/// Column selector with a single `UserId` variant.
///
/// NOTE(review): nothing in the visible part of this file references this
/// enum; presumably it is used by queries elsewhere — confirm before removing.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}