1use super::*;
2use anyhow::Context as _;
3use rpc::{
4 ErrorCode, ErrorCodeExt,
5 proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
6};
7use sea_orm::{ActiveValue, DbBackend, TryGetableMany};
8
9impl Database {
10 #[cfg(feature = "test-support")]
11 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
12 self.transaction(move |tx| async move {
13 let mut channels = Vec::new();
14 let mut rows = channel::Entity::find().stream(&*tx).await?;
15 while let Some(row) = rows.next().await {
16 let row = row?;
17 channels.push((row.id, row.name));
18 }
19 Ok(channels)
20 })
21 .await
22 }
23
24 #[cfg(feature = "test-support")]
25 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
26 Ok(self.create_channel(name, None, creator_id).await?.0.id)
27 }
28
29 #[cfg(feature = "test-support")]
30 pub async fn create_sub_channel(
31 &self,
32 name: &str,
33 parent: ChannelId,
34 creator_id: UserId,
35 ) -> Result<ChannelId> {
36 Ok(self
37 .create_channel(name, Some(parent), creator_id)
38 .await?
39 .0
40 .id)
41 }
42
43 /// Creates a new channel.
44 pub async fn create_channel(
45 &self,
46 name: &str,
47 parent_channel_id: Option<ChannelId>,
48 admin_id: UserId,
49 ) -> Result<(channel::Model, Option<channel_member::Model>)> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let parent_path = parent
63 .as_ref()
64 .map_or(String::new(), |parent| parent.path());
65
66 // Find the maximum channel_order among siblings to set the new channel at the end
67 let max_order = if parent_path.is_empty() {
68 0
69 } else {
70 max_order(&parent_path, &tx).await?
71 };
72
73 log::info!(
74 "Creating channel '{}' with parent_path='{}', max_order={}, new_order={}",
75 name,
76 parent_path,
77 max_order,
78 max_order + 1
79 );
80
81 let channel = channel::ActiveModel {
82 id: ActiveValue::NotSet,
83 name: ActiveValue::Set(name.to_string()),
84 visibility: ActiveValue::Set(ChannelVisibility::Members),
85 parent_path: ActiveValue::Set(parent_path),
86 requires_zed_cla: ActiveValue::NotSet,
87 channel_order: ActiveValue::Set(max_order + 1),
88 }
89 .insert(&*tx)
90 .await?;
91
92 if parent.is_none() {
93 membership = Some(
94 channel_member::ActiveModel {
95 id: ActiveValue::NotSet,
96 channel_id: ActiveValue::Set(channel.id),
97 user_id: ActiveValue::Set(admin_id),
98 accepted: ActiveValue::Set(true),
99 role: ActiveValue::Set(ChannelRole::Admin),
100 }
101 .insert(&*tx)
102 .await?,
103 );
104 }
105
106 Ok((channel, membership))
107 })
108 .await
109 }
110
111 /// Adds a user to the specified channel.
112 pub async fn join_channel(
113 &self,
114 channel_id: ChannelId,
115 user_id: UserId,
116 connection: ConnectionId,
117 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
118 self.transaction(move |tx| async move {
119 let channel = self.get_channel_internal(channel_id, &tx).await?;
120 let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
121
122 let mut accept_invite_result = None;
123
124 if role.is_none() {
125 if let Some(invitation) = self
126 .pending_invite_for_channel(&channel, user_id, &tx)
127 .await?
128 {
129 // note, this may be a parent channel
130 role = Some(invitation.role);
131 channel_member::Entity::update(channel_member::ActiveModel {
132 accepted: ActiveValue::Set(true),
133 ..invitation.into_active_model()
134 })
135 .exec(&*tx)
136 .await?;
137
138 accept_invite_result = Some(
139 self.calculate_membership_updated(&channel, user_id, &tx)
140 .await?,
141 );
142
143 debug_assert!(
144 self.channel_role_for_user(&channel, user_id, &tx).await? == role
145 );
146 } else if channel.visibility == ChannelVisibility::Public {
147 role = Some(ChannelRole::Guest);
148 channel_member::Entity::insert(channel_member::ActiveModel {
149 id: ActiveValue::NotSet,
150 channel_id: ActiveValue::Set(channel.root_id()),
151 user_id: ActiveValue::Set(user_id),
152 accepted: ActiveValue::Set(true),
153 role: ActiveValue::Set(ChannelRole::Guest),
154 })
155 .exec(&*tx)
156 .await?;
157
158 accept_invite_result = Some(
159 self.calculate_membership_updated(&channel, user_id, &tx)
160 .await?,
161 );
162
163 debug_assert!(
164 self.channel_role_for_user(&channel, user_id, &tx).await? == role
165 );
166 }
167 }
168
169 if role.is_none() || role == Some(ChannelRole::Banned) {
170 Err(ErrorCode::Forbidden.anyhow())?
171 }
172 let role = role.unwrap();
173
174 let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
175 let room_id = self
176 .get_or_create_channel_room(channel_id, &livekit_room, &tx)
177 .await?;
178
179 self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
180 .await
181 .map(|jr| (jr, accept_invite_result, role))
182 })
183 .await
184 }
185
186 /// Sets the visibility of the given channel.
187 pub async fn set_channel_visibility(
188 &self,
189 channel_id: ChannelId,
190 visibility: ChannelVisibility,
191 admin_id: UserId,
192 ) -> Result<channel::Model> {
193 self.transaction(move |tx| async move {
194 let channel = self.get_channel_internal(channel_id, &tx).await?;
195 self.check_user_is_channel_admin(&channel, admin_id, &tx)
196 .await?;
197
198 if visibility == ChannelVisibility::Public {
199 if let Some(parent_id) = channel.parent_id() {
200 let parent = self.get_channel_internal(parent_id, &tx).await?;
201
202 if parent.visibility != ChannelVisibility::Public {
203 Err(ErrorCode::BadPublicNesting
204 .with_tag("direction", "parent")
205 .anyhow())?;
206 }
207 }
208 } else if visibility == ChannelVisibility::Members
209 && self
210 .get_channel_descendants_excluding_self([&channel], &tx)
211 .await?
212 .into_iter()
213 .any(|channel| channel.visibility == ChannelVisibility::Public)
214 {
215 Err(ErrorCode::BadPublicNesting
216 .with_tag("direction", "children")
217 .anyhow())?;
218 }
219
220 let mut model = channel.into_active_model();
221 model.visibility = ActiveValue::Set(visibility);
222 let channel = model.update(&*tx).await?;
223
224 Ok(channel)
225 })
226 .await
227 }
228
229 #[cfg(feature = "test-support")]
230 pub async fn set_channel_requires_zed_cla(
231 &self,
232 channel_id: ChannelId,
233 requires_zed_cla: bool,
234 ) -> Result<()> {
235 self.transaction(move |tx| async move {
236 let channel = self.get_channel_internal(channel_id, &tx).await?;
237 let mut model = channel.into_active_model();
238 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
239 model.update(&*tx).await?;
240 Ok(())
241 })
242 .await
243 }
244
245 /// Deletes the channel with the specified ID.
246 pub async fn delete_channel(
247 &self,
248 channel_id: ChannelId,
249 user_id: UserId,
250 ) -> Result<(ChannelId, Vec<ChannelId>)> {
251 self.transaction(move |tx| async move {
252 let channel = self.get_channel_internal(channel_id, &tx).await?;
253 self.check_user_is_channel_admin(&channel, user_id, &tx)
254 .await?;
255
256 let channels_to_remove = self
257 .get_channel_descendants_excluding_self([&channel], &tx)
258 .await?
259 .into_iter()
260 .map(|channel| channel.id)
261 .chain(Some(channel_id))
262 .collect::<Vec<_>>();
263
264 let channel_has_active_participants = room_participant::Entity::find()
265 .inner_join(room::Entity)
266 .filter(room::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
267 .count(&*tx)
268 .await?
269 > 0;
270
271 if channel_has_active_participants {
272 Err(anyhow!("can't delete channel while a call is in progress"))?;
273 }
274
275 channel::Entity::delete_many()
276 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
277 .exec(&*tx)
278 .await?;
279
280 Ok((channel.root_id(), channels_to_remove))
281 })
282 .await
283 }
284
285 /// Invites a user to a channel as a member.
286 pub async fn invite_channel_member(
287 &self,
288 channel_id: ChannelId,
289 invitee_id: UserId,
290 inviter_id: UserId,
291 role: ChannelRole,
292 ) -> Result<InviteMemberResult> {
293 self.transaction(move |tx| async move {
294 let channel = self.get_channel_internal(channel_id, &tx).await?;
295 self.check_user_is_channel_admin(&channel, inviter_id, &tx)
296 .await?;
297 if !channel.is_root() {
298 Err(ErrorCode::NotARootChannel.anyhow())?
299 }
300
301 channel_member::ActiveModel {
302 id: ActiveValue::NotSet,
303 channel_id: ActiveValue::Set(channel_id),
304 user_id: ActiveValue::Set(invitee_id),
305 accepted: ActiveValue::Set(false),
306 role: ActiveValue::Set(role),
307 }
308 .insert(&*tx)
309 .await?;
310
311 let channel = Channel::from_model(channel);
312
313 let notifications = self
314 .create_notification(
315 invitee_id,
316 rpc::Notification::ChannelInvitation {
317 channel_id: channel_id.to_proto(),
318 channel_name: channel.name.clone(),
319 inviter_id: inviter_id.to_proto(),
320 },
321 true,
322 &tx,
323 )
324 .await?
325 .into_iter()
326 .collect();
327
328 Ok(InviteMemberResult {
329 channel,
330 notifications,
331 })
332 })
333 .await
334 }
335
336 fn sanitize_channel_name(name: &str) -> Result<&str> {
337 let new_name = name.trim().trim_start_matches('#');
338 if new_name.is_empty() {
339 Err(anyhow!("channel name can't be blank"))?;
340 }
341 Ok(new_name)
342 }
343
344 /// Renames the specified channel.
345 pub async fn rename_channel(
346 &self,
347 channel_id: ChannelId,
348 admin_id: UserId,
349 new_name: &str,
350 ) -> Result<channel::Model> {
351 self.transaction(move |tx| async move {
352 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
353
354 let channel = self.get_channel_internal(channel_id, &tx).await?;
355 self.check_user_is_channel_admin(&channel, admin_id, &tx)
356 .await?;
357
358 let mut model = channel.into_active_model();
359 model.name = ActiveValue::Set(new_name.clone());
360 let channel = model.update(&*tx).await?;
361
362 Ok(channel)
363 })
364 .await
365 }
366
367 /// accept or decline an invite to join a channel
368 pub async fn respond_to_channel_invite(
369 &self,
370 channel_id: ChannelId,
371 user_id: UserId,
372 accept: bool,
373 ) -> Result<RespondToChannelInvite> {
374 self.transaction(move |tx| async move {
375 let channel = self.get_channel_internal(channel_id, &tx).await?;
376
377 let membership_update = if accept {
378 let rows_affected = channel_member::Entity::update_many()
379 .set(channel_member::ActiveModel {
380 accepted: ActiveValue::Set(accept),
381 ..Default::default()
382 })
383 .filter(
384 channel_member::Column::ChannelId
385 .eq(channel_id)
386 .and(channel_member::Column::UserId.eq(user_id))
387 .and(channel_member::Column::Accepted.eq(false)),
388 )
389 .exec(&*tx)
390 .await?
391 .rows_affected;
392
393 if rows_affected == 0 {
394 Err(anyhow!("no such invitation"))?;
395 }
396
397 Some(
398 self.calculate_membership_updated(&channel, user_id, &tx)
399 .await?,
400 )
401 } else {
402 let rows_affected = channel_member::Entity::delete_many()
403 .filter(
404 channel_member::Column::ChannelId
405 .eq(channel_id)
406 .and(channel_member::Column::UserId.eq(user_id))
407 .and(channel_member::Column::Accepted.eq(false)),
408 )
409 .exec(&*tx)
410 .await?
411 .rows_affected;
412 if rows_affected == 0 {
413 Err(anyhow!("no such invitation"))?;
414 }
415
416 None
417 };
418
419 Ok(RespondToChannelInvite {
420 membership_update,
421 notifications: self
422 .mark_notification_as_read_with_response(
423 user_id,
424 &rpc::Notification::ChannelInvitation {
425 channel_id: channel_id.to_proto(),
426 channel_name: Default::default(),
427 inviter_id: Default::default(),
428 },
429 accept,
430 &tx,
431 )
432 .await?
433 .into_iter()
434 .collect(),
435 })
436 })
437 .await
438 }
439
440 async fn calculate_membership_updated(
441 &self,
442 channel: &channel::Model,
443 user_id: UserId,
444 tx: &DatabaseTransaction,
445 ) -> Result<MembershipUpdated> {
446 let new_channels = self
447 .get_user_channels(user_id, Some(channel), false, tx)
448 .await?;
449 let removed_channels = self
450 .get_channel_descendants_excluding_self([channel], tx)
451 .await?
452 .into_iter()
453 .map(|channel| channel.id)
454 .chain([channel.id])
455 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
456 .collect::<Vec<_>>();
457
458 Ok(MembershipUpdated {
459 channel_id: channel.id,
460 new_channels,
461 removed_channels,
462 })
463 }
464
465 /// Removes a channel member.
466 pub async fn remove_channel_member(
467 &self,
468 channel_id: ChannelId,
469 member_id: UserId,
470 admin_id: UserId,
471 ) -> Result<RemoveChannelMemberResult> {
472 self.transaction(|tx| async move {
473 let channel = self.get_channel_internal(channel_id, &tx).await?;
474
475 if member_id != admin_id {
476 self.check_user_is_channel_admin(&channel, admin_id, &tx)
477 .await?;
478 }
479
480 let result = channel_member::Entity::delete_many()
481 .filter(
482 channel_member::Column::ChannelId
483 .eq(channel_id)
484 .and(channel_member::Column::UserId.eq(member_id)),
485 )
486 .exec(&*tx)
487 .await?;
488
489 if result.rows_affected == 0 {
490 Err(anyhow!("no such member"))?;
491 }
492
493 Ok(RemoveChannelMemberResult {
494 membership_update: self
495 .calculate_membership_updated(&channel, member_id, &tx)
496 .await?,
497 notification_id: self
498 .remove_notification(
499 member_id,
500 rpc::Notification::ChannelInvitation {
501 channel_id: channel_id.to_proto(),
502 channel_name: Default::default(),
503 inviter_id: Default::default(),
504 },
505 &tx,
506 )
507 .await?,
508 })
509 })
510 .await
511 }
512
513 /// Returns all channels for the user with the given ID.
514 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
515 self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
516 .await
517 }
518
519 /// Returns all channels for the user with the given ID that are descendants
520 /// of the specified ancestor channel.
521 pub async fn get_user_channels(
522 &self,
523 user_id: UserId,
524 ancestor_channel: Option<&channel::Model>,
525 include_invites: bool,
526 tx: &DatabaseTransaction,
527 ) -> Result<ChannelsForUser> {
528 let mut filter = channel_member::Column::UserId.eq(user_id);
529 if !include_invites {
530 filter = filter.and(channel_member::Column::Accepted.eq(true))
531 }
532 if let Some(ancestor) = ancestor_channel {
533 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
534 }
535
536 let mut channels = Vec::<channel::Model>::new();
537 let mut invited_channels = Vec::<Channel>::new();
538 let mut channel_memberships = Vec::<channel_member::Model>::new();
539 let mut rows = channel_member::Entity::find()
540 .filter(filter)
541 .inner_join(channel::Entity)
542 .select_also(channel::Entity)
543 .stream(tx)
544 .await?;
545 while let Some(row) = rows.next().await {
546 if let (membership, Some(channel)) = row? {
547 if membership.accepted {
548 channel_memberships.push(membership);
549 channels.push(channel);
550 } else {
551 invited_channels.push(Channel::from_model(channel));
552 }
553 }
554 }
555 drop(rows);
556
557 let mut descendants = self
558 .get_channel_descendants_excluding_self(channels.iter(), tx)
559 .await?;
560
561 descendants.extend(channels);
562
563 let roles_by_channel_id = channel_memberships
564 .iter()
565 .map(|membership| (membership.channel_id, membership.role))
566 .collect::<HashMap<_, _>>();
567
568 let channels: Vec<Channel> = descendants
569 .into_iter()
570 .filter_map(|channel| {
571 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
572 if parent_role.can_see_channel(channel.visibility) {
573 Some(Channel::from_model(channel))
574 } else {
575 None
576 }
577 })
578 .collect();
579
580 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
581 enum QueryUserIdsAndChannelIds {
582 ChannelId,
583 UserId,
584 }
585
586 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
587 {
588 let mut rows = room_participant::Entity::find()
589 .inner_join(room::Entity)
590 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
591 .select_only()
592 .column(room::Column::ChannelId)
593 .column(room_participant::Column::UserId)
594 .into_values::<_, QueryUserIdsAndChannelIds>()
595 .stream(tx)
596 .await?;
597 while let Some(row) = rows.next().await {
598 let row: (ChannelId, UserId) = row?;
599 channel_participants.entry(row.0).or_default().push(row.1)
600 }
601 }
602
603 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
604
605 let mut channel_ids_by_buffer_id = HashMap::default();
606 let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
607 let mut rows = buffer::Entity::find()
608 .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
609 .stream(tx)
610 .await?;
611 while let Some(row) = rows.next().await {
612 let row = row?;
613 channel_ids_by_buffer_id.insert(row.id, row.channel_id);
614 latest_buffer_versions.push(ChannelBufferVersion {
615 channel_id: row.channel_id.0 as u64,
616 epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
617 version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
618 .latest_operation_lamport_timestamp
619 .zip(row.latest_operation_replica_id)
620 {
621 vec![VectorClockEntry {
622 timestamp: latest_lamport_timestamp as u32,
623 replica_id: latest_replica_id as u32,
624 }]
625 } else {
626 vec![]
627 },
628 });
629 }
630 drop(rows);
631
632 let observed_buffer_versions = self
633 .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
634 .await?;
635
636 Ok(ChannelsForUser {
637 channel_memberships,
638 channels,
639 invited_channels,
640 channel_participants,
641 latest_buffer_versions,
642 observed_buffer_versions,
643 })
644 }
645
646 /// Sets the role for the specified channel member.
647 pub async fn set_channel_member_role(
648 &self,
649 channel_id: ChannelId,
650 admin_id: UserId,
651 for_user: UserId,
652 role: ChannelRole,
653 ) -> Result<SetMemberRoleResult> {
654 self.transaction(|tx| async move {
655 let channel = self.get_channel_internal(channel_id, &tx).await?;
656 self.check_user_is_channel_admin(&channel, admin_id, &tx)
657 .await?;
658
659 let membership = channel_member::Entity::find()
660 .filter(
661 channel_member::Column::ChannelId
662 .eq(channel_id)
663 .and(channel_member::Column::UserId.eq(for_user)),
664 )
665 .one(&*tx)
666 .await?
667 .context("no such member")?;
668
669 let mut update = membership.into_active_model();
670 update.role = ActiveValue::Set(role);
671 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
672
673 if updated.accepted {
674 Ok(SetMemberRoleResult::MembershipUpdated(
675 self.calculate_membership_updated(&channel, for_user, &tx)
676 .await?,
677 ))
678 } else {
679 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
680 channel,
681 )))
682 }
683 })
684 .await
685 }
686
687 /// Returns the details for the specified channel member.
688 pub async fn get_channel_participant_details(
689 &self,
690 channel_id: ChannelId,
691 filter: &str,
692 limit: u64,
693 user_id: UserId,
694 ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
695 let members = self
696 .transaction(move |tx| async move {
697 let channel = self.get_channel_internal(channel_id, &tx).await?;
698 self.check_user_is_channel_participant(&channel, user_id, &tx)
699 .await?;
700 let mut query = channel_member::Entity::find()
701 .find_also_related(user::Entity)
702 .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
703
704 if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
705 query = query.filter(Expr::cust_with_values(
706 "UPPER(github_login) LIKE ?",
707 [Self::fuzzy_like_string(&filter.to_uppercase())],
708 ))
709 } else {
710 query = query.filter(Expr::cust_with_values(
711 "github_login ILIKE $1",
712 [Self::fuzzy_like_string(filter)],
713 ))
714 }
715 let members = query.order_by(
716 Expr::cust(
717 "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
718 ),
719 sea_orm::Order::Asc,
720 )
721 .limit(limit)
722 .all(&*tx)
723 .await?;
724
725 Ok(members)
726 })
727 .await?;
728
729 let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
730
731 let members = members
732 .into_iter()
733 .map(|(member, user)| {
734 if let Some(user) = user {
735 users.push(proto::User {
736 id: user.id.to_proto(),
737 avatar_url: format!(
738 "https://avatars.githubusercontent.com/u/{}?s=128&v=4",
739 user.github_user_id
740 ),
741 github_login: user.github_login,
742 name: user.name,
743 })
744 }
745 proto::ChannelMember {
746 role: member.role.into(),
747 user_id: member.user_id.to_proto(),
748 kind: if member.accepted {
749 Kind::Member
750 } else {
751 Kind::Invitee
752 }
753 .into(),
754 }
755 })
756 .collect();
757
758 Ok((members, users))
759 }
760
761 /// Returns whether the given user is an admin in the specified channel.
762 pub async fn check_user_is_channel_admin(
763 &self,
764 channel: &channel::Model,
765 user_id: UserId,
766 tx: &DatabaseTransaction,
767 ) -> Result<ChannelRole> {
768 let role = self.channel_role_for_user(channel, user_id, tx).await?;
769 match role {
770 Some(ChannelRole::Admin) => Ok(role.unwrap()),
771 Some(ChannelRole::Member)
772 | Some(ChannelRole::Talker)
773 | Some(ChannelRole::Banned)
774 | Some(ChannelRole::Guest)
775 | None => Err(anyhow!(
776 "user is not a channel admin or channel does not exist"
777 ))?,
778 }
779 }
780
781 /// Returns whether the given user is a member of the specified channel.
782 pub async fn check_user_is_channel_member(
783 &self,
784 channel: &channel::Model,
785 user_id: UserId,
786 tx: &DatabaseTransaction,
787 ) -> Result<ChannelRole> {
788 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
789 match channel_role {
790 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
791 Some(ChannelRole::Banned)
792 | Some(ChannelRole::Guest)
793 | Some(ChannelRole::Talker)
794 | None => Err(anyhow!(
795 "user is not a channel member or channel does not exist"
796 ))?,
797 }
798 }
799
800 /// Returns whether the given user is a participant in the specified channel.
801 pub async fn check_user_is_channel_participant(
802 &self,
803 channel: &channel::Model,
804 user_id: UserId,
805 tx: &DatabaseTransaction,
806 ) -> Result<ChannelRole> {
807 let role = self.channel_role_for_user(channel, user_id, tx).await?;
808 match role {
809 Some(ChannelRole::Admin)
810 | Some(ChannelRole::Member)
811 | Some(ChannelRole::Guest)
812 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
813 Some(ChannelRole::Banned) | None => Err(anyhow!(
814 "user is not a channel participant or channel does not exist"
815 ))?,
816 }
817 }
818
819 /// Returns a user's pending invite for the given channel, if one exists.
820 pub async fn pending_invite_for_channel(
821 &self,
822 channel: &channel::Model,
823 user_id: UserId,
824 tx: &DatabaseTransaction,
825 ) -> Result<Option<channel_member::Model>> {
826 let row = channel_member::Entity::find()
827 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
828 .filter(channel_member::Column::UserId.eq(user_id))
829 .filter(channel_member::Column::Accepted.eq(false))
830 .one(tx)
831 .await?;
832
833 Ok(row)
834 }
835
836 /// Returns the role for a user in the given channel.
837 pub async fn channel_role_for_user(
838 &self,
839 channel: &channel::Model,
840 user_id: UserId,
841 tx: &DatabaseTransaction,
842 ) -> Result<Option<ChannelRole>> {
843 let membership = channel_member::Entity::find()
844 .filter(
845 channel_member::Column::ChannelId
846 .eq(channel.root_id())
847 .and(channel_member::Column::UserId.eq(user_id))
848 .and(channel_member::Column::Accepted.eq(true)),
849 )
850 .one(tx)
851 .await?;
852
853 let Some(membership) = membership else {
854 return Ok(None);
855 };
856
857 if !membership.role.can_see_channel(channel.visibility) {
858 return Ok(None);
859 }
860
861 Ok(Some(membership.role))
862 }
863
864 // Get the descendants of the given set if channels, ordered by their
865 // path.
866 pub(crate) async fn get_channel_descendants_excluding_self(
867 &self,
868 channels: impl IntoIterator<Item = &channel::Model>,
869 tx: &DatabaseTransaction,
870 ) -> Result<Vec<channel::Model>> {
871 let mut filter = Condition::any();
872 for channel in channels.into_iter() {
873 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
874 }
875
876 if filter.is_empty() {
877 return Ok(vec![]);
878 }
879
880 Ok(channel::Entity::find()
881 .filter(filter)
882 .order_by_asc(Expr::cust("parent_path || id || '/'"))
883 .all(tx)
884 .await?)
885 }
886
887 /// Returns the channel with the given ID.
888 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
889 self.transaction(|tx| async move {
890 let channel = self.get_channel_internal(channel_id, &tx).await?;
891 self.check_user_is_channel_participant(&channel, user_id, &tx)
892 .await?;
893
894 Ok(Channel::from_model(channel))
895 })
896 .await
897 }
898
899 pub async fn get_channel_internal(
900 &self,
901 channel_id: ChannelId,
902 tx: &DatabaseTransaction,
903 ) -> Result<channel::Model> {
904 Ok(channel::Entity::find_by_id(channel_id)
905 .one(tx)
906 .await?
907 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
908 }
909
910 pub(crate) async fn get_or_create_channel_room(
911 &self,
912 channel_id: ChannelId,
913 livekit_room: &str,
914 tx: &DatabaseTransaction,
915 ) -> Result<RoomId> {
916 let room = room::Entity::find()
917 .filter(room::Column::ChannelId.eq(channel_id))
918 .one(tx)
919 .await?;
920
921 let room_id = if let Some(room) = room {
922 room.id
923 } else {
924 let result = room::Entity::insert(room::ActiveModel {
925 channel_id: ActiveValue::Set(Some(channel_id)),
926 live_kit_room: ActiveValue::Set(livekit_room.to_string()),
927 ..Default::default()
928 })
929 .exec(tx)
930 .await?;
931
932 result.last_insert_id
933 };
934
935 Ok(room_id)
936 }
937
938 /// Move a channel from one parent to another
939 pub async fn move_channel(
940 &self,
941 channel_id: ChannelId,
942 new_parent_id: ChannelId,
943 admin_id: UserId,
944 ) -> Result<(ChannelId, Vec<Channel>)> {
945 self.transaction(|tx| async move {
946 let channel = self.get_channel_internal(channel_id, &tx).await?;
947 self.check_user_is_channel_admin(&channel, admin_id, &tx)
948 .await?;
949 let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
950
951 if new_parent.root_id() != channel.root_id() {
952 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
953 }
954
955 if new_parent
956 .ancestors_including_self()
957 .any(|id| id == channel.id)
958 {
959 Err(anyhow!(ErrorCode::CircularNesting))?;
960 }
961
962 if channel.visibility == ChannelVisibility::Public
963 && new_parent.visibility != ChannelVisibility::Public
964 {
965 Err(anyhow!(ErrorCode::BadPublicNesting))?;
966 }
967
968 let root_id = channel.root_id();
969 let new_parent_path = new_parent.path();
970 let old_path = format!("{}{}/", channel.parent_path, channel.id);
971 let new_path = format!("{}{}/", &new_parent_path, channel.id);
972 let new_order = max_order(&new_parent_path, &tx).await? + 1;
973
974 let mut model = channel.into_active_model();
975 model.parent_path = ActiveValue::Set(new_parent.path());
976 model.channel_order = ActiveValue::Set(new_order);
977 let channel = model.update(&*tx).await?;
978
979 let descendent_ids =
980 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
981 self.pool.get_database_backend(),
982 "
983 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
984 WHERE parent_path LIKE $3 || '%'
985 RETURNING id
986 ",
987 [old_path.clone().into(), new_path.into(), old_path.into()],
988 ))
989 .all(&*tx)
990 .await?;
991
992 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
993
994 let channels = channel::Entity::find()
995 .filter(channel::Column::Id.is_in(all_moved_ids))
996 .all(&*tx)
997 .await?
998 .into_iter()
999 .map(Channel::from_model)
1000 .collect::<Vec<_>>();
1001
1002 Ok((root_id, channels))
1003 })
1004 .await
1005 }
1006
1007 pub async fn reorder_channel(
1008 &self,
1009 channel_id: ChannelId,
1010 direction: proto::reorder_channel::Direction,
1011 user_id: UserId,
1012 ) -> Result<Vec<Channel>> {
1013 self.transaction(|tx| async move {
1014 let mut channel = self.get_channel_internal(channel_id, &tx).await?;
1015
1016 if channel.is_root() {
1017 log::info!("Skipping reorder of root channel {}", channel.id,);
1018 return Ok(vec![]);
1019 }
1020
1021 log::info!(
1022 "Reordering channel {} (parent_path: '{}', order: {})",
1023 channel.id,
1024 channel.parent_path,
1025 channel.channel_order
1026 );
1027
1028 // Check if user is admin of the channel
1029 self.check_user_is_channel_admin(&channel, user_id, &tx)
1030 .await?;
1031
1032 // Find the sibling channel to swap with
1033 let sibling_channel = match direction {
1034 proto::reorder_channel::Direction::Up => {
1035 log::info!(
1036 "Looking for sibling with parent_path='{}' and order < {}",
1037 channel.parent_path,
1038 channel.channel_order
1039 );
1040 // Find channel with highest order less than current
1041 channel::Entity::find()
1042 .filter(
1043 channel::Column::ParentPath
1044 .eq(&channel.parent_path)
1045 .and(channel::Column::ChannelOrder.lt(channel.channel_order)),
1046 )
1047 .order_by_desc(channel::Column::ChannelOrder)
1048 .one(&*tx)
1049 .await?
1050 }
1051 proto::reorder_channel::Direction::Down => {
1052 log::info!(
1053 "Looking for sibling with parent_path='{}' and order > {}",
1054 channel.parent_path,
1055 channel.channel_order
1056 );
1057 // Find channel with lowest order greater than current
1058 channel::Entity::find()
1059 .filter(
1060 channel::Column::ParentPath
1061 .eq(&channel.parent_path)
1062 .and(channel::Column::ChannelOrder.gt(channel.channel_order)),
1063 )
1064 .order_by_asc(channel::Column::ChannelOrder)
1065 .one(&*tx)
1066 .await?
1067 }
1068 };
1069
1070 let mut sibling_channel = match sibling_channel {
1071 Some(sibling) => {
1072 log::info!(
1073 "Found sibling {} (parent_path: '{}', order: {})",
1074 sibling.id,
1075 sibling.parent_path,
1076 sibling.channel_order
1077 );
1078 sibling
1079 }
1080 None => {
1081 log::warn!("No sibling found to swap with");
1082 // No sibling to swap with
1083 return Ok(vec![]);
1084 }
1085 };
1086
1087 let current_order = channel.channel_order;
1088 let sibling_order = sibling_channel.channel_order;
1089
1090 channel::ActiveModel {
1091 id: ActiveValue::Unchanged(sibling_channel.id),
1092 channel_order: ActiveValue::Set(current_order),
1093 ..Default::default()
1094 }
1095 .update(&*tx)
1096 .await?;
1097 sibling_channel.channel_order = current_order;
1098
1099 channel::ActiveModel {
1100 id: ActiveValue::Unchanged(channel.id),
1101 channel_order: ActiveValue::Set(sibling_order),
1102 ..Default::default()
1103 }
1104 .update(&*tx)
1105 .await?;
1106 channel.channel_order = sibling_order;
1107
1108 log::info!(
1109 "Reorder complete. Swapped channels {} and {}",
1110 channel.id,
1111 sibling_channel.id
1112 );
1113
1114 let swapped_channels = vec![
1115 Channel::from_model(channel),
1116 Channel::from_model(sibling_channel),
1117 ];
1118
1119 Ok(swapped_channels)
1120 })
1121 .await
1122 }
1123}
1124
1125async fn max_order(parent_path: &str, tx: &TransactionHandle) -> Result<i32> {
1126 let max_order = channel::Entity::find()
1127 .filter(channel::Column::ParentPath.eq(parent_path))
1128 .select_only()
1129 .column_as(channel::Column::ChannelOrder.max(), "max_order")
1130 .into_tuple::<Option<i32>>()
1131 .one(&**tx)
1132 .await?
1133 .flatten()
1134 .unwrap_or(0);
1135
1136 Ok(max_order)
1137}
1138
1139#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1140enum QueryIds {
1141 Id,
1142}