1use super::*;
2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
3use sea_orm::TryGetableMany;
4
5impl Database {
/// Test-only helper that streams every row of the channel table and returns
/// each row's `(id, name)` pair.
#[cfg(test)]
pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
    self.transaction(move |tx| async move {
        let mut id_name_pairs = Vec::new();
        let mut stream = channel::Entity::find().stream(&*tx).await?;
        while let Some(next) = stream.next().await {
            let model = next?;
            id_name_pairs.push((model.id, model.name));
        }
        Ok(id_name_pairs)
    })
    .await
}
19
/// Test-only helper that creates a channel without a parent and returns the
/// id of the new channel.
#[cfg(test)]
pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
    let (channel, _, _) = self.create_channel(name, None, creator_id).await?;
    Ok(channel.id)
}
24
/// Test-only helper that creates a channel underneath `parent` and returns
/// the id of the new channel.
#[cfg(test)]
pub async fn create_sub_channel(
    &self,
    name: &str,
    parent: ChannelId,
    creator_id: UserId,
) -> Result<ChannelId> {
    let (channel, _, _) = self.create_channel(name, Some(parent), creator_id).await?;
    Ok(channel.id)
}
38
39 /// Creates a new channel.
40 pub async fn create_channel(
41 &self,
42 name: &str,
43 parent_channel_id: Option<ChannelId>,
44 admin_id: UserId,
45 ) -> Result<(
46 Channel,
47 Option<channel_member::Model>,
48 Vec<channel_member::Model>,
49 )> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 let channel_members = channel_member::Entity::find()
91 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
92 .all(&*tx)
93 .await?;
94
95 Ok((Channel::from_model(channel), membership, channel_members))
96 })
97 .await
98 }
99
100 pub async fn set_in_channel_call(
101 &self,
102 channel_id: ChannelId,
103 user_id: UserId,
104 in_call: bool,
105 ) -> Result<(proto::Room, ChannelRole)> {
106 self.transaction(move |tx| async move {
107 let channel = self.get_channel_internal(channel_id, &*tx).await?;
108 let role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
109 if role.is_none() || role == Some(ChannelRole::Banned) {
110 Err(ErrorCode::Forbidden.anyhow())?
111 }
112 let role = role.unwrap();
113
114 let Some(room) = room::Entity::find()
115 .filter(room::Column::ChannelId.eq(channel_id))
116 .one(&*tx)
117 .await?
118 else {
119 Err(anyhow!("no room exists"))?
120 };
121
122 let result = room_participant::Entity::update_many()
123 .filter(
124 Condition::all()
125 .add(room_participant::Column::RoomId.eq(room.id))
126 .add(room_participant::Column::UserId.eq(user_id)),
127 )
128 .set(room_participant::ActiveModel {
129 in_call: ActiveValue::Set(in_call),
130 ..Default::default()
131 })
132 .exec(&*tx)
133 .await?;
134
135 if result.rows_affected != 1 {
136 Err(anyhow!("not in channel"))?
137 }
138
139 let room = self.get_room(room.id, &*tx).await?;
140 Ok((room, role))
141 })
142 .await
143 }
144
145 /// Adds a user to the specified channel.
146 pub async fn join_channel(
147 &self,
148 channel_id: ChannelId,
149 user_id: UserId,
150 autojoin: bool,
151 connection: ConnectionId,
152 environment: &str,
153 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
154 self.transaction(move |tx| async move {
155 let channel = self.get_channel_internal(channel_id, &*tx).await?;
156 let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
157
158 let mut accept_invite_result = None;
159
160 if role.is_none() {
161 if let Some(invitation) = self
162 .pending_invite_for_channel(&channel, user_id, &*tx)
163 .await?
164 {
165 // note, this may be a parent channel
166 role = Some(invitation.role);
167 channel_member::Entity::update(channel_member::ActiveModel {
168 accepted: ActiveValue::Set(true),
169 ..invitation.into_active_model()
170 })
171 .exec(&*tx)
172 .await?;
173
174 accept_invite_result = Some(
175 self.calculate_membership_updated(&channel, user_id, &*tx)
176 .await?,
177 );
178
179 debug_assert!(
180 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
181 );
182 } else if channel.visibility == ChannelVisibility::Public {
183 role = Some(ChannelRole::Guest);
184 channel_member::Entity::insert(channel_member::ActiveModel {
185 id: ActiveValue::NotSet,
186 channel_id: ActiveValue::Set(channel.root_id()),
187 user_id: ActiveValue::Set(user_id),
188 accepted: ActiveValue::Set(true),
189 role: ActiveValue::Set(ChannelRole::Guest),
190 })
191 .exec(&*tx)
192 .await?;
193
194 accept_invite_result = Some(
195 self.calculate_membership_updated(&channel, user_id, &*tx)
196 .await?,
197 );
198
199 debug_assert!(
200 self.channel_role_for_user(&channel, user_id, &*tx).await? == role
201 );
202 }
203 }
204
205 if role.is_none() || role == Some(ChannelRole::Banned) {
206 Err(ErrorCode::Forbidden.anyhow())?
207 }
208 let role = role.unwrap();
209
210 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
211 let room_id = self
212 .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
213 .await?;
214
215 self.join_channel_room_internal(room_id, user_id, autojoin, connection, role, &*tx)
216 .await
217 .map(|jr| (jr, accept_invite_result, role))
218 })
219 .await
220 }
221
222 /// Sets the visibility of the given channel.
223 pub async fn set_channel_visibility(
224 &self,
225 channel_id: ChannelId,
226 visibility: ChannelVisibility,
227 admin_id: UserId,
228 ) -> Result<(Channel, Vec<channel_member::Model>)> {
229 self.transaction(move |tx| async move {
230 let channel = self.get_channel_internal(channel_id, &*tx).await?;
231 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
232 .await?;
233
234 if visibility == ChannelVisibility::Public {
235 if let Some(parent_id) = channel.parent_id() {
236 let parent = self.get_channel_internal(parent_id, &*tx).await?;
237
238 if parent.visibility != ChannelVisibility::Public {
239 Err(ErrorCode::BadPublicNesting
240 .with_tag("direction", "parent")
241 .anyhow())?;
242 }
243 }
244 } else if visibility == ChannelVisibility::Members {
245 if self
246 .get_channel_descendants_excluding_self([&channel], &*tx)
247 .await?
248 .into_iter()
249 .any(|channel| channel.visibility == ChannelVisibility::Public)
250 {
251 Err(ErrorCode::BadPublicNesting
252 .with_tag("direction", "children")
253 .anyhow())?;
254 }
255 }
256
257 let mut model = channel.into_active_model();
258 model.visibility = ActiveValue::Set(visibility);
259 let channel = model.update(&*tx).await?;
260
261 let channel_members = channel_member::Entity::find()
262 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
263 .all(&*tx)
264 .await?;
265
266 Ok((Channel::from_model(channel), channel_members))
267 })
268 .await
269 }
270
/// Test-only helper that overwrites the `requires_zed_cla` column of the
/// given channel. No permission check is performed.
#[cfg(test)]
pub async fn set_channel_requires_zed_cla(
    &self,
    channel_id: ChannelId,
    requires_zed_cla: bool,
) -> Result<()> {
    self.transaction(move |tx| async move {
        let mut active = self
            .get_channel_internal(channel_id, &*tx)
            .await?
            .into_active_model();
        active.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
        active.update(&*tx).await?;
        Ok(())
    })
    .await
}
286
287 /// Deletes the channel with the specified ID.
288 pub async fn delete_channel(
289 &self,
290 channel_id: ChannelId,
291 user_id: UserId,
292 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
293 self.transaction(move |tx| async move {
294 let channel = self.get_channel_internal(channel_id, &*tx).await?;
295 self.check_user_is_channel_admin(&channel, user_id, &*tx)
296 .await?;
297
298 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
299 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
300 .select_only()
301 .column(channel_member::Column::UserId)
302 .distinct()
303 .into_values::<_, QueryUserIds>()
304 .all(&*tx)
305 .await?;
306
307 let channels_to_remove = self
308 .get_channel_descendants_excluding_self([&channel], &*tx)
309 .await?
310 .into_iter()
311 .map(|channel| channel.id)
312 .chain(Some(channel_id))
313 .collect::<Vec<_>>();
314
315 channel::Entity::delete_many()
316 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
317 .exec(&*tx)
318 .await?;
319
320 Ok((channels_to_remove, members_to_notify))
321 })
322 .await
323 }
324
325 /// Invites a user to a channel as a member.
326 pub async fn invite_channel_member(
327 &self,
328 channel_id: ChannelId,
329 invitee_id: UserId,
330 inviter_id: UserId,
331 role: ChannelRole,
332 ) -> Result<InviteMemberResult> {
333 self.transaction(move |tx| async move {
334 let channel = self.get_channel_internal(channel_id, &*tx).await?;
335 self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
336 .await?;
337 if !channel.is_root() {
338 Err(ErrorCode::NotARootChannel.anyhow())?
339 }
340
341 channel_member::ActiveModel {
342 id: ActiveValue::NotSet,
343 channel_id: ActiveValue::Set(channel_id),
344 user_id: ActiveValue::Set(invitee_id),
345 accepted: ActiveValue::Set(false),
346 role: ActiveValue::Set(role),
347 }
348 .insert(&*tx)
349 .await?;
350
351 let channel = Channel::from_model(channel);
352
353 let notifications = self
354 .create_notification(
355 invitee_id,
356 rpc::Notification::ChannelInvitation {
357 channel_id: channel_id.to_proto(),
358 channel_name: channel.name.clone(),
359 inviter_id: inviter_id.to_proto(),
360 },
361 true,
362 &*tx,
363 )
364 .await?
365 .into_iter()
366 .collect();
367
368 Ok(InviteMemberResult {
369 channel,
370 notifications,
371 })
372 })
373 .await
374 }
375
376 fn sanitize_channel_name(name: &str) -> Result<&str> {
377 let new_name = name.trim().trim_start_matches('#');
378 if new_name == "" {
379 Err(anyhow!("channel name can't be blank"))?;
380 }
381 Ok(new_name)
382 }
383
384 /// Renames the specified channel.
385 pub async fn rename_channel(
386 &self,
387 channel_id: ChannelId,
388 admin_id: UserId,
389 new_name: &str,
390 ) -> Result<(Channel, Vec<channel_member::Model>)> {
391 self.transaction(move |tx| async move {
392 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
393
394 let channel = self.get_channel_internal(channel_id, &*tx).await?;
395 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
396 .await?;
397
398 let mut model = channel.into_active_model();
399 model.name = ActiveValue::Set(new_name.clone());
400 let channel = model.update(&*tx).await?;
401
402 let channel_members = channel_member::Entity::find()
403 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
404 .all(&*tx)
405 .await?;
406
407 Ok((Channel::from_model(channel), channel_members))
408 })
409 .await
410 }
411
412 /// accept or decline an invite to join a channel
413 pub async fn respond_to_channel_invite(
414 &self,
415 channel_id: ChannelId,
416 user_id: UserId,
417 accept: bool,
418 ) -> Result<RespondToChannelInvite> {
419 self.transaction(move |tx| async move {
420 let channel = self.get_channel_internal(channel_id, &*tx).await?;
421
422 let membership_update = if accept {
423 let rows_affected = channel_member::Entity::update_many()
424 .set(channel_member::ActiveModel {
425 accepted: ActiveValue::Set(accept),
426 ..Default::default()
427 })
428 .filter(
429 channel_member::Column::ChannelId
430 .eq(channel_id)
431 .and(channel_member::Column::UserId.eq(user_id))
432 .and(channel_member::Column::Accepted.eq(false)),
433 )
434 .exec(&*tx)
435 .await?
436 .rows_affected;
437
438 if rows_affected == 0 {
439 Err(anyhow!("no such invitation"))?;
440 }
441
442 Some(
443 self.calculate_membership_updated(&channel, user_id, &*tx)
444 .await?,
445 )
446 } else {
447 let rows_affected = channel_member::Entity::delete_many()
448 .filter(
449 channel_member::Column::ChannelId
450 .eq(channel_id)
451 .and(channel_member::Column::UserId.eq(user_id))
452 .and(channel_member::Column::Accepted.eq(false)),
453 )
454 .exec(&*tx)
455 .await?
456 .rows_affected;
457 if rows_affected == 0 {
458 Err(anyhow!("no such invitation"))?;
459 }
460
461 None
462 };
463
464 Ok(RespondToChannelInvite {
465 membership_update,
466 notifications: self
467 .mark_notification_as_read_with_response(
468 user_id,
469 &rpc::Notification::ChannelInvitation {
470 channel_id: channel_id.to_proto(),
471 channel_name: Default::default(),
472 inviter_id: Default::default(),
473 },
474 accept,
475 &*tx,
476 )
477 .await?
478 .into_iter()
479 .collect(),
480 })
481 })
482 .await
483 }
484
485 async fn calculate_membership_updated(
486 &self,
487 channel: &channel::Model,
488 user_id: UserId,
489 tx: &DatabaseTransaction,
490 ) -> Result<MembershipUpdated> {
491 let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
492 let removed_channels = self
493 .get_channel_descendants_excluding_self([channel], &*tx)
494 .await?
495 .into_iter()
496 .map(|channel| channel.id)
497 .chain([channel.id])
498 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
499 .collect::<Vec<_>>();
500
501 Ok(MembershipUpdated {
502 channel_id: channel.id,
503 new_channels,
504 removed_channels,
505 })
506 }
507
508 /// Removes a channel member.
509 pub async fn remove_channel_member(
510 &self,
511 channel_id: ChannelId,
512 member_id: UserId,
513 admin_id: UserId,
514 ) -> Result<RemoveChannelMemberResult> {
515 self.transaction(|tx| async move {
516 let channel = self.get_channel_internal(channel_id, &*tx).await?;
517
518 if member_id != admin_id {
519 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
520 .await?;
521 }
522
523 let result = channel_member::Entity::delete_many()
524 .filter(
525 channel_member::Column::ChannelId
526 .eq(channel_id)
527 .and(channel_member::Column::UserId.eq(member_id)),
528 )
529 .exec(&*tx)
530 .await?;
531
532 if result.rows_affected == 0 {
533 Err(anyhow!("no such member"))?;
534 }
535
536 Ok(RemoveChannelMemberResult {
537 membership_update: self
538 .calculate_membership_updated(&channel, member_id, &*tx)
539 .await?,
540 notification_id: self
541 .remove_notification(
542 member_id,
543 rpc::Notification::ChannelInvitation {
544 channel_id: channel_id.to_proto(),
545 channel_name: Default::default(),
546 inviter_id: Default::default(),
547 },
548 &*tx,
549 )
550 .await?,
551 })
552 })
553 .await
554 }
555
556 /// Returns all channel invites for the user with the given ID.
557 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
558 self.transaction(|tx| async move {
559 let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
560
561 let channel_invites = channel_member::Entity::find()
562 .filter(
563 channel_member::Column::UserId
564 .eq(user_id)
565 .and(channel_member::Column::Accepted.eq(false)),
566 )
567 .all(&*tx)
568 .await?;
569
570 for invite in channel_invites {
571 role_for_channel.insert(invite.channel_id, invite.role);
572 }
573
574 let channels = channel::Entity::find()
575 .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
576 .all(&*tx)
577 .await?;
578
579 let channels = channels
580 .into_iter()
581 .filter_map(|channel| Some(Channel::from_model(channel)))
582 .collect();
583
584 Ok(channels)
585 })
586 .await
587 }
588
589 /// Returns all channels for the user with the given ID.
590 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
591 self.transaction(|tx| async move {
592 let tx = tx;
593
594 self.get_user_channels(user_id, None, &tx).await
595 })
596 .await
597 }
598
599 /// Returns all channels for the user with the given ID that are descendants
600 /// of the specified ancestor channel.
601 pub async fn get_user_channels(
602 &self,
603 user_id: UserId,
604 ancestor_channel: Option<&channel::Model>,
605 tx: &DatabaseTransaction,
606 ) -> Result<ChannelsForUser> {
607 let mut filter = channel_member::Column::UserId
608 .eq(user_id)
609 .and(channel_member::Column::Accepted.eq(true));
610
611 if let Some(ancestor) = ancestor_channel {
612 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
613 }
614
615 let channel_memberships = channel_member::Entity::find()
616 .filter(filter)
617 .all(&*tx)
618 .await?;
619
620 let channels = channel::Entity::find()
621 .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
622 .all(&*tx)
623 .await?;
624
625 let mut descendants = self
626 .get_channel_descendants_excluding_self(channels.iter(), &*tx)
627 .await?;
628
629 for channel in channels {
630 if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
631 descendants.insert(ix, channel);
632 }
633 }
634
635 let roles_by_channel_id = channel_memberships
636 .iter()
637 .map(|membership| (membership.channel_id, membership.role))
638 .collect::<HashMap<_, _>>();
639
640 let channels: Vec<Channel> = descendants
641 .into_iter()
642 .filter_map(|channel| {
643 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
644 if parent_role.can_see_channel(channel.visibility) {
645 Some(Channel::from_model(channel))
646 } else {
647 None
648 }
649 })
650 .collect();
651
652 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
653 enum QueryUserIdsAndChannelIds {
654 ChannelId,
655 UserId,
656 }
657
658 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
659 {
660 let mut rows = room_participant::Entity::find()
661 .inner_join(room::Entity)
662 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
663 .select_only()
664 .column(room::Column::ChannelId)
665 .column(room_participant::Column::UserId)
666 .into_values::<_, QueryUserIdsAndChannelIds>()
667 .stream(&*tx)
668 .await?;
669 while let Some(row) = rows.next().await {
670 let row: (ChannelId, UserId) = row?;
671 channel_participants.entry(row.0).or_default().push(row.1)
672 }
673 }
674
675 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
676
677 let mut channel_ids_by_buffer_id = HashMap::default();
678 let mut rows = buffer::Entity::find()
679 .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
680 .stream(&*tx)
681 .await?;
682 while let Some(row) = rows.next().await {
683 let row = row?;
684 channel_ids_by_buffer_id.insert(row.id, row.channel_id);
685 }
686 drop(rows);
687
688 let latest_buffer_versions = self
689 .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx)
690 .await?;
691
692 let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
693
694 let observed_buffer_versions = self
695 .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx)
696 .await?;
697
698 let observed_channel_messages = self
699 .observed_channel_messages(&channel_ids, user_id, &*tx)
700 .await?;
701
702 Ok(ChannelsForUser {
703 channel_memberships,
704 channels,
705 channel_participants,
706 latest_buffer_versions,
707 latest_channel_messages,
708 observed_buffer_versions,
709 observed_channel_messages,
710 })
711 }
712
713 /// Sets the role for the specified channel member.
714 pub async fn set_channel_member_role(
715 &self,
716 channel_id: ChannelId,
717 admin_id: UserId,
718 for_user: UserId,
719 role: ChannelRole,
720 ) -> Result<SetMemberRoleResult> {
721 self.transaction(|tx| async move {
722 let channel = self.get_channel_internal(channel_id, &*tx).await?;
723 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
724 .await?;
725
726 let membership = channel_member::Entity::find()
727 .filter(
728 channel_member::Column::ChannelId
729 .eq(channel_id)
730 .and(channel_member::Column::UserId.eq(for_user)),
731 )
732 .one(&*tx)
733 .await?;
734
735 let Some(membership) = membership else {
736 Err(anyhow!("no such member"))?
737 };
738
739 let mut update = membership.into_active_model();
740 update.role = ActiveValue::Set(role);
741 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
742
743 if updated.accepted {
744 Ok(SetMemberRoleResult::MembershipUpdated(
745 self.calculate_membership_updated(&channel, for_user, &*tx)
746 .await?,
747 ))
748 } else {
749 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
750 channel,
751 )))
752 }
753 })
754 .await
755 }
756
757 /// Returns the details for the specified channel member.
758 pub async fn get_channel_participant_details(
759 &self,
760 channel_id: ChannelId,
761 user_id: UserId,
762 ) -> Result<Vec<proto::ChannelMember>> {
763 let (role, members) = self
764 .transaction(move |tx| async move {
765 let channel = self.get_channel_internal(channel_id, &*tx).await?;
766 let role = self
767 .check_user_is_channel_participant(&channel, user_id, &*tx)
768 .await?;
769 Ok((
770 role,
771 self.get_channel_participant_details_internal(&channel, &*tx)
772 .await?,
773 ))
774 })
775 .await?;
776
777 if role == ChannelRole::Admin {
778 Ok(members
779 .into_iter()
780 .map(|channel_member| proto::ChannelMember {
781 role: channel_member.role.into(),
782 user_id: channel_member.user_id.to_proto(),
783 kind: if channel_member.accepted {
784 Kind::Member
785 } else {
786 Kind::Invitee
787 }
788 .into(),
789 })
790 .collect())
791 } else {
792 return Ok(members
793 .into_iter()
794 .filter_map(|member| {
795 if !member.accepted {
796 return None;
797 }
798 Some(proto::ChannelMember {
799 role: member.role.into(),
800 user_id: member.user_id.to_proto(),
801 kind: Kind::Member.into(),
802 })
803 })
804 .collect());
805 }
806 }
807
808 async fn get_channel_participant_details_internal(
809 &self,
810 channel: &channel::Model,
811 tx: &DatabaseTransaction,
812 ) -> Result<Vec<channel_member::Model>> {
813 Ok(channel_member::Entity::find()
814 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
815 .all(tx)
816 .await?)
817 }
818
819 /// Returns the participants in the given channel.
820 pub async fn get_channel_participants(
821 &self,
822 channel: &channel::Model,
823 tx: &DatabaseTransaction,
824 ) -> Result<Vec<UserId>> {
825 let participants = self
826 .get_channel_participant_details_internal(channel, &*tx)
827 .await?;
828 Ok(participants
829 .into_iter()
830 .map(|member| member.user_id)
831 .collect())
832 }
833
834 /// Returns whether the given user is an admin in the specified channel.
835 pub async fn check_user_is_channel_admin(
836 &self,
837 channel: &channel::Model,
838 user_id: UserId,
839 tx: &DatabaseTransaction,
840 ) -> Result<ChannelRole> {
841 let role = self.channel_role_for_user(channel, user_id, tx).await?;
842 match role {
843 Some(ChannelRole::Admin) => Ok(role.unwrap()),
844 Some(ChannelRole::Member)
845 | Some(ChannelRole::Banned)
846 | Some(ChannelRole::Guest)
847 | None => Err(anyhow!(
848 "user is not a channel admin or channel does not exist"
849 ))?,
850 }
851 }
852
853 /// Returns whether the given user is a member of the specified channel.
854 pub async fn check_user_is_channel_member(
855 &self,
856 channel: &channel::Model,
857 user_id: UserId,
858 tx: &DatabaseTransaction,
859 ) -> Result<ChannelRole> {
860 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
861 match channel_role {
862 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
863 Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
864 "user is not a channel member or channel does not exist"
865 ))?,
866 }
867 }
868
869 /// Returns whether the given user is a participant in the specified channel.
870 pub async fn check_user_is_channel_participant(
871 &self,
872 channel: &channel::Model,
873 user_id: UserId,
874 tx: &DatabaseTransaction,
875 ) -> Result<ChannelRole> {
876 let role = self.channel_role_for_user(channel, user_id, tx).await?;
877 match role {
878 Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
879 Ok(role.unwrap())
880 }
881 Some(ChannelRole::Banned) | None => Err(anyhow!(
882 "user is not a channel participant or channel does not exist"
883 ))?,
884 }
885 }
886
887 /// Returns a user's pending invite for the given channel, if one exists.
888 pub async fn pending_invite_for_channel(
889 &self,
890 channel: &channel::Model,
891 user_id: UserId,
892 tx: &DatabaseTransaction,
893 ) -> Result<Option<channel_member::Model>> {
894 let row = channel_member::Entity::find()
895 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
896 .filter(channel_member::Column::UserId.eq(user_id))
897 .filter(channel_member::Column::Accepted.eq(false))
898 .one(&*tx)
899 .await?;
900
901 Ok(row)
902 }
903
904 /// Returns the role for a user in the given channel.
905 pub async fn channel_role_for_user(
906 &self,
907 channel: &channel::Model,
908 user_id: UserId,
909 tx: &DatabaseTransaction,
910 ) -> Result<Option<ChannelRole>> {
911 let membership = channel_member::Entity::find()
912 .filter(
913 channel_member::Column::ChannelId
914 .eq(channel.root_id())
915 .and(channel_member::Column::UserId.eq(user_id))
916 .and(channel_member::Column::Accepted.eq(true)),
917 )
918 .one(&*tx)
919 .await?;
920
921 let Some(membership) = membership else {
922 return Ok(None);
923 };
924
925 if !membership.role.can_see_channel(channel.visibility) {
926 return Ok(None);
927 }
928
929 Ok(Some(membership.role))
930 }
931
932 // Get the descendants of the given set if channels, ordered by their
933 // path.
934 pub(crate) async fn get_channel_descendants_excluding_self(
935 &self,
936 channels: impl IntoIterator<Item = &channel::Model>,
937 tx: &DatabaseTransaction,
938 ) -> Result<Vec<channel::Model>> {
939 let mut filter = Condition::any();
940 for channel in channels.into_iter() {
941 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
942 }
943
944 if filter.is_empty() {
945 return Ok(vec![]);
946 }
947
948 Ok(channel::Entity::find()
949 .filter(filter)
950 .order_by_asc(Expr::cust("parent_path || id || '/'"))
951 .all(tx)
952 .await?)
953 }
954
955 /// Returns the channel with the given ID.
956 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
957 self.transaction(|tx| async move {
958 let channel = self.get_channel_internal(channel_id, &*tx).await?;
959 self.check_user_is_channel_participant(&channel, user_id, &*tx)
960 .await?;
961
962 Ok(Channel::from_model(channel))
963 })
964 .await
965 }
966
967 pub(crate) async fn get_channel_internal(
968 &self,
969 channel_id: ChannelId,
970 tx: &DatabaseTransaction,
971 ) -> Result<channel::Model> {
972 Ok(channel::Entity::find_by_id(channel_id)
973 .one(&*tx)
974 .await?
975 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
976 }
977
978 pub(crate) async fn get_or_create_channel_room(
979 &self,
980 channel_id: ChannelId,
981 live_kit_room: &str,
982 environment: &str,
983 tx: &DatabaseTransaction,
984 ) -> Result<RoomId> {
985 let room = room::Entity::find()
986 .filter(room::Column::ChannelId.eq(channel_id))
987 .one(&*tx)
988 .await?;
989
990 let room_id = if let Some(room) = room {
991 if let Some(env) = room.environment {
992 if &env != environment {
993 Err(ErrorCode::WrongReleaseChannel
994 .with_tag("required", &env)
995 .anyhow())?;
996 }
997 }
998 room.id
999 } else {
1000 let result = room::Entity::insert(room::ActiveModel {
1001 channel_id: ActiveValue::Set(Some(channel_id)),
1002 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1003 environment: ActiveValue::Set(Some(environment.to_string())),
1004 ..Default::default()
1005 })
1006 .exec(&*tx)
1007 .await?;
1008
1009 result.last_insert_id
1010 };
1011
1012 Ok(room_id)
1013 }
1014
1015 /// Move a channel from one parent to another
1016 pub async fn move_channel(
1017 &self,
1018 channel_id: ChannelId,
1019 new_parent_id: ChannelId,
1020 admin_id: UserId,
1021 ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
1022 self.transaction(|tx| async move {
1023 let channel = self.get_channel_internal(channel_id, &*tx).await?;
1024 self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1025 .await?;
1026 let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1027
1028 if new_parent.root_id() != channel.root_id() {
1029 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
1030 }
1031
1032 if new_parent
1033 .ancestors_including_self()
1034 .any(|id| id == channel.id)
1035 {
1036 Err(anyhow!(ErrorCode::CircularNesting))?;
1037 }
1038
1039 if channel.visibility == ChannelVisibility::Public
1040 && new_parent.visibility != ChannelVisibility::Public
1041 {
1042 Err(anyhow!(ErrorCode::BadPublicNesting))?;
1043 }
1044
1045 let root_id = channel.root_id();
1046 let old_path = format!("{}{}/", channel.parent_path, channel.id);
1047 let new_path = format!("{}{}/", new_parent.path(), channel.id);
1048
1049 let mut model = channel.into_active_model();
1050 model.parent_path = ActiveValue::Set(new_parent.path());
1051 let channel = model.update(&*tx).await?;
1052
1053 let descendent_ids =
1054 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1055 self.pool.get_database_backend(),
1056 "
1057 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1058 WHERE parent_path LIKE $3 || '%'
1059 RETURNING id
1060 ",
1061 [old_path.clone().into(), new_path.into(), old_path.into()],
1062 ))
1063 .all(&*tx)
1064 .await?;
1065
1066 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1067
1068 let channels = channel::Entity::find()
1069 .filter(channel::Column::Id.is_in(all_moved_ids))
1070 .all(&*tx)
1071 .await?
1072 .into_iter()
1073 .map(|c| Channel::from_model(c))
1074 .collect::<Vec<_>>();
1075
1076 let channel_members = channel_member::Entity::find()
1077 .filter(channel_member::Column::ChannelId.eq(root_id))
1078 .all(&*tx)
1079 .await?;
1080
1081 Ok((channels, channel_members))
1082 })
1083 .await
1084 }
1085}
1086
/// Column selector for queries that yield a single `id` column; used by
/// `move_channel` to read the ids returned by its `RETURNING id` statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1091
/// Column selector for queries that yield a single `user_id` column; used by
/// `delete_channel` to read the ids of the members to notify.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}