1use super::*;
2use anyhow::Context as _;
3use rpc::{
4 ErrorCode, ErrorCodeExt,
5 proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
6};
7use sea_orm::{ActiveValue, DbBackend, TryGetableMany};
8
9impl Database {
10 #[cfg(test)]
11 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
12 self.transaction(move |tx| async move {
13 let mut channels = Vec::new();
14 let mut rows = channel::Entity::find().stream(&*tx).await?;
15 while let Some(row) = rows.next().await {
16 let row = row?;
17 channels.push((row.id, row.name));
18 }
19 Ok(channels)
20 })
21 .await
22 }
23
24 #[cfg(test)]
25 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
26 Ok(self.create_channel(name, None, creator_id).await?.0.id)
27 }
28
29 #[cfg(test)]
30 pub async fn create_sub_channel(
31 &self,
32 name: &str,
33 parent: ChannelId,
34 creator_id: UserId,
35 ) -> Result<ChannelId> {
36 Ok(self
37 .create_channel(name, Some(parent), creator_id)
38 .await?
39 .0
40 .id)
41 }
42
43 /// Creates a new channel.
44 pub async fn create_channel(
45 &self,
46 name: &str,
47 parent_channel_id: Option<ChannelId>,
48 admin_id: UserId,
49 ) -> Result<(channel::Model, Option<channel_member::Model>)> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let parent_path = parent
63 .as_ref()
64 .map_or(String::new(), |parent| parent.path());
65
66 // Find the maximum channel_order among siblings to set the new channel at the end
67 let max_order = if parent_path.is_empty() {
68 0
69 } else {
70 max_order(&parent_path, &tx).await?
71 };
72
73 log::info!(
74 "Creating channel '{}' with parent_path='{}', max_order={}, new_order={}",
75 name,
76 parent_path,
77 max_order,
78 max_order + 1
79 );
80
81 let channel = channel::ActiveModel {
82 id: ActiveValue::NotSet,
83 name: ActiveValue::Set(name.to_string()),
84 visibility: ActiveValue::Set(ChannelVisibility::Members),
85 parent_path: ActiveValue::Set(parent_path),
86 requires_zed_cla: ActiveValue::NotSet,
87 channel_order: ActiveValue::Set(max_order + 1),
88 }
89 .insert(&*tx)
90 .await?;
91
92 if parent.is_none() {
93 membership = Some(
94 channel_member::ActiveModel {
95 id: ActiveValue::NotSet,
96 channel_id: ActiveValue::Set(channel.id),
97 user_id: ActiveValue::Set(admin_id),
98 accepted: ActiveValue::Set(true),
99 role: ActiveValue::Set(ChannelRole::Admin),
100 }
101 .insert(&*tx)
102 .await?,
103 );
104 }
105
106 Ok((channel, membership))
107 })
108 .await
109 }
110
111 /// Adds a user to the specified channel.
112 pub async fn join_channel(
113 &self,
114 channel_id: ChannelId,
115 user_id: UserId,
116 connection: ConnectionId,
117 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
118 self.transaction(move |tx| async move {
119 let channel = self.get_channel_internal(channel_id, &tx).await?;
120 let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
121
122 let mut accept_invite_result = None;
123
124 if role.is_none() {
125 if let Some(invitation) = self
126 .pending_invite_for_channel(&channel, user_id, &tx)
127 .await?
128 {
129 // note, this may be a parent channel
130 role = Some(invitation.role);
131 channel_member::Entity::update(channel_member::ActiveModel {
132 accepted: ActiveValue::Set(true),
133 ..invitation.into_active_model()
134 })
135 .exec(&*tx)
136 .await?;
137
138 accept_invite_result = Some(
139 self.calculate_membership_updated(&channel, user_id, &tx)
140 .await?,
141 );
142
143 debug_assert!(
144 self.channel_role_for_user(&channel, user_id, &tx).await? == role
145 );
146 } else if channel.visibility == ChannelVisibility::Public {
147 role = Some(ChannelRole::Guest);
148 channel_member::Entity::insert(channel_member::ActiveModel {
149 id: ActiveValue::NotSet,
150 channel_id: ActiveValue::Set(channel.root_id()),
151 user_id: ActiveValue::Set(user_id),
152 accepted: ActiveValue::Set(true),
153 role: ActiveValue::Set(ChannelRole::Guest),
154 })
155 .exec(&*tx)
156 .await?;
157
158 accept_invite_result = Some(
159 self.calculate_membership_updated(&channel, user_id, &tx)
160 .await?,
161 );
162
163 debug_assert!(
164 self.channel_role_for_user(&channel, user_id, &tx).await? == role
165 );
166 }
167 }
168
169 if role.is_none() || role == Some(ChannelRole::Banned) {
170 Err(ErrorCode::Forbidden.anyhow())?
171 }
172 let role = role.unwrap();
173
174 let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
175 let room_id = self
176 .get_or_create_channel_room(channel_id, &livekit_room, &tx)
177 .await?;
178
179 self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
180 .await
181 .map(|jr| (jr, accept_invite_result, role))
182 })
183 .await
184 }
185
186 /// Sets the visibility of the given channel.
187 pub async fn set_channel_visibility(
188 &self,
189 channel_id: ChannelId,
190 visibility: ChannelVisibility,
191 admin_id: UserId,
192 ) -> Result<channel::Model> {
193 self.transaction(move |tx| async move {
194 let channel = self.get_channel_internal(channel_id, &tx).await?;
195 self.check_user_is_channel_admin(&channel, admin_id, &tx)
196 .await?;
197
198 if visibility == ChannelVisibility::Public {
199 if let Some(parent_id) = channel.parent_id() {
200 let parent = self.get_channel_internal(parent_id, &tx).await?;
201
202 if parent.visibility != ChannelVisibility::Public {
203 Err(ErrorCode::BadPublicNesting
204 .with_tag("direction", "parent")
205 .anyhow())?;
206 }
207 }
208 } else if visibility == ChannelVisibility::Members
209 && self
210 .get_channel_descendants_excluding_self([&channel], &tx)
211 .await?
212 .into_iter()
213 .any(|channel| channel.visibility == ChannelVisibility::Public)
214 {
215 Err(ErrorCode::BadPublicNesting
216 .with_tag("direction", "children")
217 .anyhow())?;
218 }
219
220 let mut model = channel.into_active_model();
221 model.visibility = ActiveValue::Set(visibility);
222 let channel = model.update(&*tx).await?;
223
224 Ok(channel)
225 })
226 .await
227 }
228
229 #[cfg(test)]
230 pub async fn set_channel_requires_zed_cla(
231 &self,
232 channel_id: ChannelId,
233 requires_zed_cla: bool,
234 ) -> Result<()> {
235 self.transaction(move |tx| async move {
236 let channel = self.get_channel_internal(channel_id, &tx).await?;
237 let mut model = channel.into_active_model();
238 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
239 model.update(&*tx).await?;
240 Ok(())
241 })
242 .await
243 }
244
245 /// Deletes the channel with the specified ID.
246 pub async fn delete_channel(
247 &self,
248 channel_id: ChannelId,
249 user_id: UserId,
250 ) -> Result<(ChannelId, Vec<ChannelId>)> {
251 self.transaction(move |tx| async move {
252 let channel = self.get_channel_internal(channel_id, &tx).await?;
253 self.check_user_is_channel_admin(&channel, user_id, &tx)
254 .await?;
255
256 let channels_to_remove = self
257 .get_channel_descendants_excluding_self([&channel], &tx)
258 .await?
259 .into_iter()
260 .map(|channel| channel.id)
261 .chain(Some(channel_id))
262 .collect::<Vec<_>>();
263
264 channel::Entity::delete_many()
265 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
266 .exec(&*tx)
267 .await?;
268
269 Ok((channel.root_id(), channels_to_remove))
270 })
271 .await
272 }
273
274 /// Invites a user to a channel as a member.
275 pub async fn invite_channel_member(
276 &self,
277 channel_id: ChannelId,
278 invitee_id: UserId,
279 inviter_id: UserId,
280 role: ChannelRole,
281 ) -> Result<InviteMemberResult> {
282 self.transaction(move |tx| async move {
283 let channel = self.get_channel_internal(channel_id, &tx).await?;
284 self.check_user_is_channel_admin(&channel, inviter_id, &tx)
285 .await?;
286 if !channel.is_root() {
287 Err(ErrorCode::NotARootChannel.anyhow())?
288 }
289
290 channel_member::ActiveModel {
291 id: ActiveValue::NotSet,
292 channel_id: ActiveValue::Set(channel_id),
293 user_id: ActiveValue::Set(invitee_id),
294 accepted: ActiveValue::Set(false),
295 role: ActiveValue::Set(role),
296 }
297 .insert(&*tx)
298 .await?;
299
300 let channel = Channel::from_model(channel);
301
302 let notifications = self
303 .create_notification(
304 invitee_id,
305 rpc::Notification::ChannelInvitation {
306 channel_id: channel_id.to_proto(),
307 channel_name: channel.name.clone(),
308 inviter_id: inviter_id.to_proto(),
309 },
310 true,
311 &tx,
312 )
313 .await?
314 .into_iter()
315 .collect();
316
317 Ok(InviteMemberResult {
318 channel,
319 notifications,
320 })
321 })
322 .await
323 }
324
325 fn sanitize_channel_name(name: &str) -> Result<&str> {
326 let new_name = name.trim().trim_start_matches('#');
327 if new_name.is_empty() {
328 Err(anyhow!("channel name can't be blank"))?;
329 }
330 Ok(new_name)
331 }
332
333 /// Renames the specified channel.
334 pub async fn rename_channel(
335 &self,
336 channel_id: ChannelId,
337 admin_id: UserId,
338 new_name: &str,
339 ) -> Result<channel::Model> {
340 self.transaction(move |tx| async move {
341 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
342
343 let channel = self.get_channel_internal(channel_id, &tx).await?;
344 self.check_user_is_channel_admin(&channel, admin_id, &tx)
345 .await?;
346
347 let mut model = channel.into_active_model();
348 model.name = ActiveValue::Set(new_name.clone());
349 let channel = model.update(&*tx).await?;
350
351 Ok(channel)
352 })
353 .await
354 }
355
356 /// accept or decline an invite to join a channel
357 pub async fn respond_to_channel_invite(
358 &self,
359 channel_id: ChannelId,
360 user_id: UserId,
361 accept: bool,
362 ) -> Result<RespondToChannelInvite> {
363 self.transaction(move |tx| async move {
364 let channel = self.get_channel_internal(channel_id, &tx).await?;
365
366 let membership_update = if accept {
367 let rows_affected = channel_member::Entity::update_many()
368 .set(channel_member::ActiveModel {
369 accepted: ActiveValue::Set(accept),
370 ..Default::default()
371 })
372 .filter(
373 channel_member::Column::ChannelId
374 .eq(channel_id)
375 .and(channel_member::Column::UserId.eq(user_id))
376 .and(channel_member::Column::Accepted.eq(false)),
377 )
378 .exec(&*tx)
379 .await?
380 .rows_affected;
381
382 if rows_affected == 0 {
383 Err(anyhow!("no such invitation"))?;
384 }
385
386 Some(
387 self.calculate_membership_updated(&channel, user_id, &tx)
388 .await?,
389 )
390 } else {
391 let rows_affected = channel_member::Entity::delete_many()
392 .filter(
393 channel_member::Column::ChannelId
394 .eq(channel_id)
395 .and(channel_member::Column::UserId.eq(user_id))
396 .and(channel_member::Column::Accepted.eq(false)),
397 )
398 .exec(&*tx)
399 .await?
400 .rows_affected;
401 if rows_affected == 0 {
402 Err(anyhow!("no such invitation"))?;
403 }
404
405 None
406 };
407
408 Ok(RespondToChannelInvite {
409 membership_update,
410 notifications: self
411 .mark_notification_as_read_with_response(
412 user_id,
413 &rpc::Notification::ChannelInvitation {
414 channel_id: channel_id.to_proto(),
415 channel_name: Default::default(),
416 inviter_id: Default::default(),
417 },
418 accept,
419 &tx,
420 )
421 .await?
422 .into_iter()
423 .collect(),
424 })
425 })
426 .await
427 }
428
429 async fn calculate_membership_updated(
430 &self,
431 channel: &channel::Model,
432 user_id: UserId,
433 tx: &DatabaseTransaction,
434 ) -> Result<MembershipUpdated> {
435 let new_channels = self
436 .get_user_channels(user_id, Some(channel), false, tx)
437 .await?;
438 let removed_channels = self
439 .get_channel_descendants_excluding_self([channel], tx)
440 .await?
441 .into_iter()
442 .map(|channel| channel.id)
443 .chain([channel.id])
444 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
445 .collect::<Vec<_>>();
446
447 Ok(MembershipUpdated {
448 channel_id: channel.id,
449 new_channels,
450 removed_channels,
451 })
452 }
453
454 /// Removes a channel member.
455 pub async fn remove_channel_member(
456 &self,
457 channel_id: ChannelId,
458 member_id: UserId,
459 admin_id: UserId,
460 ) -> Result<RemoveChannelMemberResult> {
461 self.transaction(|tx| async move {
462 let channel = self.get_channel_internal(channel_id, &tx).await?;
463
464 if member_id != admin_id {
465 self.check_user_is_channel_admin(&channel, admin_id, &tx)
466 .await?;
467 }
468
469 let result = channel_member::Entity::delete_many()
470 .filter(
471 channel_member::Column::ChannelId
472 .eq(channel_id)
473 .and(channel_member::Column::UserId.eq(member_id)),
474 )
475 .exec(&*tx)
476 .await?;
477
478 if result.rows_affected == 0 {
479 Err(anyhow!("no such member"))?;
480 }
481
482 Ok(RemoveChannelMemberResult {
483 membership_update: self
484 .calculate_membership_updated(&channel, member_id, &tx)
485 .await?,
486 notification_id: self
487 .remove_notification(
488 member_id,
489 rpc::Notification::ChannelInvitation {
490 channel_id: channel_id.to_proto(),
491 channel_name: Default::default(),
492 inviter_id: Default::default(),
493 },
494 &tx,
495 )
496 .await?,
497 })
498 })
499 .await
500 }
501
502 /// Returns all channels for the user with the given ID.
503 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
504 self.weak_transaction(
505 |tx| async move { self.get_user_channels(user_id, None, true, &tx).await },
506 )
507 .await
508 }
509
510 /// Returns all channels for the user with the given ID that are descendants
511 /// of the specified ancestor channel.
512 pub async fn get_user_channels(
513 &self,
514 user_id: UserId,
515 ancestor_channel: Option<&channel::Model>,
516 include_invites: bool,
517 tx: &DatabaseTransaction,
518 ) -> Result<ChannelsForUser> {
519 let mut filter = channel_member::Column::UserId.eq(user_id);
520 if !include_invites {
521 filter = filter.and(channel_member::Column::Accepted.eq(true))
522 }
523 if let Some(ancestor) = ancestor_channel {
524 filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
525 }
526
527 let mut channels = Vec::<channel::Model>::new();
528 let mut invited_channels = Vec::<Channel>::new();
529 let mut channel_memberships = Vec::<channel_member::Model>::new();
530 let mut rows = channel_member::Entity::find()
531 .filter(filter)
532 .inner_join(channel::Entity)
533 .select_also(channel::Entity)
534 .stream(tx)
535 .await?;
536 while let Some(row) = rows.next().await {
537 if let (membership, Some(channel)) = row? {
538 if membership.accepted {
539 channel_memberships.push(membership);
540 channels.push(channel);
541 } else {
542 invited_channels.push(Channel::from_model(channel));
543 }
544 }
545 }
546 drop(rows);
547
548 let mut descendants = self
549 .get_channel_descendants_excluding_self(channels.iter(), tx)
550 .await?;
551
552 descendants.extend(channels);
553
554 let roles_by_channel_id = channel_memberships
555 .iter()
556 .map(|membership| (membership.channel_id, membership.role))
557 .collect::<HashMap<_, _>>();
558
559 let channels: Vec<Channel> = descendants
560 .into_iter()
561 .filter_map(|channel| {
562 let parent_role = roles_by_channel_id.get(&channel.root_id())?;
563 if parent_role.can_see_channel(channel.visibility) {
564 Some(Channel::from_model(channel))
565 } else {
566 None
567 }
568 })
569 .collect();
570
571 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
572 enum QueryUserIdsAndChannelIds {
573 ChannelId,
574 UserId,
575 }
576
577 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
578 {
579 let mut rows = room_participant::Entity::find()
580 .inner_join(room::Entity)
581 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
582 .select_only()
583 .column(room::Column::ChannelId)
584 .column(room_participant::Column::UserId)
585 .into_values::<_, QueryUserIdsAndChannelIds>()
586 .stream(tx)
587 .await?;
588 while let Some(row) = rows.next().await {
589 let row: (ChannelId, UserId) = row?;
590 channel_participants.entry(row.0).or_default().push(row.1)
591 }
592 }
593
594 let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
595
596 let mut channel_ids_by_buffer_id = HashMap::default();
597 let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
598 let mut rows = buffer::Entity::find()
599 .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
600 .stream(tx)
601 .await?;
602 while let Some(row) = rows.next().await {
603 let row = row?;
604 channel_ids_by_buffer_id.insert(row.id, row.channel_id);
605 latest_buffer_versions.push(ChannelBufferVersion {
606 channel_id: row.channel_id.0 as u64,
607 epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
608 version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
609 .latest_operation_lamport_timestamp
610 .zip(row.latest_operation_replica_id)
611 {
612 vec![VectorClockEntry {
613 timestamp: latest_lamport_timestamp as u32,
614 replica_id: latest_replica_id as u32,
615 }]
616 } else {
617 vec![]
618 },
619 });
620 }
621 drop(rows);
622
623 let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
624
625 let observed_buffer_versions = self
626 .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
627 .await?;
628
629 let observed_channel_messages = self
630 .observed_channel_messages(&channel_ids, user_id, tx)
631 .await?;
632
633 Ok(ChannelsForUser {
634 channel_memberships,
635 channels,
636 invited_channels,
637 channel_participants,
638 latest_buffer_versions,
639 latest_channel_messages,
640 observed_buffer_versions,
641 observed_channel_messages,
642 })
643 }
644
645 /// Sets the role for the specified channel member.
646 pub async fn set_channel_member_role(
647 &self,
648 channel_id: ChannelId,
649 admin_id: UserId,
650 for_user: UserId,
651 role: ChannelRole,
652 ) -> Result<SetMemberRoleResult> {
653 self.transaction(|tx| async move {
654 let channel = self.get_channel_internal(channel_id, &tx).await?;
655 self.check_user_is_channel_admin(&channel, admin_id, &tx)
656 .await?;
657
658 let membership = channel_member::Entity::find()
659 .filter(
660 channel_member::Column::ChannelId
661 .eq(channel_id)
662 .and(channel_member::Column::UserId.eq(for_user)),
663 )
664 .one(&*tx)
665 .await?
666 .context("no such member")?;
667
668 let mut update = membership.into_active_model();
669 update.role = ActiveValue::Set(role);
670 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
671
672 if updated.accepted {
673 Ok(SetMemberRoleResult::MembershipUpdated(
674 self.calculate_membership_updated(&channel, for_user, &tx)
675 .await?,
676 ))
677 } else {
678 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
679 channel,
680 )))
681 }
682 })
683 .await
684 }
685
686 /// Returns the details for the specified channel member.
687 pub async fn get_channel_participant_details(
688 &self,
689 channel_id: ChannelId,
690 filter: &str,
691 limit: u64,
692 user_id: UserId,
693 ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
694 let members = self
695 .transaction(move |tx| async move {
696 let channel = self.get_channel_internal(channel_id, &tx).await?;
697 self.check_user_is_channel_participant(&channel, user_id, &tx)
698 .await?;
699 let mut query = channel_member::Entity::find()
700 .find_also_related(user::Entity)
701 .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
702
703 if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
704 query = query.filter(Expr::cust_with_values(
705 "UPPER(github_login) LIKE ?",
706 [Self::fuzzy_like_string(&filter.to_uppercase())],
707 ))
708 } else {
709 query = query.filter(Expr::cust_with_values(
710 "github_login ILIKE $1",
711 [Self::fuzzy_like_string(filter)],
712 ))
713 }
714 let members = query.order_by(
715 Expr::cust(
716 "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
717 ),
718 sea_orm::Order::Asc,
719 )
720 .limit(limit)
721 .all(&*tx)
722 .await?;
723
724 Ok(members)
725 })
726 .await?;
727
728 let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
729
730 let members = members
731 .into_iter()
732 .map(|(member, user)| {
733 if let Some(user) = user {
734 users.push(proto::User {
735 id: user.id.to_proto(),
736 avatar_url: format!(
737 "https://avatars.githubusercontent.com/u/{}?s=128&v=4",
738 user.github_user_id
739 ),
740 github_login: user.github_login,
741 name: user.name,
742 })
743 }
744 proto::ChannelMember {
745 role: member.role.into(),
746 user_id: member.user_id.to_proto(),
747 kind: if member.accepted {
748 Kind::Member
749 } else {
750 Kind::Invitee
751 }
752 .into(),
753 }
754 })
755 .collect();
756
757 Ok((members, users))
758 }
759
760 /// Returns whether the given user is an admin in the specified channel.
761 pub async fn check_user_is_channel_admin(
762 &self,
763 channel: &channel::Model,
764 user_id: UserId,
765 tx: &DatabaseTransaction,
766 ) -> Result<ChannelRole> {
767 let role = self.channel_role_for_user(channel, user_id, tx).await?;
768 match role {
769 Some(ChannelRole::Admin) => Ok(role.unwrap()),
770 Some(ChannelRole::Member)
771 | Some(ChannelRole::Talker)
772 | Some(ChannelRole::Banned)
773 | Some(ChannelRole::Guest)
774 | None => Err(anyhow!(
775 "user is not a channel admin or channel does not exist"
776 ))?,
777 }
778 }
779
780 /// Returns whether the given user is a member of the specified channel.
781 pub async fn check_user_is_channel_member(
782 &self,
783 channel: &channel::Model,
784 user_id: UserId,
785 tx: &DatabaseTransaction,
786 ) -> Result<ChannelRole> {
787 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
788 match channel_role {
789 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
790 Some(ChannelRole::Banned)
791 | Some(ChannelRole::Guest)
792 | Some(ChannelRole::Talker)
793 | None => Err(anyhow!(
794 "user is not a channel member or channel does not exist"
795 ))?,
796 }
797 }
798
799 /// Returns whether the given user is a participant in the specified channel.
800 pub async fn check_user_is_channel_participant(
801 &self,
802 channel: &channel::Model,
803 user_id: UserId,
804 tx: &DatabaseTransaction,
805 ) -> Result<ChannelRole> {
806 let role = self.channel_role_for_user(channel, user_id, tx).await?;
807 match role {
808 Some(ChannelRole::Admin)
809 | Some(ChannelRole::Member)
810 | Some(ChannelRole::Guest)
811 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
812 Some(ChannelRole::Banned) | None => Err(anyhow!(
813 "user is not a channel participant or channel does not exist"
814 ))?,
815 }
816 }
817
818 /// Returns a user's pending invite for the given channel, if one exists.
819 pub async fn pending_invite_for_channel(
820 &self,
821 channel: &channel::Model,
822 user_id: UserId,
823 tx: &DatabaseTransaction,
824 ) -> Result<Option<channel_member::Model>> {
825 let row = channel_member::Entity::find()
826 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
827 .filter(channel_member::Column::UserId.eq(user_id))
828 .filter(channel_member::Column::Accepted.eq(false))
829 .one(tx)
830 .await?;
831
832 Ok(row)
833 }
834
835 /// Returns the role for a user in the given channel.
836 pub async fn channel_role_for_user(
837 &self,
838 channel: &channel::Model,
839 user_id: UserId,
840 tx: &DatabaseTransaction,
841 ) -> Result<Option<ChannelRole>> {
842 let membership = channel_member::Entity::find()
843 .filter(
844 channel_member::Column::ChannelId
845 .eq(channel.root_id())
846 .and(channel_member::Column::UserId.eq(user_id))
847 .and(channel_member::Column::Accepted.eq(true)),
848 )
849 .one(tx)
850 .await?;
851
852 let Some(membership) = membership else {
853 return Ok(None);
854 };
855
856 if !membership.role.can_see_channel(channel.visibility) {
857 return Ok(None);
858 }
859
860 Ok(Some(membership.role))
861 }
862
863 // Get the descendants of the given set if channels, ordered by their
864 // path.
865 pub(crate) async fn get_channel_descendants_excluding_self(
866 &self,
867 channels: impl IntoIterator<Item = &channel::Model>,
868 tx: &DatabaseTransaction,
869 ) -> Result<Vec<channel::Model>> {
870 let mut filter = Condition::any();
871 for channel in channels.into_iter() {
872 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
873 }
874
875 if filter.is_empty() {
876 return Ok(vec![]);
877 }
878
879 Ok(channel::Entity::find()
880 .filter(filter)
881 .order_by_asc(Expr::cust("parent_path || id || '/'"))
882 .all(tx)
883 .await?)
884 }
885
886 /// Returns the channel with the given ID.
887 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
888 self.transaction(|tx| async move {
889 let channel = self.get_channel_internal(channel_id, &tx).await?;
890 self.check_user_is_channel_participant(&channel, user_id, &tx)
891 .await?;
892
893 Ok(Channel::from_model(channel))
894 })
895 .await
896 }
897
898 pub(crate) async fn get_channel_internal(
899 &self,
900 channel_id: ChannelId,
901 tx: &DatabaseTransaction,
902 ) -> Result<channel::Model> {
903 Ok(channel::Entity::find_by_id(channel_id)
904 .one(tx)
905 .await?
906 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
907 }
908
909 pub(crate) async fn get_or_create_channel_room(
910 &self,
911 channel_id: ChannelId,
912 livekit_room: &str,
913 tx: &DatabaseTransaction,
914 ) -> Result<RoomId> {
915 let room = room::Entity::find()
916 .filter(room::Column::ChannelId.eq(channel_id))
917 .one(tx)
918 .await?;
919
920 let room_id = if let Some(room) = room {
921 room.id
922 } else {
923 let result = room::Entity::insert(room::ActiveModel {
924 channel_id: ActiveValue::Set(Some(channel_id)),
925 live_kit_room: ActiveValue::Set(livekit_room.to_string()),
926 ..Default::default()
927 })
928 .exec(tx)
929 .await?;
930
931 result.last_insert_id
932 };
933
934 Ok(room_id)
935 }
936
937 /// Move a channel from one parent to another
938 pub async fn move_channel(
939 &self,
940 channel_id: ChannelId,
941 new_parent_id: ChannelId,
942 admin_id: UserId,
943 ) -> Result<(ChannelId, Vec<Channel>)> {
944 self.transaction(|tx| async move {
945 let channel = self.get_channel_internal(channel_id, &tx).await?;
946 self.check_user_is_channel_admin(&channel, admin_id, &tx)
947 .await?;
948 let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
949
950 if new_parent.root_id() != channel.root_id() {
951 Err(anyhow!(ErrorCode::WrongMoveTarget))?;
952 }
953
954 if new_parent
955 .ancestors_including_self()
956 .any(|id| id == channel.id)
957 {
958 Err(anyhow!(ErrorCode::CircularNesting))?;
959 }
960
961 if channel.visibility == ChannelVisibility::Public
962 && new_parent.visibility != ChannelVisibility::Public
963 {
964 Err(anyhow!(ErrorCode::BadPublicNesting))?;
965 }
966
967 let root_id = channel.root_id();
968 let new_parent_path = new_parent.path();
969 let old_path = format!("{}{}/", channel.parent_path, channel.id);
970 let new_path = format!("{}{}/", &new_parent_path, channel.id);
971 let new_order = max_order(&new_parent_path, &tx).await? + 1;
972
973 let mut model = channel.into_active_model();
974 model.parent_path = ActiveValue::Set(new_parent.path());
975 model.channel_order = ActiveValue::Set(new_order);
976 let channel = model.update(&*tx).await?;
977
978 let descendent_ids =
979 ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
980 self.pool.get_database_backend(),
981 "
982 UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
983 WHERE parent_path LIKE $3 || '%'
984 RETURNING id
985 ",
986 [old_path.clone().into(), new_path.into(), old_path.into()],
987 ))
988 .all(&*tx)
989 .await?;
990
991 let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
992
993 let channels = channel::Entity::find()
994 .filter(channel::Column::Id.is_in(all_moved_ids))
995 .all(&*tx)
996 .await?
997 .into_iter()
998 .map(Channel::from_model)
999 .collect::<Vec<_>>();
1000
1001 Ok((root_id, channels))
1002 })
1003 .await
1004 }
1005
1006 pub async fn reorder_channel(
1007 &self,
1008 channel_id: ChannelId,
1009 direction: proto::reorder_channel::Direction,
1010 user_id: UserId,
1011 ) -> Result<Vec<Channel>> {
1012 self.transaction(|tx| async move {
1013 let mut channel = self.get_channel_internal(channel_id, &tx).await?;
1014
1015 if channel.is_root() {
1016 log::info!("Skipping reorder of root channel {}", channel.id,);
1017 return Ok(vec![]);
1018 }
1019
1020 log::info!(
1021 "Reordering channel {} (parent_path: '{}', order: {})",
1022 channel.id,
1023 channel.parent_path,
1024 channel.channel_order
1025 );
1026
1027 // Check if user is admin of the channel
1028 self.check_user_is_channel_admin(&channel, user_id, &tx)
1029 .await?;
1030
1031 // Find the sibling channel to swap with
1032 let sibling_channel = match direction {
1033 proto::reorder_channel::Direction::Up => {
1034 log::info!(
1035 "Looking for sibling with parent_path='{}' and order < {}",
1036 channel.parent_path,
1037 channel.channel_order
1038 );
1039 // Find channel with highest order less than current
1040 channel::Entity::find()
1041 .filter(
1042 channel::Column::ParentPath
1043 .eq(&channel.parent_path)
1044 .and(channel::Column::ChannelOrder.lt(channel.channel_order)),
1045 )
1046 .order_by_desc(channel::Column::ChannelOrder)
1047 .one(&*tx)
1048 .await?
1049 }
1050 proto::reorder_channel::Direction::Down => {
1051 log::info!(
1052 "Looking for sibling with parent_path='{}' and order > {}",
1053 channel.parent_path,
1054 channel.channel_order
1055 );
1056 // Find channel with lowest order greater than current
1057 channel::Entity::find()
1058 .filter(
1059 channel::Column::ParentPath
1060 .eq(&channel.parent_path)
1061 .and(channel::Column::ChannelOrder.gt(channel.channel_order)),
1062 )
1063 .order_by_asc(channel::Column::ChannelOrder)
1064 .one(&*tx)
1065 .await?
1066 }
1067 };
1068
1069 let mut sibling_channel = match sibling_channel {
1070 Some(sibling) => {
1071 log::info!(
1072 "Found sibling {} (parent_path: '{}', order: {})",
1073 sibling.id,
1074 sibling.parent_path,
1075 sibling.channel_order
1076 );
1077 sibling
1078 }
1079 None => {
1080 log::warn!("No sibling found to swap with");
1081 // No sibling to swap with
1082 return Ok(vec![]);
1083 }
1084 };
1085
1086 let current_order = channel.channel_order;
1087 let sibling_order = sibling_channel.channel_order;
1088
1089 channel::ActiveModel {
1090 id: ActiveValue::Unchanged(sibling_channel.id),
1091 channel_order: ActiveValue::Set(current_order),
1092 ..Default::default()
1093 }
1094 .update(&*tx)
1095 .await?;
1096 sibling_channel.channel_order = current_order;
1097
1098 channel::ActiveModel {
1099 id: ActiveValue::Unchanged(channel.id),
1100 channel_order: ActiveValue::Set(sibling_order),
1101 ..Default::default()
1102 }
1103 .update(&*tx)
1104 .await?;
1105 channel.channel_order = sibling_order;
1106
1107 log::info!(
1108 "Reorder complete. Swapped channels {} and {}",
1109 channel.id,
1110 sibling_channel.id
1111 );
1112
1113 let swapped_channels = vec![
1114 Channel::from_model(channel),
1115 Channel::from_model(sibling_channel),
1116 ];
1117
1118 Ok(swapped_channels)
1119 })
1120 .await
1121 }
1122}
1123
1124async fn max_order(parent_path: &str, tx: &TransactionHandle) -> Result<i32> {
1125 let max_order = channel::Entity::find()
1126 .filter(channel::Column::ParentPath.eq(parent_path))
1127 .select_only()
1128 .column_as(channel::Column::ChannelOrder.max(), "max_order")
1129 .into_tuple::<Option<i32>>()
1130 .one(&**tx)
1131 .await?
1132 .flatten()
1133 .unwrap_or(0);
1134
1135 Ok(max_order)
1136}
1137
1138#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1139enum QueryIds {
1140 Id,
1141}
1142
1143#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1144enum QueryUserIds {
1145 UserId,
1146}