1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
6 #[cfg(test)]
7 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
8 self.transaction(move |tx| async move {
9 let mut channels = Vec::new();
10 let mut rows = channel::Entity::find().stream(&*tx).await?;
11 while let Some(row) = rows.next().await {
12 let row = row?;
13 channels.push((row.id, row.name));
14 }
15 Ok(channels)
16 })
17 .await
18 }
19
20 #[cfg(test)]
21 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
22 Ok(self.create_channel(name, None, creator_id).await?.0.id)
23 }
24
25 #[cfg(test)]
26 pub async fn create_sub_channel(
27 &self,
28 name: &str,
29 parent: ChannelId,
30 creator_id: UserId,
31 ) -> Result<ChannelId> {
32 Ok(self
33 .create_channel(name, Some(parent), creator_id)
34 .await?
35 .0
36 .id)
37 }
38
39 /// Creates a new channel.
40 pub async fn create_channel(
41 &self,
42 name: &str,
43 parent_channel_id: Option<ChannelId>,
44 admin_id: UserId,
45 ) -> Result<(
46 Channel,
47 Option<channel_member::Model>,
48 Vec<channel_member::Model>,
49 )> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 let channel_members = channel_member::Entity::find()
91 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
92 .all(&*tx)
93 .await?;
94
95 Ok((Channel::from_model(channel), membership, channel_members))
96 })
97 .await
98 }
99
100 /// Adds a user to the specified channel.
101 pub async fn join_channel(
102 &self,
103 channel_id: ChannelId,
104 user_id: UserId,
105 connection: ConnectionId,
106 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
107 self.transaction(move |tx| async move {
108 let channel = self.get_channel_internal(channel_id, &*tx).await?;
109 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
110
111 let mut accept_invite_result = None;
112
113 if role.is_none() {
114 if let Some(invitation) = self
115 .pending_invite_for_channel(&channel, user_id, &*tx)
116 .await?
117 {
118 // note, this may be a parent channel
119 role = Some(invitation.role);
120 channel_member::Entity::update(channel_member::ActiveModel {
121 accepted: ActiveValue::Set(true),
122 ..invitation.into_active_model()
123 })
124 .exec(&*tx)
125 .await?;
126
127 accept_invite_result = Some(
128 self.calculate_membership_updated(&channel, user_id, &*tx)
129 .await?,
130 );
131
132 debug_assert!(
133 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
134 );
135 } else if channel.visibility == ChannelVisibility::Public {
136 role = Some(ChannelRole::Guest);
137 channel_member::Entity::insert(channel_member::ActiveModel {
138 id: ActiveValue::NotSet,
139 channel_id: ActiveValue::Set(channel.root_id()),
140 user_id: ActiveValue::Set(user_id),
141 accepted: ActiveValue::Set(true),
142 role: ActiveValue::Set(ChannelRole::Guest),
143 })
144 .exec(&*tx)
145 .await?;
146
147 accept_invite_result = Some(
148 self.calculate_membership_updated(&channel, user_id, &*tx)
149 .await?,
150 );
151
152 debug_assert!(
153 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
154 );
155 }
156 }
157
158 if role.is_none() || role == Some(ChannelRole::Banned) {
159 Err(ErrorCode::Forbidden.anyhow())?
160 }
161 let role = role.unwrap();
162
163 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
164 let room_id = self
165 .get_or_create_channel_room(channel_id, &live_kit_room, &*tx)
166 .await?;
167
168 self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
169 .await
170 .map(|jr| (jr, accept_invite_result, role))
171 })
172 .await
173 }
174
175 /// Sets the visibility of the given channel.
176 pub async fn set_channel_visibility(
177 &self,
178 channel_id: ChannelId,
179 visibility: ChannelVisibility,
180 admin_id: UserId,
181 ) -> Result<(Channel, Vec<channel_member::Model>)> {
182 self.transaction(move |tx| async move {
183 let channel = self.get_channel_internal(channel_id, &*tx).await?;
184 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
185 .await?;
186
187 if visibility == ChannelVisibility::Public {
188 if let Some(parent_id) = channel.parent_id() {
189 let parent = self.get_channel_internal(parent_id, &*tx).await?;
190
191 if parent.visibility != ChannelVisibility::Public {
192 Err(ErrorCode::BadPublicNesting
193 .with_tag("direction", "parent")
194 .anyhow())?;
195 }
196 }
197 } else if visibility == ChannelVisibility::Members {
198 if self
199 .get_channel_descendants_excluding_self([&channel], &*tx)
200 .await?
201 .into_iter()
202 .any(|channel| channel.visibility == ChannelVisibility::Public)
203 {
204 Err(ErrorCode::BadPublicNesting
205 .with_tag("direction", "children")
206 .anyhow())?;
207 }
208 }
209
210 let mut model = channel.into_active_model();
211 model.visibility = ActiveValue::Set(visibility);
212 let channel = model.update(&*tx).await?;
213
214 let channel_members = channel_member::Entity::find()
215 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
216 .all(&*tx)
217 .await?;
218
219 Ok((Channel::from_model(channel), channel_members))
220 })
221 .await
222 }
223
224 #[cfg(test)]
225 pub async fn set_channel_requires_zed_cla(
226 &self,
227 channel_id: ChannelId,
228 requires_zed_cla: bool,
229 ) -> Result<()> {
230 self.transaction(move |tx| async move {
231 let channel = self.get_channel_internal(channel_id, &*tx).await?;
232 let mut model = channel.into_active_model();
233 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
234 model.update(&*tx).await?;
235 Ok(())
236 })
237 .await
238 }
239
240 /// Deletes the channel with the specified ID.
241 pub async fn delete_channel(
242 &self,
243 channel_id: ChannelId,
244 user_id: UserId,
245 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
246 self.transaction(move |tx| async move {
247 let channel = self.get_channel_internal(channel_id, &*tx).await?;
248 self.check_user_is_channel_admin(&channel, user_id, &*tx)
249 .await?;
250
251 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
252 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
253 .select_only()
254 .column(channel_member::Column::UserId)
255 .distinct()
256 .into_values::<_, QueryUserIds>()
257 .all(&*tx)
258 .await?;
259
260 let channels_to_remove = self
261 .get_channel_descendants_excluding_self([&channel], &*tx)
262 .await?
263 .into_iter()
264 .map(|channel| channel.id)
265 .chain(Some(channel_id))
266 .collect::<Vec<_>>();
267
268 channel::Entity::delete_many()
269 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
270 .exec(&*tx)
271 .await?;
272
273 Ok((channels_to_remove, members_to_notify))
274 })
275 .await
276 }
277
278 /// Invites a user to a channel as a member.
279 pub async fn invite_channel_member(
280 &self,
281 channel_id: ChannelId,
282 invitee_id: UserId,
283 inviter_id: UserId,
284 role: ChannelRole,
285 ) -> Result<InviteMemberResult> {
286 self.transaction(move |tx| async move {
287 let channel = self.get_channel_internal(channel_id, &*tx).await?;
288 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
289 .await?;
290 if !channel.is_root() {
291 Err(ErrorCode::NotARootChannel.anyhow())?
292 }
293
294 channel_member::ActiveModel {
295 id: ActiveValue::NotSet,
296 channel_id: ActiveValue::Set(channel_id),
297 user_id: ActiveValue::Set(invitee_id),
298 accepted: ActiveValue::Set(false),
299 role: ActiveValue::Set(role),
300 }
301 .insert(&*tx)
302 .await?;
303
304 let channel = Channel::from_model(channel);
305
306 let notifications = self
307 .create_notification(
308 invitee_id,
309 rpc::Notification::ChannelInvitation {
310 channel_id: channel_id.to_proto(),
311 channel_name: channel.name.clone(),
312 inviter_id: inviter_id.to_proto(),
313 },
314 true,
315 &*tx,
316 )
317 .await?
318 .into_iter()
319 .collect();
320
321 Ok(InviteMemberResult {
322 channel,
323 notifications,
324 })
325 })
326 .await
327 }
328
329 fn sanitize_channel_name(name: &str) -> Result<&str> {
330 let new_name = name.trim().trim_start_matches('#');
331 if new_name == "" {
332 Err(anyhow!("channel name can't be blank"))?;
333 }
334 Ok(new_name)
335 }
336
337 /// Renames the specified channel.
338 pub async fn rename_channel(
339 &self,
340 channel_id: ChannelId,
341 admin_id: UserId,
342 new_name: &str,
343 ) -> Result<(Channel, Vec<channel_member::Model>)> {
344 self.transaction(move |tx| async move {
345 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
346
347 let channel = self.get_channel_internal(channel_id, &*tx).await?;
348 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
349 .await?;
350
351 let mut model = channel.into_active_model();
352 model.name = ActiveValue::Set(new_name.clone());
353 let channel = model.update(&*tx).await?;
354
355 let channel_members = channel_member::Entity::find()
356 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
357 .all(&*tx)
358 .await?;
359
360 Ok((Channel::from_model(channel), channel_members))
361 })
362 .await
363 }
364
365 /// accept or decline an invite to join a channel
366 pub async fn respond_to_channel_invite(
367 &self,
368 channel_id: ChannelId,
369 user_id: UserId,
370 accept: bool,
371 ) -> Result<RespondToChannelInvite> {
372 self.transaction(move |tx| async move {
373 let channel = self.get_channel_internal(channel_id, &*tx).await?;
374
375 let membership_update = if accept {
376 let rows_affected = channel_member::Entity::update_many()
377 .set(channel_member::ActiveModel {
378 accepted: ActiveValue::Set(accept),
379 ..Default::default()
380 })
381 .filter(
382 channel_member::Column::ChannelId
383 .eq(channel_id)
384 .and(channel_member::Column::UserId.eq(user_id))
385 .and(channel_member::Column::Accepted.eq(false)),
386 )
387 .exec(&*tx)
388 .await?
389 .rows_affected;
390
391 if rows_affected == 0 {
392 Err(anyhow!("no such invitation"))?;
393 }
394
395 Some(
396 self.calculate_membership_updated(&channel, user_id, &*tx)
397 .await?,
398 )
399 } else {
400 let rows_affected = channel_member::Entity::delete_many()
401 .filter(
402 channel_member::Column::ChannelId
403 .eq(channel_id)
404 .and(channel_member::Column::UserId.eq(user_id))
405 .and(channel_member::Column::Accepted.eq(false)),
406 )
407 .exec(&*tx)
408 .await?
409 .rows_affected;
410 if rows_affected == 0 {
411 Err(anyhow!("no such invitation"))?;
412 }
413
414 None
415 };
416
417 Ok(RespondToChannelInvite {
418 membership_update,
419 notifications: self
420 .mark_notification_as_read_with_response(
421 user_id,
422 &rpc::Notification::ChannelInvitation {
423 channel_id: channel_id.to_proto(),
424 channel_name: Default::default(),
425 inviter_id: Default::default(),
426 },
427 accept,
428 &*tx,
429 )
430 .await?
431 .into_iter()
432 .collect(),
433 })
434 })
435 .await
436 }
437
438 async fn calculate_membership_updated(
439 &self,
440 channel: &channel::Model,
441 user_id: UserId,
442 tx: &DatabaseTransaction,
443 ) -> Result<MembershipUpdated> {
444 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
445 let removed_channels = self
446 .get_channel_descendants_excluding_self([channel], &*tx)
447 .await?
448 .into_iter()
449 .map(|channel| channel.id)
450 .chain([channel.id])
451 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
452 .collect::<Vec<_>>();
453
454 Ok(MembershipUpdated {
455 channel_id: channel.id,
456 new_channels,
457 removed_channels,
458 })
459 }
460
461 /// Removes a channel member.
462 pub async fn remove_channel_member(
463 &self,
464 channel_id: ChannelId,
465 member_id: UserId,
466 admin_id: UserId,
467 ) -> Result<RemoveChannelMemberResult> {
468 self.transaction(|tx| async move {
469 let channel = self.get_channel_internal(channel_id, &*tx).await?;
470
471 if member_id != admin_id {
472 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
473 .await?;
474 }
475
476 let result = channel_member::Entity::delete_many()
477 .filter(
478 channel_member::Column::ChannelId
479 .eq(channel_id)
480 .and(channel_member::Column::UserId.eq(member_id)),
481 )
482 .exec(&*tx)
483 .await?;
484
485 if result.rows_affected == 0 {
486 Err(anyhow!("no such member"))?;
487 }
488
489 Ok(RemoveChannelMemberResult {
490 membership_update: self
491 .calculate_membership_updated(&channel, member_id, &*tx)
492 .await?,
493 notification_id: self
494 .remove_notification(
495 member_id,
496 rpc::Notification::ChannelInvitation {
497 channel_id: channel_id.to_proto(),
498 channel_name: Default::default(),
499 inviter_id: Default::default(),
500 },
501 &*tx,
502 )
503 .await?,
504 })
505 })
506 .await
507 }
508
509 /// Returns all channel invites for the user with the given ID.
510 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
511 self.transaction(|tx| async move {
512 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
513
514 let channel_invites = channel_member::Entity::find()
515 .filter(
516 channel_member::Column::UserId
517 .eq(user_id)
518 .and(channel_member::Column::Accepted.eq(false)),
519 )
520 .all(&*tx)
521 .await?;
522
523 for invite in channel_invites {
524 role_for_channel.insert(invite.channel_id, invite.role);
525 }
526
527 let channels = channel::Entity::find()
528 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
529 .all(&*tx)
530 .await?;
531
532 let channels = channels
533 .into_iter()
534 .filter_map(|channel| Some(Channel::from_model(channel)))
535 .collect();
536
537 Ok(channels)
538 })
539 .await
540 }
541
542 /// Returns all channels for the user with the given ID.
543 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
544 self.transaction(|tx| async move {
545 let tx = tx;
546
547 self.get_user_channels(user_id, None, &tx).await
548 })
549 .await
550 }
551
552 /// Returns all channels for the user with the given ID that are descendants
553 /// of the specified ancestor channel.
554 pub async fn get_user_channels(
555 &self,
556 user_id: UserId,
557 ancestor_channel: Option<&channel::Model>,
558 tx: &DatabaseTransaction,
559 ) -> Result<ChannelsForUser> {
560 let mut filter = channel_member::Column::UserId
561 .eq(user_id)
562 .and(channel_member::Column::Accepted.eq(true));
563
564 if let Some(ancestor) = ancestor_channel {
565 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
566 }
567
568 let channel_memberships = channel_member::Entity::find()
569 .filter(filter)
570 .all(&*tx)
571 .await?;
572
573 let channels = channel::Entity::find()
574 .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
575 .all(&*tx)
576 .await?;
577
578 let mut descendants = self
579 .get_channel_descendants_excluding_self(channels.iter(), &*tx)
580 .await?;
581
582 for channel in channels {
583 if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
584 descendants.insert(ix, channel);
585 }
586 }
587
588 let roles_by_channel_id = channel_memberships
589 .iter()
590 .map(|membership| (membership.channel_id, membership.role))
591 .collect::<HashMap<_, _>>();
592
593 let channels: Vec<Channel> = descendants
594 .into_iter()
595 .filter_map(|channel| {
596 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
597 if parent_role.can_see_channel(channel.visibility) {
598 Some(Channel::from_model(channel))
599 } else {
600 None
601 }
602 })
603 .collect();
604
605 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
606 enum QueryUserIdsAndChannelIds {
607 ChannelId,
608 UserId,
609 }
610
611 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
612 {
613 let mut rows = room_participant::Entity::find()
614 .inner_join(room::Entity)
615 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
616 .select_only()
617 .column(room::Column::ChannelId)
618 .column(room_participant::Column::UserId)
619 .into_values::<_, QueryUserIdsAndChannelIds>()
620 .stream(&*tx)
621 .await?;
622 while let Some(row) = rows.next().await {
623 let row: (ChannelId, UserId) = row?;
624 channel_participants.entry(row.0).or_default().push(row.1)
625 }
626 }
627
628 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
629
630 let mut channel_ids_by_buffer_id = HashMap::default();
631 let mut rows = buffer::Entity::find()
632 .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
633 .stream(&*tx)
634 .await?;
635 while let Some(row) = rows.next().await {
636 let row = row?;
637 channel_ids_by_buffer_id.insert(row.id, row.channel_id);
638 }
639 drop(rows);
640
641 let latest_buffer_versions = self
642 .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx)
643 .await?;
644
645 let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
646
647 let observed_buffer_versions = self
648 .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx)
649 .await?;
650
651 let observed_channel_messages = self
652 .observed_channel_messages(&channel_ids, user_id, &*tx)
653 .await?;
654
655 let hosted_projects = self
656 .get_hosted_projects(&channel_ids, &roles_by_channel_id, &*tx)
657 .await?;
658
659 Ok(ChannelsForUser {
660 channel_memberships,
661 channels,
662 hosted_projects,
663 channel_participants,
664 latest_buffer_versions,
665 latest_channel_messages,
666 observed_buffer_versions,
667 observed_channel_messages,
668 })
669 }
670
671 /// Sets the role for the specified channel member.
672 pub async fn set_channel_member_role(
673 &self,
674 channel_id: ChannelId,
675 admin_id: UserId,
676 for_user: UserId,
677 role: ChannelRole,
678 ) -> Result<SetMemberRoleResult> {
679 self.transaction(|tx| async move {
680 let channel = self.get_channel_internal(channel_id, &*tx).await?;
681 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
682 .await?;
683
684 let membership = channel_member::Entity::find()
685 .filter(
686 channel_member::Column::ChannelId
687 .eq(channel_id)
688 .and(channel_member::Column::UserId.eq(for_user)),
689 )
690 .one(&*tx)
691 .await?;
692
693 let Some(membership) = membership else {
694 Err(anyhow!("no such member"))?
695 };
696
697 let mut update = membership.into_active_model();
698 update.role = ActiveValue::Set(role);
699 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
700
701 if updated.accepted {
702 Ok(SetMemberRoleResult::MembershipUpdated(
703 self.calculate_membership_updated(&channel, for_user, &*tx)
704 .await?,
705 ))
706 } else {
707 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
708 channel,
709 )))
710 }
711 })
712 .await
713 }
714
715 /// Returns the details for the specified channel member.
716 pub async fn get_channel_participant_details(
717 &self,
718 channel_id: ChannelId,
719 user_id: UserId,
720 ) -> Result<Vec<proto::ChannelMember>> {
721 let (role, members) = self
722 .transaction(move |tx| async move {
723 let channel = self.get_channel_internal(channel_id, &*tx).await?;
724 let role = self
725 .check_user_is_channel_participant(&channel, user_id, &*tx)
726 .await?;
727 Ok((
728 role,
729 self.get_channel_participant_details_internal(&channel, &*tx)
730 .await?,
731 ))
732 })
733 .await?;
734
735 if role == ChannelRole::Admin {
736 Ok(members
737 .into_iter()
738 .map(|channel_member| proto::ChannelMember {
739 role: channel_member.role.into(),
740 user_id: channel_member.user_id.to_proto(),
741 kind: if channel_member.accepted {
742 Kind::Member
743 } else {
744 Kind::Invitee
745 }
746 .into(),
747 })
748 .collect())
749 } else {
750 return Ok(members
751 .into_iter()
752 .filter_map(|member| {
753 if !member.accepted {
754 return None;
755 }
756 Some(proto::ChannelMember {
757 role: member.role.into(),
758 user_id: member.user_id.to_proto(),
759 kind: Kind::Member.into(),
760 })
761 })
762 .collect());
763 }
764 }
765
766 async fn get_channel_participant_details_internal(
767 &self,
768 channel: &channel::Model,
769 tx: &DatabaseTransaction,
770 ) -> Result<Vec<channel_member::Model>> {
771 Ok(channel_member::Entity::find()
772 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
773 .all(tx)
774 .await?)
775 }
776
777 /// Returns the participants in the given channel.
778 pub async fn get_channel_participants(
779 &self,
780 channel: &channel::Model,
781 tx: &DatabaseTransaction,
782 ) -> Result<Vec<UserId>> {
783 let participants = self
784 .get_channel_participant_details_internal(channel, &*tx)
785 .await?;
786 Ok(participants
787 .into_iter()
788 .map(|member| member.user_id)
789 .collect())
790 }
791
792 /// Returns whether the given user is an admin in the specified channel.
793 pub async fn check_user_is_channel_admin(
794 &self,
795 channel: &channel::Model,
796 user_id: UserId,
797 tx: &DatabaseTransaction,
798 ) -> Result<ChannelRole> {
799 let role = self.channel_role_for_user(channel, user_id, tx).await?;
800 match role {
801 Some(ChannelRole::Admin) => Ok(role.unwrap()),
802 Some(ChannelRole::Member)
803 | Some(ChannelRole::Talker)
804 | Some(ChannelRole::Banned)
805 | Some(ChannelRole::Guest)
806 | None => Err(anyhow!(
807 "user is not a channel admin or channel does not exist"
808 ))?,
809 }
810 }
811
812 /// Returns whether the given user is a member of the specified channel.
813 pub async fn check_user_is_channel_member(
814 &self,
815 channel: &channel::Model,
816 user_id: UserId,
817 tx: &DatabaseTransaction,
818 ) -> Result<ChannelRole> {
819 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
820 match channel_role {
821 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
822 Some(ChannelRole::Banned)
823 | Some(ChannelRole::Guest)
824 | Some(ChannelRole::Talker)
825 | None => Err(anyhow!(
826 "user is not a channel member or channel does not exist"
827 ))?,
828 }
829 }
830
831 /// Returns whether the given user is a participant in the specified channel.
832 pub async fn check_user_is_channel_participant(
833 &self,
834 channel: &channel::Model,
835 user_id: UserId,
836 tx: &DatabaseTransaction,
837 ) -> Result<ChannelRole> {
838 let role = self.channel_role_for_user(channel, user_id, tx).await?;
839 match role {
840 Some(ChannelRole::Admin)
841 | Some(ChannelRole::Member)
842 | Some(ChannelRole::Guest)
843 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
844 Some(ChannelRole::Banned) | None => Err(anyhow!(
845 "user is not a channel participant or channel does not exist"
846 ))?,
847 }
848 }
849
850 /// Returns a user's pending invite for the given channel, if one exists.
851 pub async fn pending_invite_for_channel(
852 &self,
853 channel: &channel::Model,
854 user_id: UserId,
855 tx: &DatabaseTransaction,
856 ) -> Result<Option<channel_member::Model>> {
857 let row = channel_member::Entity::find()
858 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
859 .filter(channel_member::Column::UserId.eq(user_id))
860 .filter(channel_member::Column::Accepted.eq(false))
861 .one(&*tx)
862 .await?;
863
864 Ok(row)
865 }
866
867 /// Returns the role for a user in the given channel.
868 pub async fn channel_role_for_user(
869 &self,
870 channel: &channel::Model,
871 user_id: UserId,
872 tx: &DatabaseTransaction,
873 ) -> Result<Option<ChannelRole>> {
874 let membership = channel_member::Entity::find()
875 .filter(
876 channel_member::Column::ChannelId
877 .eq(channel.root_id())
878 .and(channel_member::Column::UserId.eq(user_id))
879 .and(channel_member::Column::Accepted.eq(true)),
880 )
881 .one(&*tx)
882 .await?;
883
884 let Some(membership) = membership else {
885 return Ok(None);
886 };
887
888 if !membership.role.can_see_channel(channel.visibility) {
889 return Ok(None);
890 }
891
892 Ok(Some(membership.role))
893 }
894
895 // Get the descendants of the given set if channels, ordered by their
896 // path.
897 pub(crate) async fn get_channel_descendants_excluding_self(
898 &self,
899 channels: impl IntoIterator<Item = &channel::Model>,
900 tx: &DatabaseTransaction,
901 ) -> Result<Vec<channel::Model>> {
902 let mut filter = Condition::any();
903 for channel in channels.into_iter() {
904 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
905 }
906
907 if filter.is_empty() {
908 return Ok(vec![]);
909 }
910
911 Ok(channel::Entity::find()
912 .filter(filter)
913 .order_by_asc(Expr::cust("parent_path || id || '/'"))
914 .all(tx)
915 .await?)
916 }
917
918 /// Returns the channel with the given ID.
919 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
920 self.transaction(|tx| async move {
921 let channel = self.get_channel_internal(channel_id, &*tx).await?;
922 self.check_user_is_channel_participant(&channel, user_id, &*tx)
923 .await?;
924
925 Ok(Channel::from_model(channel))
926 })
927 .await
928 }
929
930 pub(crate) async fn get_channel_internal(
931 &self,
932 channel_id: ChannelId,
933 tx: &DatabaseTransaction,
934 ) -> Result<channel::Model> {
935 Ok(channel::Entity::find_by_id(channel_id)
936 .one(&*tx)
937 .await?
938 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
939 }
940
941 pub(crate) async fn get_or_create_channel_room(
942 &self,
943 channel_id: ChannelId,
944 live_kit_room: &str,
945 tx: &DatabaseTransaction,
946 ) -> Result<RoomId> {
947 let room = room::Entity::find()
948 .filter(room::Column::ChannelId.eq(channel_id))
949 .one(&*tx)
950 .await?;
951
952 let room_id = if let Some(room) = room {
953 room.id
954 } else {
955 let result = room::Entity::insert(room::ActiveModel {
956 channel_id: ActiveValue::Set(Some(channel_id)),
957 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
958 ..Default::default()
959 })
960 .exec(&*tx)
961 .await?;
962
963 result.last_insert_id
964 };
965
966 Ok(room_id)
967 }
968
969 /// Move a channel from one parent to another
970 pub async fn move_channel(
971 &self,
972 channel_id: ChannelId,
973 new_parent_id: ChannelId,
974 admin_id: UserId,
975 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
976 self.transaction(|tx| async move {
977 let channel = self.get_channel_internal(channel_id, &*tx).await?;
978 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
979 .await?;
980 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
981
982 if new_parent.root_id() != channel.root_id() {
983 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
984 }
985
986 if new_parent
987 .ancestors_including_self()
988 .any(|id| id == channel.id)
989 {
990 Err(anyhow!(ErrorCode::CircularNesting))?;
991 }
992
993 if channel.visibility == ChannelVisibility::Public
994 && new_parent.visibility != ChannelVisibility::Public
995 {
996 Err(anyhow!(ErrorCode::BadPublicNesting))?;
997 }
998
999 let root_id = channel.root_id();
1000 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1001 let new_path = format!("{}{}/", new_parent.path(), channel.id);
1002
1003 let mut model = channel.into_active_model();
1004 model.parent_path = ActiveValue::Set(new_parent.path());
1005 let channel = model.update(&*tx).await?;
1006
1007 let descendent_ids =
1008 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1009 self.pool.get_database_backend(),
1010 "
1011 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1012 WHERE parent_path LIKE $3 || '%'
1013 RETURNING id
1014 ",
1015 [old_path.clone().into(), new_path.into(), old_path.into()],
1016 ))
1017 .all(&*tx)
1018 .await?;
1019
1020 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1021
1022 let channels = channel::Entity::find()
1023 .filter(channel::Column::Id.is_in(all_moved_ids))
1024 .all(&*tx)
1025 .await?
1026 .into_iter()
1027 .map(|c| Channel::from_model(c))
1028 .collect::<Vec<_>>();
1029
1030 let channel_members = channel_member::Entity::find()
1031 .filter(channel_member::Column::ChannelId.eq(root_id))
1032 .all(&*tx)
1033 .await?;
1034
1035 Ok((channels, channel_members))
1036 })
1037 .await
1038 }
1039}
1040
1041#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1042enum QueryIds {
1043 Id,
1044}
1045
1046#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1047enum QueryUserIds {
1048 UserId,
1049}