1use super::*;
2use anyhow::Context as _;
3use rpc::{
4 ErrorCode, ErrorCodeExt,
5 proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
6};
7use sea_orm::{DbBackend, TryGetableMany};
8
9impl Database {
10 #[cfg(test)]
11 pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
12 self.transaction(move |tx| async move {
13 let mut channels = Vec::new();
14 let mut rows = channel::Entity::find().stream(&*tx).await?;
15 while let Some(row) = rows.next().await {
16 let row = row?;
17 channels.push((row.id, row.name));
18 }
19 Ok(channels)
20 })
21 .await
22 }
23
24 #[cfg(test)]
25 pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
26 Ok(self.create_channel(name, None, creator_id).await?.0.id)
27 }
28
29 #[cfg(test)]
30 pub async fn create_sub_channel(
31 &self,
32 name: &str,
33 parent: ChannelId,
34 creator_id: UserId,
35 ) -> Result<ChannelId> {
36 Ok(self
37 .create_channel(name, Some(parent), creator_id)
38 .await?
39 .0
40 .id)
41 }
42
43 /// Creates a new channel.
44 pub async fn create_channel(
45 &self,
46 name: &str,
47 parent_channel_id: Option<ChannelId>,
48 admin_id: UserId,
49 ) -> Result<(channel::Model, Option<channel_member::Model>)> {
50 let name = Self::sanitize_channel_name(name)?;
51 self.transaction(move |tx| async move {
52 let mut parent = None;
53 let mut membership = None;
54
55 if let Some(parent_channel_id) = parent_channel_id {
56 let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
57 self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
58 .await?;
59 parent = Some(parent_channel);
60 }
61
62 let channel = channel::ActiveModel {
63 id: ActiveValue::NotSet,
64 name: ActiveValue::Set(name.to_string()),
65 visibility: ActiveValue::Set(ChannelVisibility::Members),
66 parent_path: ActiveValue::Set(
67 parent
68 .as_ref()
69 .map_or(String::new(), |parent| parent.path()),
70 ),
71 requires_zed_cla: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if parent.is_none() {
77 membership = Some(
78 channel_member::ActiveModel {
79 id: ActiveValue::NotSet,
80 channel_id: ActiveValue::Set(channel.id),
81 user_id: ActiveValue::Set(admin_id),
82 accepted: ActiveValue::Set(true),
83 role: ActiveValue::Set(ChannelRole::Admin),
84 }
85 .insert(&*tx)
86 .await?,
87 );
88 }
89
90 Ok((channel, membership))
91 })
92 .await
93 }
94
95 /// Adds a user to the specified channel.
96 pub async fn join_channel(
97 &self,
98 channel_id: ChannelId,
99 user_id: UserId,
100 connection: ConnectionId,
101 ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
102 self.transaction(move |tx| async move {
103 let channel = self.get_channel_internal(channel_id, &tx).await?;
104 let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
105
106 let mut accept_invite_result = None;
107
108 if role.is_none() {
109 if let Some(invitation) = self
110 .pending_invite_for_channel(&channel, user_id, &tx)
111 .await?
112 {
113 // note, this may be a parent channel
114 role = Some(invitation.role);
115 channel_member::Entity::update(channel_member::ActiveModel {
116 accepted: ActiveValue::Set(true),
117 ..invitation.into_active_model()
118 })
119 .exec(&*tx)
120 .await?;
121
122 accept_invite_result = Some(
123 self.calculate_membership_updated(&channel, user_id, &tx)
124 .await?,
125 );
126
127 debug_assert!(
128 self.channel_role_for_user(&channel, user_id, &tx).await? == role
129 );
130 } else if channel.visibility == ChannelVisibility::Public {
131 role = Some(ChannelRole::Guest);
132 channel_member::Entity::insert(channel_member::ActiveModel {
133 id: ActiveValue::NotSet,
134 channel_id: ActiveValue::Set(channel.root_id()),
135 user_id: ActiveValue::Set(user_id),
136 accepted: ActiveValue::Set(true),
137 role: ActiveValue::Set(ChannelRole::Guest),
138 })
139 .exec(&*tx)
140 .await?;
141
142 accept_invite_result = Some(
143 self.calculate_membership_updated(&channel, user_id, &tx)
144 .await?,
145 );
146
147 debug_assert!(
148 self.channel_role_for_user(&channel, user_id, &tx).await? == role
149 );
150 }
151 }
152
153 if role.is_none() || role == Some(ChannelRole::Banned) {
154 Err(ErrorCode::Forbidden.anyhow())?
155 }
156 let role = role.unwrap();
157
158 let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
159 let room_id = self
160 .get_or_create_channel_room(channel_id, &livekit_room, &tx)
161 .await?;
162
163 self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
164 .await
165 .map(|jr| (jr, accept_invite_result, role))
166 })
167 .await
168 }
169
170 /// Sets the visibility of the given channel.
171 pub async fn set_channel_visibility(
172 &self,
173 channel_id: ChannelId,
174 visibility: ChannelVisibility,
175 admin_id: UserId,
176 ) -> Result<channel::Model> {
177 self.transaction(move |tx| async move {
178 let channel = self.get_channel_internal(channel_id, &tx).await?;
179 self.check_user_is_channel_admin(&channel, admin_id, &tx)
180 .await?;
181
182 if visibility == ChannelVisibility::Public {
183 if let Some(parent_id) = channel.parent_id() {
184 let parent = self.get_channel_internal(parent_id, &tx).await?;
185
186 if parent.visibility != ChannelVisibility::Public {
187 Err(ErrorCode::BadPublicNesting
188 .with_tag("direction", "parent")
189 .anyhow())?;
190 }
191 }
192 } else if visibility == ChannelVisibility::Members
193 && self
194 .get_channel_descendants_excluding_self([&channel], &tx)
195 .await?
196 .into_iter()
197 .any(|channel| channel.visibility == ChannelVisibility::Public)
198 {
199 Err(ErrorCode::BadPublicNesting
200 .with_tag("direction", "children")
201 .anyhow())?;
202 }
203
204 let mut model = channel.into_active_model();
205 model.visibility = ActiveValue::Set(visibility);
206 let channel = model.update(&*tx).await?;
207
208 Ok(channel)
209 })
210 .await
211 }
212
213 #[cfg(test)]
214 pub async fn set_channel_requires_zed_cla(
215 &self,
216 channel_id: ChannelId,
217 requires_zed_cla: bool,
218 ) -> Result<()> {
219 self.transaction(move |tx| async move {
220 let channel = self.get_channel_internal(channel_id, &tx).await?;
221 let mut model = channel.into_active_model();
222 model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
223 model.update(&*tx).await?;
224 Ok(())
225 })
226 .await
227 }
228
229 /// Deletes the channel with the specified ID.
230 pub async fn delete_channel(
231 &self,
232 channel_id: ChannelId,
233 user_id: UserId,
234 ) -> Result<(ChannelId, Vec<ChannelId>)> {
235 self.transaction(move |tx| async move {
236 let channel = self.get_channel_internal(channel_id, &tx).await?;
237 self.check_user_is_channel_admin(&channel, user_id, &tx)
238 .await?;
239
240 let channels_to_remove = self
241 .get_channel_descendants_excluding_self([&channel], &tx)
242 .await?
243 .into_iter()
244 .map(|channel| channel.id)
245 .chain(Some(channel_id))
246 .collect::<Vec<_>>();
247
248 channel::Entity::delete_many()
249 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
250 .exec(&*tx)
251 .await?;
252
253 Ok((channel.root_id(), channels_to_remove))
254 })
255 .await
256 }
257
258 /// Invites a user to a channel as a member.
259 pub async fn invite_channel_member(
260 &self,
261 channel_id: ChannelId,
262 invitee_id: UserId,
263 inviter_id: UserId,
264 role: ChannelRole,
265 ) -> Result<InviteMemberResult> {
266 self.transaction(move |tx| async move {
267 let channel = self.get_channel_internal(channel_id, &tx).await?;
268 self.check_user_is_channel_admin(&channel, inviter_id, &tx)
269 .await?;
270 if !channel.is_root() {
271 Err(ErrorCode::NotARootChannel.anyhow())?
272 }
273
274 channel_member::ActiveModel {
275 id: ActiveValue::NotSet,
276 channel_id: ActiveValue::Set(channel_id),
277 user_id: ActiveValue::Set(invitee_id),
278 accepted: ActiveValue::Set(false),
279 role: ActiveValue::Set(role),
280 }
281 .insert(&*tx)
282 .await?;
283
284 let channel = Channel::from_model(channel);
285
286 let notifications = self
287 .create_notification(
288 invitee_id,
289 rpc::Notification::ChannelInvitation {
290 channel_id: channel_id.to_proto(),
291 channel_name: channel.name.clone(),
292 inviter_id: inviter_id.to_proto(),
293 },
294 true,
295 &tx,
296 )
297 .await?
298 .into_iter()
299 .collect();
300
301 Ok(InviteMemberResult {
302 channel,
303 notifications,
304 })
305 })
306 .await
307 }
308
309 fn sanitize_channel_name(name: &str) -> Result<&str> {
310 let new_name = name.trim().trim_start_matches('#');
311 if new_name.is_empty() {
312 Err(anyhow!("channel name can't be blank"))?;
313 }
314 Ok(new_name)
315 }
316
317 /// Renames the specified channel.
318 pub async fn rename_channel(
319 &self,
320 channel_id: ChannelId,
321 admin_id: UserId,
322 new_name: &str,
323 ) -> Result<channel::Model> {
324 self.transaction(move |tx| async move {
325 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
326
327 let channel = self.get_channel_internal(channel_id, &tx).await?;
328 self.check_user_is_channel_admin(&channel, admin_id, &tx)
329 .await?;
330
331 let mut model = channel.into_active_model();
332 model.name = ActiveValue::Set(new_name.clone());
333 let channel = model.update(&*tx).await?;
334
335 Ok(channel)
336 })
337 .await
338 }
339
340 /// accept or decline an invite to join a channel
341 pub async fn respond_to_channel_invite(
342 &self,
343 channel_id: ChannelId,
344 user_id: UserId,
345 accept: bool,
346 ) -> Result<RespondToChannelInvite> {
347 self.transaction(move |tx| async move {
348 let channel = self.get_channel_internal(channel_id, &tx).await?;
349
350 let membership_update = if accept {
351 let rows_affected = channel_member::Entity::update_many()
352 .set(channel_member::ActiveModel {
353 accepted: ActiveValue::Set(accept),
354 ..Default::default()
355 })
356 .filter(
357 channel_member::Column::ChannelId
358 .eq(channel_id)
359 .and(channel_member::Column::UserId.eq(user_id))
360 .and(channel_member::Column::Accepted.eq(false)),
361 )
362 .exec(&*tx)
363 .await?
364 .rows_affected;
365
366 if rows_affected == 0 {
367 Err(anyhow!("no such invitation"))?;
368 }
369
370 Some(
371 self.calculate_membership_updated(&channel, user_id, &tx)
372 .await?,
373 )
374 } else {
375 let rows_affected = channel_member::Entity::delete_many()
376 .filter(
377 channel_member::Column::ChannelId
378 .eq(channel_id)
379 .and(channel_member::Column::UserId.eq(user_id))
380 .and(channel_member::Column::Accepted.eq(false)),
381 )
382 .exec(&*tx)
383 .await?
384 .rows_affected;
385 if rows_affected == 0 {
386 Err(anyhow!("no such invitation"))?;
387 }
388
389 None
390 };
391
392 Ok(RespondToChannelInvite {
393 membership_update,
394 notifications: self
395 .mark_notification_as_read_with_response(
396 user_id,
397 &rpc::Notification::ChannelInvitation {
398 channel_id: channel_id.to_proto(),
399 channel_name: Default::default(),
400 inviter_id: Default::default(),
401 },
402 accept,
403 &tx,
404 )
405 .await?
406 .into_iter()
407 .collect(),
408 })
409 })
410 .await
411 }
412
413 async fn calculate_membership_updated(
414 &self,
415 channel: &channel::Model,
416 user_id: UserId,
417 tx: &DatabaseTransaction,
418 ) -> Result<MembershipUpdated> {
419 let new_channels = self
420 .get_user_channels(user_id, Some(channel), false, tx)
421 .await?;
422 let removed_channels = self
423 .get_channel_descendants_excluding_self([channel], tx)
424 .await?
425 .into_iter()
426 .map(|channel| channel.id)
427 .chain([channel.id])
428 .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
429 .collect::<Vec<_>>();
430
431 Ok(MembershipUpdated {
432 channel_id: channel.id,
433 new_channels,
434 removed_channels,
435 })
436 }
437
438 /// Removes a channel member.
439 pub async fn remove_channel_member(
440 &self,
441 channel_id: ChannelId,
442 member_id: UserId,
443 admin_id: UserId,
444 ) -> Result<RemoveChannelMemberResult> {
445 self.transaction(|tx| async move {
446 let channel = self.get_channel_internal(channel_id, &tx).await?;
447
448 if member_id != admin_id {
449 self.check_user_is_channel_admin(&channel, admin_id, &tx)
450 .await?;
451 }
452
453 let result = channel_member::Entity::delete_many()
454 .filter(
455 channel_member::Column::ChannelId
456 .eq(channel_id)
457 .and(channel_member::Column::UserId.eq(member_id)),
458 )
459 .exec(&*tx)
460 .await?;
461
462 if result.rows_affected == 0 {
463 Err(anyhow!("no such member"))?;
464 }
465
466 Ok(RemoveChannelMemberResult {
467 membership_update: self
468 .calculate_membership_updated(&channel, member_id, &tx)
469 .await?,
470 notification_id: self
471 .remove_notification(
472 member_id,
473 rpc::Notification::ChannelInvitation {
474 channel_id: channel_id.to_proto(),
475 channel_name: Default::default(),
476 inviter_id: Default::default(),
477 },
478 &tx,
479 )
480 .await?,
481 })
482 })
483 .await
484 }
485
486 /// Returns all channels for the user with the given ID.
487 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
488 self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
489 .await
490 }
491
    /// Returns all channels for the user with the given ID that are descendants
    /// of the specified ancestor channel.
    ///
    /// Membership rows are looked up for `user_id`; when `ancestor_channel` is
    /// given, only the membership on that channel's root is considered, and
    /// when `include_invites` is false, only accepted memberships are read.
    /// Channels the user has an accepted membership on, plus their descendants,
    /// are returned if the membership role on the channel's root can see the
    /// channel's visibility. Channels with a not-yet-accepted membership are
    /// returned separately as `invited_channels`.
    ///
    /// The result also carries, for the returned channels: current room
    /// participants, latest buffer versions, latest channel messages, and what
    /// this user has already observed of buffers and messages.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        include_invites: bool,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Build the membership filter incrementally from the arguments.
        let mut filter = channel_member::Column::UserId.eq(user_id);
        if !include_invites {
            filter = filter.and(channel_member::Column::Accepted.eq(true))
        }
        if let Some(ancestor) = ancestor_channel {
            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
        }

        let mut channels = Vec::<channel::Model>::new();
        let mut invited_channels = Vec::<Channel>::new();
        let mut channel_memberships = Vec::<channel_member::Model>::new();
        let mut rows = channel_member::Entity::find()
            .filter(filter)
            .inner_join(channel::Entity)
            .select_also(channel::Entity)
            .stream(tx)
            .await?;
        // Split the joined rows into accepted memberships and pending invitations.
        while let Some(row) = rows.next().await {
            if let (membership, Some(channel)) = row? {
                if membership.accepted {
                    channel_memberships.push(membership);
                    channels.push(channel);
                } else {
                    invited_channels.push(Channel::from_model(channel));
                }
            }
        }
        // Release the stream explicitly before running further queries on `tx`.
        drop(rows);

        let mut descendants = self
            .get_channel_descendants_excluding_self(channels.iter(), tx)
            .await?;

        // Merge the directly-joined channels into the descendant list at their
        // path-sorted position, skipping any whose path is already present.
        // NOTE(review): relies on `descendants` being sorted by `path()`, which
        // presumably matches the ordering expression used by
        // `get_channel_descendants_excluding_self` — confirm.
        for channel in channels {
            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
                descendants.insert(ix, channel);
            }
        }

        // Roles are keyed by the channel the membership row is attached to and
        // looked up below by each channel's root id.
        let roles_by_channel_id = channel_memberships
            .iter()
            .map(|membership| (membership.channel_id, membership.role))
            .collect::<HashMap<_, _>>();

        // Keep only channels whose root has a known role that can see the
        // channel's visibility; channels without a role entry are dropped.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
                if parent_role.can_see_channel(channel.visibility) {
                    Some(Channel::from_model(channel))
                } else {
                    None
                }
            })
            .collect();

        // Column selection for the (channel id, user id) pairs queried below.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Group the users currently in each channel's room by channel id.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();

        // Collect the latest buffer version per channel buffer, remembering
        // which channel each buffer belongs to.
        let mut channel_ids_by_buffer_id = HashMap::default();
        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
        let mut rows = buffer::Entity::find()
            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
            .stream(tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row = row?;
            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
            latest_buffer_versions.push(ChannelBufferVersion {
                // NOTE(review): these `as` casts do not check range — confirm
                // the stored values always fit the target types.
                channel_id: row.channel_id.0 as u64,
                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
                // The version holds a single entry, and only when both the
                // lamport timestamp and the replica id are present.
                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
                    .latest_operation_lamport_timestamp
                    .zip(row.latest_operation_replica_id)
                {
                    vec![VectorClockEntry {
                        timestamp: latest_lamport_timestamp as u32,
                        replica_id: latest_replica_id as u32,
                    }]
                } else {
                    vec![]
                },
            });
        }
        // Release the stream explicitly before running further queries on `tx`.
        drop(rows);

        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;

        let observed_buffer_versions = self
            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
            .await?;

        let observed_channel_messages = self
            .observed_channel_messages(&channel_ids, user_id, tx)
            .await?;

        Ok(ChannelsForUser {
            channel_memberships,
            channels,
            invited_channels,
            channel_participants,
            latest_buffer_versions,
            latest_channel_messages,
            observed_buffer_versions,
            observed_channel_messages,
        })
    }
630
631 /// Sets the role for the specified channel member.
632 pub async fn set_channel_member_role(
633 &self,
634 channel_id: ChannelId,
635 admin_id: UserId,
636 for_user: UserId,
637 role: ChannelRole,
638 ) -> Result<SetMemberRoleResult> {
639 self.transaction(|tx| async move {
640 let channel = self.get_channel_internal(channel_id, &tx).await?;
641 self.check_user_is_channel_admin(&channel, admin_id, &tx)
642 .await?;
643
644 let membership = channel_member::Entity::find()
645 .filter(
646 channel_member::Column::ChannelId
647 .eq(channel_id)
648 .and(channel_member::Column::UserId.eq(for_user)),
649 )
650 .one(&*tx)
651 .await?
652 .context("no such member")?;
653
654 let mut update = membership.into_active_model();
655 update.role = ActiveValue::Set(role);
656 let updated = channel_member::Entity::update(update).exec(&*tx).await?;
657
658 if updated.accepted {
659 Ok(SetMemberRoleResult::MembershipUpdated(
660 self.calculate_membership_updated(&channel, for_user, &tx)
661 .await?,
662 ))
663 } else {
664 Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
665 channel,
666 )))
667 }
668 })
669 .await
670 }
671
672 /// Returns the details for the specified channel member.
673 pub async fn get_channel_participant_details(
674 &self,
675 channel_id: ChannelId,
676 filter: &str,
677 limit: u64,
678 user_id: UserId,
679 ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
680 let members = self
681 .transaction(move |tx| async move {
682 let channel = self.get_channel_internal(channel_id, &tx).await?;
683 self.check_user_is_channel_participant(&channel, user_id, &tx)
684 .await?;
685 let mut query = channel_member::Entity::find()
686 .find_also_related(user::Entity)
687 .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
688
689 if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
690 query = query.filter(Expr::cust_with_values(
691 "UPPER(github_login) LIKE ?",
692 [Self::fuzzy_like_string(&filter.to_uppercase())],
693 ))
694 } else {
695 query = query.filter(Expr::cust_with_values(
696 "github_login ILIKE $1",
697 [Self::fuzzy_like_string(filter)],
698 ))
699 }
700 let members = query.order_by(
701 Expr::cust(
702 "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
703 ),
704 sea_orm::Order::Asc,
705 )
706 .limit(limit)
707 .all(&*tx)
708 .await?;
709
710 Ok(members)
711 })
712 .await?;
713
714 let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
715
716 let members = members
717 .into_iter()
718 .map(|(member, user)| {
719 if let Some(user) = user {
720 users.push(proto::User {
721 id: user.id.to_proto(),
722 avatar_url: format!(
723 "https://github.com/{}.png?size=128",
724 user.github_login
725 ),
726 github_login: user.github_login,
727 name: user.name,
728 email: user.email_address,
729 })
730 }
731 proto::ChannelMember {
732 role: member.role.into(),
733 user_id: member.user_id.to_proto(),
734 kind: if member.accepted {
735 Kind::Member
736 } else {
737 Kind::Invitee
738 }
739 .into(),
740 }
741 })
742 .collect();
743
744 Ok((members, users))
745 }
746
747 /// Returns whether the given user is an admin in the specified channel.
748 pub async fn check_user_is_channel_admin(
749 &self,
750 channel: &channel::Model,
751 user_id: UserId,
752 tx: &DatabaseTransaction,
753 ) -> Result<ChannelRole> {
754 let role = self.channel_role_for_user(channel, user_id, tx).await?;
755 match role {
756 Some(ChannelRole::Admin) => Ok(role.unwrap()),
757 Some(ChannelRole::Member)
758 | Some(ChannelRole::Talker)
759 | Some(ChannelRole::Banned)
760 | Some(ChannelRole::Guest)
761 | None => Err(anyhow!(
762 "user is not a channel admin or channel does not exist"
763 ))?,
764 }
765 }
766
767 /// Returns whether the given user is a member of the specified channel.
768 pub async fn check_user_is_channel_member(
769 &self,
770 channel: &channel::Model,
771 user_id: UserId,
772 tx: &DatabaseTransaction,
773 ) -> Result<ChannelRole> {
774 let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
775 match channel_role {
776 Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
777 Some(ChannelRole::Banned)
778 | Some(ChannelRole::Guest)
779 | Some(ChannelRole::Talker)
780 | None => Err(anyhow!(
781 "user is not a channel member or channel does not exist"
782 ))?,
783 }
784 }
785
786 /// Returns whether the given user is a participant in the specified channel.
787 pub async fn check_user_is_channel_participant(
788 &self,
789 channel: &channel::Model,
790 user_id: UserId,
791 tx: &DatabaseTransaction,
792 ) -> Result<ChannelRole> {
793 let role = self.channel_role_for_user(channel, user_id, tx).await?;
794 match role {
795 Some(ChannelRole::Admin)
796 | Some(ChannelRole::Member)
797 | Some(ChannelRole::Guest)
798 | Some(ChannelRole::Talker) => Ok(role.unwrap()),
799 Some(ChannelRole::Banned) | None => Err(anyhow!(
800 "user is not a channel participant or channel does not exist"
801 ))?,
802 }
803 }
804
805 /// Returns a user's pending invite for the given channel, if one exists.
806 pub async fn pending_invite_for_channel(
807 &self,
808 channel: &channel::Model,
809 user_id: UserId,
810 tx: &DatabaseTransaction,
811 ) -> Result<Option<channel_member::Model>> {
812 let row = channel_member::Entity::find()
813 .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
814 .filter(channel_member::Column::UserId.eq(user_id))
815 .filter(channel_member::Column::Accepted.eq(false))
816 .one(tx)
817 .await?;
818
819 Ok(row)
820 }
821
822 /// Returns the role for a user in the given channel.
823 pub async fn channel_role_for_user(
824 &self,
825 channel: &channel::Model,
826 user_id: UserId,
827 tx: &DatabaseTransaction,
828 ) -> Result<Option<ChannelRole>> {
829 let membership = channel_member::Entity::find()
830 .filter(
831 channel_member::Column::ChannelId
832 .eq(channel.root_id())
833 .and(channel_member::Column::UserId.eq(user_id))
834 .and(channel_member::Column::Accepted.eq(true)),
835 )
836 .one(tx)
837 .await?;
838
839 let Some(membership) = membership else {
840 return Ok(None);
841 };
842
843 if !membership.role.can_see_channel(channel.visibility) {
844 return Ok(None);
845 }
846
847 Ok(Some(membership.role))
848 }
849
850 // Get the descendants of the given set if channels, ordered by their
851 // path.
852 pub(crate) async fn get_channel_descendants_excluding_self(
853 &self,
854 channels: impl IntoIterator<Item = &channel::Model>,
855 tx: &DatabaseTransaction,
856 ) -> Result<Vec<channel::Model>> {
857 let mut filter = Condition::any();
858 for channel in channels.into_iter() {
859 filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
860 }
861
862 if filter.is_empty() {
863 return Ok(vec![]);
864 }
865
866 Ok(channel::Entity::find()
867 .filter(filter)
868 .order_by_asc(Expr::cust("parent_path || id || '/'"))
869 .all(tx)
870 .await?)
871 }
872
873 /// Returns the channel with the given ID.
874 pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
875 self.transaction(|tx| async move {
876 let channel = self.get_channel_internal(channel_id, &tx).await?;
877 self.check_user_is_channel_participant(&channel, user_id, &tx)
878 .await?;
879
880 Ok(Channel::from_model(channel))
881 })
882 .await
883 }
884
885 pub(crate) async fn get_channel_internal(
886 &self,
887 channel_id: ChannelId,
888 tx: &DatabaseTransaction,
889 ) -> Result<channel::Model> {
890 Ok(channel::Entity::find_by_id(channel_id)
891 .one(tx)
892 .await?
893 .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
894 }
895
896 pub(crate) async fn get_or_create_channel_room(
897 &self,
898 channel_id: ChannelId,
899 livekit_room: &str,
900 tx: &DatabaseTransaction,
901 ) -> Result<RoomId> {
902 let room = room::Entity::find()
903 .filter(room::Column::ChannelId.eq(channel_id))
904 .one(tx)
905 .await?;
906
907 let room_id = if let Some(room) = room {
908 room.id
909 } else {
910 let result = room::Entity::insert(room::ActiveModel {
911 channel_id: ActiveValue::Set(Some(channel_id)),
912 live_kit_room: ActiveValue::Set(livekit_room.to_string()),
913 ..Default::default()
914 })
915 .exec(tx)
916 .await?;
917
918 result.last_insert_id
919 };
920
921 Ok(room_id)
922 }
923
    /// Move a channel from one parent to another
    ///
    /// `admin_id` must be an admin of the channel being moved. The move is
    /// rejected when the new parent belongs to a different root, when the new
    /// parent is the channel itself or one of its descendants, or when a public
    /// channel would end up under a non-public parent.
    ///
    /// Returns the root channel id together with the moved channel and every
    /// descendant whose path was rewritten.
    pub async fn move_channel(
        &self,
        channel_id: ChannelId,
        new_parent_id: ChannelId,
        admin_id: UserId,
    ) -> Result<(ChannelId, Vec<Channel>)> {
        self.transaction(|tx| async move {
            let channel = self.get_channel_internal(channel_id, &tx).await?;
            self.check_user_is_channel_admin(&channel, admin_id, &tx)
                .await?;
            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;

            // NOTE(review): the three errors below are built with
            // `anyhow!(ErrorCode::…)`, whereas the rest of this file uses
            // `ErrorCode::….anyhow()` — confirm both forms reach the caller
            // with the same error code.

            // Channels may only be moved within their own root's tree.
            if new_parent.root_id() != channel.root_id() {
                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
            }

            // The new parent must not be the channel itself or lie beneath it.
            if new_parent
                .ancestors_including_self()
                .any(|id| id == channel.id)
            {
                Err(anyhow!(ErrorCode::CircularNesting))?;
            }

            // A public channel may only live under a public parent.
            if channel.visibility == ChannelVisibility::Public
                && new_parent.visibility != ChannelVisibility::Public
            {
                Err(anyhow!(ErrorCode::BadPublicNesting))?;
            }

            // Capture the old and new path prefixes before `channel` is consumed.
            let root_id = channel.root_id();
            let old_path = format!("{}{}/", channel.parent_path, channel.id);
            let new_path = format!("{}{}/", new_parent.path(), channel.id);

            let mut model = channel.into_active_model();
            model.parent_path = ActiveValue::Set(new_parent.path());
            let channel = model.update(&*tx).await?;

            // Rewrite the path prefix of every channel beneath the old path and
            // collect the ids of the rows that were touched.
            let descendent_ids =
                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
                    self.pool.get_database_backend(),
                    "
                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
                    WHERE parent_path LIKE $3 || '%'
                    RETURNING id
                ",
                    [old_path.clone().into(), new_path.into(), old_path.into()],
                ))
                .all(&*tx)
                .await?;

            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);

            // Reload the moved rows so the result reflects their updated paths.
            let channels = channel::Entity::find()
                .filter(channel::Column::Id.is_in(all_moved_ids))
                .all(&*tx)
                .await?
                .into_iter()
                .map(Channel::from_model)
                .collect::<Vec<_>>();

            Ok((root_id, channels))
        })
        .await
    }
989}
990
/// Column selector for queries that return only an `id` column; used by
/// `move_channel` to read the ids returned from its raw `UPDATE … RETURNING id`
/// statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
995
/// Column selector with a single `UserId` column.
// NOTE(review): not referenced anywhere in the visible part of this file —
// confirm it is still used before keeping it.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}