channels.rs

  1use super::*;
  2use anyhow::Context as _;
  3use rpc::{
  4    ErrorCode, ErrorCodeExt,
  5    proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
  6};
  7use sea_orm::{DbBackend, TryGetableMany};
  8
  9impl Database {
 10    #[cfg(test)]
 11    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
 12        self.transaction(move |tx| async move {
 13            let mut channels = Vec::new();
 14            let mut rows = channel::Entity::find().stream(&*tx).await?;
 15            while let Some(row) = rows.next().await {
 16                let row = row?;
 17                channels.push((row.id, row.name));
 18            }
 19            Ok(channels)
 20        })
 21        .await
 22    }
 23
 24    #[cfg(test)]
 25    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
 26        Ok(self.create_channel(name, None, creator_id).await?.0.id)
 27    }
 28
 29    #[cfg(test)]
 30    pub async fn create_sub_channel(
 31        &self,
 32        name: &str,
 33        parent: ChannelId,
 34        creator_id: UserId,
 35    ) -> Result<ChannelId> {
 36        Ok(self
 37            .create_channel(name, Some(parent), creator_id)
 38            .await?
 39            .0
 40            .id)
 41    }
 42
 43    /// Creates a new channel.
 44    pub async fn create_channel(
 45        &self,
 46        name: &str,
 47        parent_channel_id: Option<ChannelId>,
 48        admin_id: UserId,
 49    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
 50        let name = Self::sanitize_channel_name(name)?;
 51        self.transaction(move |tx| async move {
 52            let mut parent = None;
 53            let mut membership = None;
 54
 55            if let Some(parent_channel_id) = parent_channel_id {
 56                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
 57                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
 58                    .await?;
 59                parent = Some(parent_channel);
 60            }
 61
 62            let channel = channel::ActiveModel {
 63                id: ActiveValue::NotSet,
 64                name: ActiveValue::Set(name.to_string()),
 65                visibility: ActiveValue::Set(ChannelVisibility::Members),
 66                parent_path: ActiveValue::Set(
 67                    parent
 68                        .as_ref()
 69                        .map_or(String::new(), |parent| parent.path()),
 70                ),
 71                requires_zed_cla: ActiveValue::NotSet,
 72            }
 73            .insert(&*tx)
 74            .await?;
 75
 76            if parent.is_none() {
 77                membership = Some(
 78                    channel_member::ActiveModel {
 79                        id: ActiveValue::NotSet,
 80                        channel_id: ActiveValue::Set(channel.id),
 81                        user_id: ActiveValue::Set(admin_id),
 82                        accepted: ActiveValue::Set(true),
 83                        role: ActiveValue::Set(ChannelRole::Admin),
 84                    }
 85                    .insert(&*tx)
 86                    .await?,
 87                );
 88            }
 89
 90            Ok((channel, membership))
 91        })
 92        .await
 93    }
 94
 95    /// Adds a user to the specified channel.
 96    pub async fn join_channel(
 97        &self,
 98        channel_id: ChannelId,
 99        user_id: UserId,
100        connection: ConnectionId,
101    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
102        self.transaction(move |tx| async move {
103            let channel = self.get_channel_internal(channel_id, &tx).await?;
104            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
105
106            let mut accept_invite_result = None;
107
108            if role.is_none() {
109                if let Some(invitation) = self
110                    .pending_invite_for_channel(&channel, user_id, &tx)
111                    .await?
112                {
113                    // note, this may be a parent channel
114                    role = Some(invitation.role);
115                    channel_member::Entity::update(channel_member::ActiveModel {
116                        accepted: ActiveValue::Set(true),
117                        ..invitation.into_active_model()
118                    })
119                    .exec(&*tx)
120                    .await?;
121
122                    accept_invite_result = Some(
123                        self.calculate_membership_updated(&channel, user_id, &tx)
124                            .await?,
125                    );
126
127                    debug_assert!(
128                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
129                    );
130                } else if channel.visibility == ChannelVisibility::Public {
131                    role = Some(ChannelRole::Guest);
132                    channel_member::Entity::insert(channel_member::ActiveModel {
133                        id: ActiveValue::NotSet,
134                        channel_id: ActiveValue::Set(channel.root_id()),
135                        user_id: ActiveValue::Set(user_id),
136                        accepted: ActiveValue::Set(true),
137                        role: ActiveValue::Set(ChannelRole::Guest),
138                    })
139                    .exec(&*tx)
140                    .await?;
141
142                    accept_invite_result = Some(
143                        self.calculate_membership_updated(&channel, user_id, &tx)
144                            .await?,
145                    );
146
147                    debug_assert!(
148                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
149                    );
150                }
151            }
152
153            if role.is_none() || role == Some(ChannelRole::Banned) {
154                Err(ErrorCode::Forbidden.anyhow())?
155            }
156            let role = role.unwrap();
157
158            let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
159            let room_id = self
160                .get_or_create_channel_room(channel_id, &livekit_room, &tx)
161                .await?;
162
163            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
164                .await
165                .map(|jr| (jr, accept_invite_result, role))
166        })
167        .await
168    }
169
170    /// Sets the visibility of the given channel.
171    pub async fn set_channel_visibility(
172        &self,
173        channel_id: ChannelId,
174        visibility: ChannelVisibility,
175        admin_id: UserId,
176    ) -> Result<channel::Model> {
177        self.transaction(move |tx| async move {
178            let channel = self.get_channel_internal(channel_id, &tx).await?;
179            self.check_user_is_channel_admin(&channel, admin_id, &tx)
180                .await?;
181
182            if visibility == ChannelVisibility::Public {
183                if let Some(parent_id) = channel.parent_id() {
184                    let parent = self.get_channel_internal(parent_id, &tx).await?;
185
186                    if parent.visibility != ChannelVisibility::Public {
187                        Err(ErrorCode::BadPublicNesting
188                            .with_tag("direction", "parent")
189                            .anyhow())?;
190                    }
191                }
192            } else if visibility == ChannelVisibility::Members
193                && self
194                    .get_channel_descendants_excluding_self([&channel], &tx)
195                    .await?
196                    .into_iter()
197                    .any(|channel| channel.visibility == ChannelVisibility::Public)
198            {
199                Err(ErrorCode::BadPublicNesting
200                    .with_tag("direction", "children")
201                    .anyhow())?;
202            }
203
204            let mut model = channel.into_active_model();
205            model.visibility = ActiveValue::Set(visibility);
206            let channel = model.update(&*tx).await?;
207
208            Ok(channel)
209        })
210        .await
211    }
212
213    #[cfg(test)]
214    pub async fn set_channel_requires_zed_cla(
215        &self,
216        channel_id: ChannelId,
217        requires_zed_cla: bool,
218    ) -> Result<()> {
219        self.transaction(move |tx| async move {
220            let channel = self.get_channel_internal(channel_id, &tx).await?;
221            let mut model = channel.into_active_model();
222            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
223            model.update(&*tx).await?;
224            Ok(())
225        })
226        .await
227    }
228
229    /// Deletes the channel with the specified ID.
230    pub async fn delete_channel(
231        &self,
232        channel_id: ChannelId,
233        user_id: UserId,
234    ) -> Result<(ChannelId, Vec<ChannelId>)> {
235        self.transaction(move |tx| async move {
236            let channel = self.get_channel_internal(channel_id, &tx).await?;
237            self.check_user_is_channel_admin(&channel, user_id, &tx)
238                .await?;
239
240            let channels_to_remove = self
241                .get_channel_descendants_excluding_self([&channel], &tx)
242                .await?
243                .into_iter()
244                .map(|channel| channel.id)
245                .chain(Some(channel_id))
246                .collect::<Vec<_>>();
247
248            channel::Entity::delete_many()
249                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
250                .exec(&*tx)
251                .await?;
252
253            Ok((channel.root_id(), channels_to_remove))
254        })
255        .await
256    }
257
258    /// Invites a user to a channel as a member.
259    pub async fn invite_channel_member(
260        &self,
261        channel_id: ChannelId,
262        invitee_id: UserId,
263        inviter_id: UserId,
264        role: ChannelRole,
265    ) -> Result<InviteMemberResult> {
266        self.transaction(move |tx| async move {
267            let channel = self.get_channel_internal(channel_id, &tx).await?;
268            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
269                .await?;
270            if !channel.is_root() {
271                Err(ErrorCode::NotARootChannel.anyhow())?
272            }
273
274            channel_member::ActiveModel {
275                id: ActiveValue::NotSet,
276                channel_id: ActiveValue::Set(channel_id),
277                user_id: ActiveValue::Set(invitee_id),
278                accepted: ActiveValue::Set(false),
279                role: ActiveValue::Set(role),
280            }
281            .insert(&*tx)
282            .await?;
283
284            let channel = Channel::from_model(channel);
285
286            let notifications = self
287                .create_notification(
288                    invitee_id,
289                    rpc::Notification::ChannelInvitation {
290                        channel_id: channel_id.to_proto(),
291                        channel_name: channel.name.clone(),
292                        inviter_id: inviter_id.to_proto(),
293                    },
294                    true,
295                    &tx,
296                )
297                .await?
298                .into_iter()
299                .collect();
300
301            Ok(InviteMemberResult {
302                channel,
303                notifications,
304            })
305        })
306        .await
307    }
308
309    fn sanitize_channel_name(name: &str) -> Result<&str> {
310        let new_name = name.trim().trim_start_matches('#');
311        if new_name.is_empty() {
312            Err(anyhow!("channel name can't be blank"))?;
313        }
314        Ok(new_name)
315    }
316
317    /// Renames the specified channel.
318    pub async fn rename_channel(
319        &self,
320        channel_id: ChannelId,
321        admin_id: UserId,
322        new_name: &str,
323    ) -> Result<channel::Model> {
324        self.transaction(move |tx| async move {
325            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
326
327            let channel = self.get_channel_internal(channel_id, &tx).await?;
328            self.check_user_is_channel_admin(&channel, admin_id, &tx)
329                .await?;
330
331            let mut model = channel.into_active_model();
332            model.name = ActiveValue::Set(new_name.clone());
333            let channel = model.update(&*tx).await?;
334
335            Ok(channel)
336        })
337        .await
338    }
339
340    /// accept or decline an invite to join a channel
341    pub async fn respond_to_channel_invite(
342        &self,
343        channel_id: ChannelId,
344        user_id: UserId,
345        accept: bool,
346    ) -> Result<RespondToChannelInvite> {
347        self.transaction(move |tx| async move {
348            let channel = self.get_channel_internal(channel_id, &tx).await?;
349
350            let membership_update = if accept {
351                let rows_affected = channel_member::Entity::update_many()
352                    .set(channel_member::ActiveModel {
353                        accepted: ActiveValue::Set(accept),
354                        ..Default::default()
355                    })
356                    .filter(
357                        channel_member::Column::ChannelId
358                            .eq(channel_id)
359                            .and(channel_member::Column::UserId.eq(user_id))
360                            .and(channel_member::Column::Accepted.eq(false)),
361                    )
362                    .exec(&*tx)
363                    .await?
364                    .rows_affected;
365
366                if rows_affected == 0 {
367                    Err(anyhow!("no such invitation"))?;
368                }
369
370                Some(
371                    self.calculate_membership_updated(&channel, user_id, &tx)
372                        .await?,
373                )
374            } else {
375                let rows_affected = channel_member::Entity::delete_many()
376                    .filter(
377                        channel_member::Column::ChannelId
378                            .eq(channel_id)
379                            .and(channel_member::Column::UserId.eq(user_id))
380                            .and(channel_member::Column::Accepted.eq(false)),
381                    )
382                    .exec(&*tx)
383                    .await?
384                    .rows_affected;
385                if rows_affected == 0 {
386                    Err(anyhow!("no such invitation"))?;
387                }
388
389                None
390            };
391
392            Ok(RespondToChannelInvite {
393                membership_update,
394                notifications: self
395                    .mark_notification_as_read_with_response(
396                        user_id,
397                        &rpc::Notification::ChannelInvitation {
398                            channel_id: channel_id.to_proto(),
399                            channel_name: Default::default(),
400                            inviter_id: Default::default(),
401                        },
402                        accept,
403                        &tx,
404                    )
405                    .await?
406                    .into_iter()
407                    .collect(),
408            })
409        })
410        .await
411    }
412
413    async fn calculate_membership_updated(
414        &self,
415        channel: &channel::Model,
416        user_id: UserId,
417        tx: &DatabaseTransaction,
418    ) -> Result<MembershipUpdated> {
419        let new_channels = self
420            .get_user_channels(user_id, Some(channel), false, tx)
421            .await?;
422        let removed_channels = self
423            .get_channel_descendants_excluding_self([channel], tx)
424            .await?
425            .into_iter()
426            .map(|channel| channel.id)
427            .chain([channel.id])
428            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
429            .collect::<Vec<_>>();
430
431        Ok(MembershipUpdated {
432            channel_id: channel.id,
433            new_channels,
434            removed_channels,
435        })
436    }
437
438    /// Removes a channel member.
439    pub async fn remove_channel_member(
440        &self,
441        channel_id: ChannelId,
442        member_id: UserId,
443        admin_id: UserId,
444    ) -> Result<RemoveChannelMemberResult> {
445        self.transaction(|tx| async move {
446            let channel = self.get_channel_internal(channel_id, &tx).await?;
447
448            if member_id != admin_id {
449                self.check_user_is_channel_admin(&channel, admin_id, &tx)
450                    .await?;
451            }
452
453            let result = channel_member::Entity::delete_many()
454                .filter(
455                    channel_member::Column::ChannelId
456                        .eq(channel_id)
457                        .and(channel_member::Column::UserId.eq(member_id)),
458                )
459                .exec(&*tx)
460                .await?;
461
462            if result.rows_affected == 0 {
463                Err(anyhow!("no such member"))?;
464            }
465
466            Ok(RemoveChannelMemberResult {
467                membership_update: self
468                    .calculate_membership_updated(&channel, member_id, &tx)
469                    .await?,
470                notification_id: self
471                    .remove_notification(
472                        member_id,
473                        rpc::Notification::ChannelInvitation {
474                            channel_id: channel_id.to_proto(),
475                            channel_name: Default::default(),
476                            inviter_id: Default::default(),
477                        },
478                        &tx,
479                    )
480                    .await?,
481            })
482        })
483        .await
484    }
485
486    /// Returns all channels for the user with the given ID.
487    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
488        self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
489            .await
490    }
491
    /// Collects the channels visible to `user_id`, plus per-channel activity data.
    ///
    /// Memberships of the user are loaded first. With `include_invites == false`
    /// only accepted memberships are considered; with `ancestor_channel` set, only
    /// the membership on that channel's root is considered. Channels with an
    /// accepted membership are expanded to include their descendants, and each
    /// resulting channel is kept only if the role held on its root can see a
    /// channel of that visibility.
    ///
    /// The result also carries the not-yet-accepted (invited) channels, the room
    /// participants per channel, the latest buffer versions and channel messages,
    /// and what the user has already observed of both.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        include_invites: bool,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Build the membership filter incrementally from the optional restrictions.
        let mut filter = channel_member::Column::UserId.eq(user_id);
        if !include_invites {
            filter = filter.and(channel_member::Column::Accepted.eq(true))
        }
        if let Some(ancestor) = ancestor_channel {
            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
        }

        let mut channels = Vec::<channel::Model>::new();
        let mut invited_channels = Vec::<Channel>::new();
        let mut channel_memberships = Vec::<channel_member::Model>::new();
        let mut rows = channel_member::Entity::find()
            .filter(filter)
            .inner_join(channel::Entity)
            .select_also(channel::Entity)
            .stream(tx)
            .await?;
        // Split the rows into accepted memberships and pending invitations.
        while let Some(row) = rows.next().await {
            if let (membership, Some(channel)) = row? {
                if membership.accepted {
                    channel_memberships.push(membership);
                    channels.push(channel);
                } else {
                    invited_channels.push(Channel::from_model(channel));
                }
            }
        }
        // Release the stream before issuing further queries on `tx`.
        drop(rows);

        let mut descendants = self
            .get_channel_descendants_excluding_self(channels.iter(), tx)
            .await?;

        // Merge the member channels themselves into the descendant list at the
        // position given by their path, skipping any that are already present.
        for channel in channels {
            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
                descendants.insert(ix, channel);
            }
        }

        let roles_by_channel_id = channel_memberships
            .iter()
            .map(|membership| (membership.channel_id, membership.role))
            .collect::<HashMap<_, _>>();

        // Keep only channels whose root has a membership role that may see them.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
                if parent_role.can_see_channel(channel.visibility) {
                    Some(Channel::from_model(channel))
                } else {
                    None
                }
            })
            .collect();

        // Column selector for the (channel id, user id) pairs queried below.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Group the participants of each channel's room by channel.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();

        // Record, per buffer, its channel and the latest operation it has seen.
        let mut channel_ids_by_buffer_id = HashMap::default();
        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
        let mut rows = buffer::Entity::find()
            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
            .stream(tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row = row?;
            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
            latest_buffer_versions.push(ChannelBufferVersion {
                channel_id: row.channel_id.0 as u64,
                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
                // A version entry is emitted only when both the lamport timestamp
                // and the replica id of the latest operation are present.
                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
                    .latest_operation_lamport_timestamp
                    .zip(row.latest_operation_replica_id)
                {
                    vec![VectorClockEntry {
                        timestamp: latest_lamport_timestamp as u32,
                        replica_id: latest_replica_id as u32,
                    }]
                } else {
                    vec![]
                },
            });
        }
        // Release the stream before issuing further queries on `tx`.
        drop(rows);

        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;

        let observed_buffer_versions = self
            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
            .await?;

        let observed_channel_messages = self
            .observed_channel_messages(&channel_ids, user_id, tx)
            .await?;

        Ok(ChannelsForUser {
            channel_memberships,
            channels,
            invited_channels,
            channel_participants,
            latest_buffer_versions,
            latest_channel_messages,
            observed_buffer_versions,
            observed_channel_messages,
        })
    }
630
631    /// Sets the role for the specified channel member.
632    pub async fn set_channel_member_role(
633        &self,
634        channel_id: ChannelId,
635        admin_id: UserId,
636        for_user: UserId,
637        role: ChannelRole,
638    ) -> Result<SetMemberRoleResult> {
639        self.transaction(|tx| async move {
640            let channel = self.get_channel_internal(channel_id, &tx).await?;
641            self.check_user_is_channel_admin(&channel, admin_id, &tx)
642                .await?;
643
644            let membership = channel_member::Entity::find()
645                .filter(
646                    channel_member::Column::ChannelId
647                        .eq(channel_id)
648                        .and(channel_member::Column::UserId.eq(for_user)),
649                )
650                .one(&*tx)
651                .await?
652                .context("no such member")?;
653
654            let mut update = membership.into_active_model();
655            update.role = ActiveValue::Set(role);
656            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
657
658            if updated.accepted {
659                Ok(SetMemberRoleResult::MembershipUpdated(
660                    self.calculate_membership_updated(&channel, for_user, &tx)
661                        .await?,
662                ))
663            } else {
664                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
665                    channel,
666                )))
667            }
668        })
669        .await
670    }
671
672    /// Returns the details for the specified channel member.
673    pub async fn get_channel_participant_details(
674        &self,
675        channel_id: ChannelId,
676        filter: &str,
677        limit: u64,
678        user_id: UserId,
679    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
680        let members = self
681            .transaction(move |tx| async move {
682                let channel = self.get_channel_internal(channel_id, &tx).await?;
683                self.check_user_is_channel_participant(&channel, user_id, &tx)
684                    .await?;
685                let mut query = channel_member::Entity::find()
686                    .find_also_related(user::Entity)
687                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
688
689                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
690                    query = query.filter(Expr::cust_with_values(
691                        "UPPER(github_login) LIKE ?",
692                        [Self::fuzzy_like_string(&filter.to_uppercase())],
693                    ))
694                } else {
695                    query = query.filter(Expr::cust_with_values(
696                        "github_login ILIKE $1",
697                        [Self::fuzzy_like_string(filter)],
698                    ))
699                }
700                let members = query.order_by(
701                        Expr::cust(
702                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
703                        ),
704                        sea_orm::Order::Asc,
705                    )
706                    .limit(limit)
707                    .all(&*tx)
708                    .await?;
709
710                Ok(members)
711            })
712            .await?;
713
714        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
715
716        let members = members
717            .into_iter()
718            .map(|(member, user)| {
719                if let Some(user) = user {
720                    users.push(proto::User {
721                        id: user.id.to_proto(),
722                        avatar_url: format!(
723                            "https://github.com/{}.png?size=128",
724                            user.github_login
725                        ),
726                        github_login: user.github_login,
727                        name: user.name,
728                        email: user.email_address,
729                    })
730                }
731                proto::ChannelMember {
732                    role: member.role.into(),
733                    user_id: member.user_id.to_proto(),
734                    kind: if member.accepted {
735                        Kind::Member
736                    } else {
737                        Kind::Invitee
738                    }
739                    .into(),
740                }
741            })
742            .collect();
743
744        Ok((members, users))
745    }
746
747    /// Returns whether the given user is an admin in the specified channel.
748    pub async fn check_user_is_channel_admin(
749        &self,
750        channel: &channel::Model,
751        user_id: UserId,
752        tx: &DatabaseTransaction,
753    ) -> Result<ChannelRole> {
754        let role = self.channel_role_for_user(channel, user_id, tx).await?;
755        match role {
756            Some(ChannelRole::Admin) => Ok(role.unwrap()),
757            Some(ChannelRole::Member)
758            | Some(ChannelRole::Talker)
759            | Some(ChannelRole::Banned)
760            | Some(ChannelRole::Guest)
761            | None => Err(anyhow!(
762                "user is not a channel admin or channel does not exist"
763            ))?,
764        }
765    }
766
767    /// Returns whether the given user is a member of the specified channel.
768    pub async fn check_user_is_channel_member(
769        &self,
770        channel: &channel::Model,
771        user_id: UserId,
772        tx: &DatabaseTransaction,
773    ) -> Result<ChannelRole> {
774        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
775        match channel_role {
776            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
777            Some(ChannelRole::Banned)
778            | Some(ChannelRole::Guest)
779            | Some(ChannelRole::Talker)
780            | None => Err(anyhow!(
781                "user is not a channel member or channel does not exist"
782            ))?,
783        }
784    }
785
786    /// Returns whether the given user is a participant in the specified channel.
787    pub async fn check_user_is_channel_participant(
788        &self,
789        channel: &channel::Model,
790        user_id: UserId,
791        tx: &DatabaseTransaction,
792    ) -> Result<ChannelRole> {
793        let role = self.channel_role_for_user(channel, user_id, tx).await?;
794        match role {
795            Some(ChannelRole::Admin)
796            | Some(ChannelRole::Member)
797            | Some(ChannelRole::Guest)
798            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
799            Some(ChannelRole::Banned) | None => Err(anyhow!(
800                "user is not a channel participant or channel does not exist"
801            ))?,
802        }
803    }
804
805    /// Returns a user's pending invite for the given channel, if one exists.
806    pub async fn pending_invite_for_channel(
807        &self,
808        channel: &channel::Model,
809        user_id: UserId,
810        tx: &DatabaseTransaction,
811    ) -> Result<Option<channel_member::Model>> {
812        let row = channel_member::Entity::find()
813            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
814            .filter(channel_member::Column::UserId.eq(user_id))
815            .filter(channel_member::Column::Accepted.eq(false))
816            .one(tx)
817            .await?;
818
819        Ok(row)
820    }
821
822    /// Returns the role for a user in the given channel.
823    pub async fn channel_role_for_user(
824        &self,
825        channel: &channel::Model,
826        user_id: UserId,
827        tx: &DatabaseTransaction,
828    ) -> Result<Option<ChannelRole>> {
829        let membership = channel_member::Entity::find()
830            .filter(
831                channel_member::Column::ChannelId
832                    .eq(channel.root_id())
833                    .and(channel_member::Column::UserId.eq(user_id))
834                    .and(channel_member::Column::Accepted.eq(true)),
835            )
836            .one(tx)
837            .await?;
838
839        let Some(membership) = membership else {
840            return Ok(None);
841        };
842
843        if !membership.role.can_see_channel(channel.visibility) {
844            return Ok(None);
845        }
846
847        Ok(Some(membership.role))
848    }
849
850    // Get the descendants of the given set if channels, ordered by their
851    // path.
852    pub(crate) async fn get_channel_descendants_excluding_self(
853        &self,
854        channels: impl IntoIterator<Item = &channel::Model>,
855        tx: &DatabaseTransaction,
856    ) -> Result<Vec<channel::Model>> {
857        let mut filter = Condition::any();
858        for channel in channels.into_iter() {
859            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
860        }
861
862        if filter.is_empty() {
863            return Ok(vec![]);
864        }
865
866        Ok(channel::Entity::find()
867            .filter(filter)
868            .order_by_asc(Expr::cust("parent_path || id || '/'"))
869            .all(tx)
870            .await?)
871    }
872
873    /// Returns the channel with the given ID.
874    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
875        self.transaction(|tx| async move {
876            let channel = self.get_channel_internal(channel_id, &tx).await?;
877            self.check_user_is_channel_participant(&channel, user_id, &tx)
878                .await?;
879
880            Ok(Channel::from_model(channel))
881        })
882        .await
883    }
884
885    pub(crate) async fn get_channel_internal(
886        &self,
887        channel_id: ChannelId,
888        tx: &DatabaseTransaction,
889    ) -> Result<channel::Model> {
890        Ok(channel::Entity::find_by_id(channel_id)
891            .one(tx)
892            .await?
893            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
894    }
895
896    pub(crate) async fn get_or_create_channel_room(
897        &self,
898        channel_id: ChannelId,
899        livekit_room: &str,
900        tx: &DatabaseTransaction,
901    ) -> Result<RoomId> {
902        let room = room::Entity::find()
903            .filter(room::Column::ChannelId.eq(channel_id))
904            .one(tx)
905            .await?;
906
907        let room_id = if let Some(room) = room {
908            room.id
909        } else {
910            let result = room::Entity::insert(room::ActiveModel {
911                channel_id: ActiveValue::Set(Some(channel_id)),
912                live_kit_room: ActiveValue::Set(livekit_room.to_string()),
913                ..Default::default()
914            })
915            .exec(tx)
916            .await?;
917
918            result.last_insert_id
919        };
920
921        Ok(room_id)
922    }
923
    /// Moves a channel (together with its descendants) underneath a new
    /// parent channel.
    ///
    /// Runs in a single transaction. `admin_id` must pass
    /// `check_user_is_channel_admin` for the channel being moved.
    ///
    /// On success, returns the root ID of the moved channel along with the
    /// freshly re-read moved channel and every descendant whose path was
    /// rewritten.
    ///
    /// # Errors
    ///
    /// - `ErrorCode::WrongMoveTarget` if the new parent has a different root
    ///   than the channel.
    /// - `ErrorCode::CircularNesting` if the new parent is the channel itself
    ///   or has the channel among its ancestors.
    /// - `ErrorCode::BadPublicNesting` if the channel is public but the new
    ///   parent is not.
    /// - Any error from loading either channel, from the admin check, or from
    ///   the database updates.
    pub async fn move_channel(
        &self,
        channel_id: ChannelId,
        new_parent_id: ChannelId,
        admin_id: UserId,
    ) -> Result<(ChannelId, Vec<Channel>)> {
        self.transaction(|tx| async move {
            let channel = self.get_channel_internal(channel_id, &tx).await?;
            self.check_user_is_channel_admin(&channel, admin_id, &tx)
                .await?;
            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;

            // Moves are only allowed within the same root.
            if new_parent.root_id() != channel.root_id() {
                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
            }

            // The target must not be the channel itself or one of its descendants.
            if new_parent
                .ancestors_including_self()
                .any(|id| id == channel.id)
            {
                Err(anyhow!(ErrorCode::CircularNesting))?;
            }

            // A public channel may only be placed under a public parent.
            if channel.visibility == ChannelVisibility::Public
                && new_parent.visibility != ChannelVisibility::Public
            {
                Err(anyhow!(ErrorCode::BadPublicNesting))?;
            }

            // Capture everything needed from `channel` before it is consumed by
            // `into_active_model()` below. `old_path`/`new_path` are the path
            // prefixes that descendants carry before and after the move.
            let root_id = channel.root_id();
            let old_path = format!("{}{}/", channel.parent_path, channel.id);
            let new_path = format!("{}{}/", new_parent.path(), channel.id);

            // Re-parent the channel itself.
            let mut model = channel.into_active_model();
            model.parent_path = ActiveValue::Set(new_parent.path());
            let channel = model.update(&*tx).await?;

            // Rewrite the path of every row whose `parent_path` starts with the
            // old prefix, swapping it for the new one, and collect the IDs of
            // the rows that were touched.
            let descendent_ids =
                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
                    self.pool.get_database_backend(),
                    "
                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
                    WHERE parent_path LIKE $3 || '%'
                    RETURNING id
                ",
                    [old_path.clone().into(), new_path.into(), old_path.into()],
                ))
                .all(&*tx)
                .await?;

            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);

            // Re-read the moved rows so the returned channels reflect the updated paths.
            let channels = channel::Entity::find()
                .filter(channel::Column::Id.is_in(all_moved_ids))
                .all(&*tx)
                .await?
                .into_iter()
                .map(Channel::from_model)
                .collect::<Vec<_>>();

            Ok((root_id, channels))
        })
        .await
    }
989}
990
/// Column selector naming a single `id` result column.
///
/// Used with `find_by_statement` in `move_channel` to read back the IDs
/// produced by a raw `... RETURNING id` statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
995
/// Column selector naming a single `UserId` result column.
///
/// NOTE(review): presumably used to read back a `user_id` column from
/// queries elsewhere in this file; its usage is not visible in this section.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}