channels.rs

  1use super::*;
  2use rpc::{
  3    proto::{channel_member::Kind, ChannelBufferVersion, VectorClockEntry},
  4    ErrorCode, ErrorCodeExt,
  5};
  6use sea_orm::{DbBackend, TryGetableMany};
  7
  8impl Database {
  9    #[cfg(test)]
 10    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
 11        self.transaction(move |tx| async move {
 12            let mut channels = Vec::new();
 13            let mut rows = channel::Entity::find().stream(&*tx).await?;
 14            while let Some(row) = rows.next().await {
 15                let row = row?;
 16                channels.push((row.id, row.name));
 17            }
 18            Ok(channels)
 19        })
 20        .await
 21    }
 22
 23    #[cfg(test)]
 24    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
 25        Ok(self.create_channel(name, None, creator_id).await?.0.id)
 26    }
 27
 28    #[cfg(test)]
 29    pub async fn create_sub_channel(
 30        &self,
 31        name: &str,
 32        parent: ChannelId,
 33        creator_id: UserId,
 34    ) -> Result<ChannelId> {
 35        Ok(self
 36            .create_channel(name, Some(parent), creator_id)
 37            .await?
 38            .0
 39            .id)
 40    }
 41
 42    /// Creates a new channel.
 43    pub async fn create_channel(
 44        &self,
 45        name: &str,
 46        parent_channel_id: Option<ChannelId>,
 47        admin_id: UserId,
 48    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
 49        let name = Self::sanitize_channel_name(name)?;
 50        self.transaction(move |tx| async move {
 51            let mut parent = None;
 52            let mut membership = None;
 53
 54            if let Some(parent_channel_id) = parent_channel_id {
 55                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
 56                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
 57                    .await?;
 58                parent = Some(parent_channel);
 59            }
 60
 61            let channel = channel::ActiveModel {
 62                id: ActiveValue::NotSet,
 63                name: ActiveValue::Set(name.to_string()),
 64                visibility: ActiveValue::Set(ChannelVisibility::Members),
 65                parent_path: ActiveValue::Set(
 66                    parent
 67                        .as_ref()
 68                        .map_or(String::new(), |parent| parent.path()),
 69                ),
 70                requires_zed_cla: ActiveValue::NotSet,
 71            }
 72            .insert(&*tx)
 73            .await?;
 74
 75            if parent.is_none() {
 76                membership = Some(
 77                    channel_member::ActiveModel {
 78                        id: ActiveValue::NotSet,
 79                        channel_id: ActiveValue::Set(channel.id),
 80                        user_id: ActiveValue::Set(admin_id),
 81                        accepted: ActiveValue::Set(true),
 82                        role: ActiveValue::Set(ChannelRole::Admin),
 83                    }
 84                    .insert(&*tx)
 85                    .await?,
 86                );
 87            }
 88
 89            Ok((channel, membership))
 90        })
 91        .await
 92    }
 93
 94    /// Adds a user to the specified channel.
 95    pub async fn join_channel(
 96        &self,
 97        channel_id: ChannelId,
 98        user_id: UserId,
 99        connection: ConnectionId,
100    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
101        self.transaction(move |tx| async move {
102            let channel = self.get_channel_internal(channel_id, &tx).await?;
103            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
104
105            let mut accept_invite_result = None;
106
107            if role.is_none() {
108                if let Some(invitation) = self
109                    .pending_invite_for_channel(&channel, user_id, &tx)
110                    .await?
111                {
112                    // note, this may be a parent channel
113                    role = Some(invitation.role);
114                    channel_member::Entity::update(channel_member::ActiveModel {
115                        accepted: ActiveValue::Set(true),
116                        ..invitation.into_active_model()
117                    })
118                    .exec(&*tx)
119                    .await?;
120
121                    accept_invite_result = Some(
122                        self.calculate_membership_updated(&channel, user_id, &tx)
123                            .await?,
124                    );
125
126                    debug_assert!(
127                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
128                    );
129                } else if channel.visibility == ChannelVisibility::Public {
130                    role = Some(ChannelRole::Guest);
131                    channel_member::Entity::insert(channel_member::ActiveModel {
132                        id: ActiveValue::NotSet,
133                        channel_id: ActiveValue::Set(channel.root_id()),
134                        user_id: ActiveValue::Set(user_id),
135                        accepted: ActiveValue::Set(true),
136                        role: ActiveValue::Set(ChannelRole::Guest),
137                    })
138                    .exec(&*tx)
139                    .await?;
140
141                    accept_invite_result = Some(
142                        self.calculate_membership_updated(&channel, user_id, &tx)
143                            .await?,
144                    );
145
146                    debug_assert!(
147                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
148                    );
149                }
150            }
151
152            if role.is_none() || role == Some(ChannelRole::Banned) {
153                Err(ErrorCode::Forbidden.anyhow())?
154            }
155            let role = role.unwrap();
156
157            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
158            let room_id = self
159                .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
160                .await?;
161
162            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
163                .await
164                .map(|jr| (jr, accept_invite_result, role))
165        })
166        .await
167    }
168
169    /// Sets the visibility of the given channel.
170    pub async fn set_channel_visibility(
171        &self,
172        channel_id: ChannelId,
173        visibility: ChannelVisibility,
174        admin_id: UserId,
175    ) -> Result<channel::Model> {
176        self.transaction(move |tx| async move {
177            let channel = self.get_channel_internal(channel_id, &tx).await?;
178            self.check_user_is_channel_admin(&channel, admin_id, &tx)
179                .await?;
180
181            if visibility == ChannelVisibility::Public {
182                if let Some(parent_id) = channel.parent_id() {
183                    let parent = self.get_channel_internal(parent_id, &tx).await?;
184
185                    if parent.visibility != ChannelVisibility::Public {
186                        Err(ErrorCode::BadPublicNesting
187                            .with_tag("direction", "parent")
188                            .anyhow())?;
189                    }
190                }
191            } else if visibility == ChannelVisibility::Members
192                && self
193                    .get_channel_descendants_excluding_self([&channel], &tx)
194                    .await?
195                    .into_iter()
196                    .any(|channel| channel.visibility == ChannelVisibility::Public)
197            {
198                Err(ErrorCode::BadPublicNesting
199                    .with_tag("direction", "children")
200                    .anyhow())?;
201            }
202
203            let mut model = channel.into_active_model();
204            model.visibility = ActiveValue::Set(visibility);
205            let channel = model.update(&*tx).await?;
206
207            Ok(channel)
208        })
209        .await
210    }
211
212    #[cfg(test)]
213    pub async fn set_channel_requires_zed_cla(
214        &self,
215        channel_id: ChannelId,
216        requires_zed_cla: bool,
217    ) -> Result<()> {
218        self.transaction(move |tx| async move {
219            let channel = self.get_channel_internal(channel_id, &tx).await?;
220            let mut model = channel.into_active_model();
221            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
222            model.update(&*tx).await?;
223            Ok(())
224        })
225        .await
226    }
227
228    /// Deletes the channel with the specified ID.
229    pub async fn delete_channel(
230        &self,
231        channel_id: ChannelId,
232        user_id: UserId,
233    ) -> Result<(ChannelId, Vec<ChannelId>)> {
234        self.transaction(move |tx| async move {
235            let channel = self.get_channel_internal(channel_id, &tx).await?;
236            self.check_user_is_channel_admin(&channel, user_id, &tx)
237                .await?;
238
239            let channels_to_remove = self
240                .get_channel_descendants_excluding_self([&channel], &tx)
241                .await?
242                .into_iter()
243                .map(|channel| channel.id)
244                .chain(Some(channel_id))
245                .collect::<Vec<_>>();
246
247            channel::Entity::delete_many()
248                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
249                .exec(&*tx)
250                .await?;
251
252            Ok((channel.root_id(), channels_to_remove))
253        })
254        .await
255    }
256
257    /// Invites a user to a channel as a member.
258    pub async fn invite_channel_member(
259        &self,
260        channel_id: ChannelId,
261        invitee_id: UserId,
262        inviter_id: UserId,
263        role: ChannelRole,
264    ) -> Result<InviteMemberResult> {
265        self.transaction(move |tx| async move {
266            let channel = self.get_channel_internal(channel_id, &tx).await?;
267            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
268                .await?;
269            if !channel.is_root() {
270                Err(ErrorCode::NotARootChannel.anyhow())?
271            }
272
273            channel_member::ActiveModel {
274                id: ActiveValue::NotSet,
275                channel_id: ActiveValue::Set(channel_id),
276                user_id: ActiveValue::Set(invitee_id),
277                accepted: ActiveValue::Set(false),
278                role: ActiveValue::Set(role),
279            }
280            .insert(&*tx)
281            .await?;
282
283            let channel = Channel::from_model(channel);
284
285            let notifications = self
286                .create_notification(
287                    invitee_id,
288                    rpc::Notification::ChannelInvitation {
289                        channel_id: channel_id.to_proto(),
290                        channel_name: channel.name.clone(),
291                        inviter_id: inviter_id.to_proto(),
292                    },
293                    true,
294                    &tx,
295                )
296                .await?
297                .into_iter()
298                .collect();
299
300            Ok(InviteMemberResult {
301                channel,
302                notifications,
303            })
304        })
305        .await
306    }
307
308    fn sanitize_channel_name(name: &str) -> Result<&str> {
309        let new_name = name.trim().trim_start_matches('#');
310        if new_name.is_empty() {
311            Err(anyhow!("channel name can't be blank"))?;
312        }
313        Ok(new_name)
314    }
315
316    /// Renames the specified channel.
317    pub async fn rename_channel(
318        &self,
319        channel_id: ChannelId,
320        admin_id: UserId,
321        new_name: &str,
322    ) -> Result<channel::Model> {
323        self.transaction(move |tx| async move {
324            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
325
326            let channel = self.get_channel_internal(channel_id, &tx).await?;
327            self.check_user_is_channel_admin(&channel, admin_id, &tx)
328                .await?;
329
330            let mut model = channel.into_active_model();
331            model.name = ActiveValue::Set(new_name.clone());
332            let channel = model.update(&*tx).await?;
333
334            Ok(channel)
335        })
336        .await
337    }
338
339    /// accept or decline an invite to join a channel
340    pub async fn respond_to_channel_invite(
341        &self,
342        channel_id: ChannelId,
343        user_id: UserId,
344        accept: bool,
345    ) -> Result<RespondToChannelInvite> {
346        self.transaction(move |tx| async move {
347            let channel = self.get_channel_internal(channel_id, &tx).await?;
348
349            let membership_update = if accept {
350                let rows_affected = channel_member::Entity::update_many()
351                    .set(channel_member::ActiveModel {
352                        accepted: ActiveValue::Set(accept),
353                        ..Default::default()
354                    })
355                    .filter(
356                        channel_member::Column::ChannelId
357                            .eq(channel_id)
358                            .and(channel_member::Column::UserId.eq(user_id))
359                            .and(channel_member::Column::Accepted.eq(false)),
360                    )
361                    .exec(&*tx)
362                    .await?
363                    .rows_affected;
364
365                if rows_affected == 0 {
366                    Err(anyhow!("no such invitation"))?;
367                }
368
369                Some(
370                    self.calculate_membership_updated(&channel, user_id, &tx)
371                        .await?,
372                )
373            } else {
374                let rows_affected = channel_member::Entity::delete_many()
375                    .filter(
376                        channel_member::Column::ChannelId
377                            .eq(channel_id)
378                            .and(channel_member::Column::UserId.eq(user_id))
379                            .and(channel_member::Column::Accepted.eq(false)),
380                    )
381                    .exec(&*tx)
382                    .await?
383                    .rows_affected;
384                if rows_affected == 0 {
385                    Err(anyhow!("no such invitation"))?;
386                }
387
388                None
389            };
390
391            Ok(RespondToChannelInvite {
392                membership_update,
393                notifications: self
394                    .mark_notification_as_read_with_response(
395                        user_id,
396                        &rpc::Notification::ChannelInvitation {
397                            channel_id: channel_id.to_proto(),
398                            channel_name: Default::default(),
399                            inviter_id: Default::default(),
400                        },
401                        accept,
402                        &tx,
403                    )
404                    .await?
405                    .into_iter()
406                    .collect(),
407            })
408        })
409        .await
410    }
411
412    async fn calculate_membership_updated(
413        &self,
414        channel: &channel::Model,
415        user_id: UserId,
416        tx: &DatabaseTransaction,
417    ) -> Result<MembershipUpdated> {
418        let new_channels = self
419            .get_user_channels(user_id, Some(channel), false, tx)
420            .await?;
421        let removed_channels = self
422            .get_channel_descendants_excluding_self([channel], tx)
423            .await?
424            .into_iter()
425            .map(|channel| channel.id)
426            .chain([channel.id])
427            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
428            .collect::<Vec<_>>();
429
430        Ok(MembershipUpdated {
431            channel_id: channel.id,
432            new_channels,
433            removed_channels,
434        })
435    }
436
437    /// Removes a channel member.
438    pub async fn remove_channel_member(
439        &self,
440        channel_id: ChannelId,
441        member_id: UserId,
442        admin_id: UserId,
443    ) -> Result<RemoveChannelMemberResult> {
444        self.transaction(|tx| async move {
445            let channel = self.get_channel_internal(channel_id, &tx).await?;
446
447            if member_id != admin_id {
448                self.check_user_is_channel_admin(&channel, admin_id, &tx)
449                    .await?;
450            }
451
452            let result = channel_member::Entity::delete_many()
453                .filter(
454                    channel_member::Column::ChannelId
455                        .eq(channel_id)
456                        .and(channel_member::Column::UserId.eq(member_id)),
457                )
458                .exec(&*tx)
459                .await?;
460
461            if result.rows_affected == 0 {
462                Err(anyhow!("no such member"))?;
463            }
464
465            Ok(RemoveChannelMemberResult {
466                membership_update: self
467                    .calculate_membership_updated(&channel, member_id, &tx)
468                    .await?,
469                notification_id: self
470                    .remove_notification(
471                        member_id,
472                        rpc::Notification::ChannelInvitation {
473                            channel_id: channel_id.to_proto(),
474                            channel_name: Default::default(),
475                            inviter_id: Default::default(),
476                        },
477                        &tx,
478                    )
479                    .await?,
480            })
481        })
482        .await
483    }
484
485    /// Returns all channels for the user with the given ID.
486    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
487        self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
488            .await
489    }
490
491    /// Returns all channels for the user with the given ID that are descendants
492    /// of the specified ancestor channel.
493    pub async fn get_user_channels(
494        &self,
495        user_id: UserId,
496        ancestor_channel: Option<&channel::Model>,
497        include_invites: bool,
498        tx: &DatabaseTransaction,
499    ) -> Result<ChannelsForUser> {
500        let mut filter = channel_member::Column::UserId.eq(user_id);
501        if !include_invites {
502            filter = filter.and(channel_member::Column::Accepted.eq(true))
503        }
504        if let Some(ancestor) = ancestor_channel {
505            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
506        }
507
508        let mut channels = Vec::<channel::Model>::new();
509        let mut invited_channels = Vec::<Channel>::new();
510        let mut channel_memberships = Vec::<channel_member::Model>::new();
511        let mut rows = channel_member::Entity::find()
512            .filter(filter)
513            .inner_join(channel::Entity)
514            .select_also(channel::Entity)
515            .stream(tx)
516            .await?;
517        while let Some(row) = rows.next().await {
518            if let (membership, Some(channel)) = row? {
519                if membership.accepted {
520                    channel_memberships.push(membership);
521                    channels.push(channel);
522                } else {
523                    invited_channels.push(Channel::from_model(channel));
524                }
525            }
526        }
527        drop(rows);
528
529        let mut descendants = self
530            .get_channel_descendants_excluding_self(channels.iter(), tx)
531            .await?;
532
533        for channel in channels {
534            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
535                descendants.insert(ix, channel);
536            }
537        }
538
539        let roles_by_channel_id = channel_memberships
540            .iter()
541            .map(|membership| (membership.channel_id, membership.role))
542            .collect::<HashMap<_, _>>();
543
544        let channels: Vec<Channel> = descendants
545            .into_iter()
546            .filter_map(|channel| {
547                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
548                if parent_role.can_see_channel(channel.visibility) {
549                    Some(Channel::from_model(channel))
550                } else {
551                    None
552                }
553            })
554            .collect();
555
556        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
557        enum QueryUserIdsAndChannelIds {
558            ChannelId,
559            UserId,
560        }
561
562        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
563        {
564            let mut rows = room_participant::Entity::find()
565                .inner_join(room::Entity)
566                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
567                .select_only()
568                .column(room::Column::ChannelId)
569                .column(room_participant::Column::UserId)
570                .into_values::<_, QueryUserIdsAndChannelIds>()
571                .stream(tx)
572                .await?;
573            while let Some(row) = rows.next().await {
574                let row: (ChannelId, UserId) = row?;
575                channel_participants.entry(row.0).or_default().push(row.1)
576            }
577        }
578
579        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
580
581        let mut channel_ids_by_buffer_id = HashMap::default();
582        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
583        let mut rows = buffer::Entity::find()
584            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
585            .stream(tx)
586            .await?;
587        while let Some(row) = rows.next().await {
588            let row = row?;
589            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
590            latest_buffer_versions.push(ChannelBufferVersion {
591                channel_id: row.channel_id.0 as u64,
592                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
593                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
594                    .latest_operation_lamport_timestamp
595                    .zip(row.latest_operation_replica_id)
596                {
597                    vec![VectorClockEntry {
598                        timestamp: latest_lamport_timestamp as u32,
599                        replica_id: latest_replica_id as u32,
600                    }]
601                } else {
602                    vec![]
603                },
604            });
605        }
606        drop(rows);
607
608        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
609
610        let observed_buffer_versions = self
611            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
612            .await?;
613
614        let observed_channel_messages = self
615            .observed_channel_messages(&channel_ids, user_id, tx)
616            .await?;
617
618        Ok(ChannelsForUser {
619            channel_memberships,
620            channels,
621            invited_channels,
622            channel_participants,
623            latest_buffer_versions,
624            latest_channel_messages,
625            observed_buffer_versions,
626            observed_channel_messages,
627        })
628    }
629
630    /// Sets the role for the specified channel member.
631    pub async fn set_channel_member_role(
632        &self,
633        channel_id: ChannelId,
634        admin_id: UserId,
635        for_user: UserId,
636        role: ChannelRole,
637    ) -> Result<SetMemberRoleResult> {
638        self.transaction(|tx| async move {
639            let channel = self.get_channel_internal(channel_id, &tx).await?;
640            self.check_user_is_channel_admin(&channel, admin_id, &tx)
641                .await?;
642
643            let membership = channel_member::Entity::find()
644                .filter(
645                    channel_member::Column::ChannelId
646                        .eq(channel_id)
647                        .and(channel_member::Column::UserId.eq(for_user)),
648                )
649                .one(&*tx)
650                .await?;
651
652            let Some(membership) = membership else {
653                Err(anyhow!("no such member"))?
654            };
655
656            let mut update = membership.into_active_model();
657            update.role = ActiveValue::Set(role);
658            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
659
660            if updated.accepted {
661                Ok(SetMemberRoleResult::MembershipUpdated(
662                    self.calculate_membership_updated(&channel, for_user, &tx)
663                        .await?,
664                ))
665            } else {
666                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
667                    channel,
668                )))
669            }
670        })
671        .await
672    }
673
674    /// Returns the details for the specified channel member.
675    pub async fn get_channel_participant_details(
676        &self,
677        channel_id: ChannelId,
678        filter: &str,
679        limit: u64,
680        user_id: UserId,
681    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
682        let members = self
683            .transaction(move |tx| async move {
684                let channel = self.get_channel_internal(channel_id, &tx).await?;
685                self.check_user_is_channel_participant(&channel, user_id, &tx)
686                    .await?;
687                let mut query = channel_member::Entity::find()
688                    .find_also_related(user::Entity)
689                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
690
691                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
692                    query = query.filter(Expr::cust_with_values(
693                        "UPPER(github_login) LIKE ?",
694                        [Self::fuzzy_like_string(&filter.to_uppercase())],
695                    ))
696                } else {
697                    query = query.filter(Expr::cust_with_values(
698                        "github_login ILIKE $1",
699                        [Self::fuzzy_like_string(filter)],
700                    ))
701                }
702                let members = query.order_by(
703                        Expr::cust(
704                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
705                        ),
706                        sea_orm::Order::Asc,
707                    )
708                    .limit(limit)
709                    .all(&*tx)
710                    .await?;
711
712                Ok(members)
713            })
714            .await?;
715
716        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
717
718        let members = members
719            .into_iter()
720            .map(|(member, user)| {
721                if let Some(user) = user {
722                    users.push(proto::User {
723                        id: user.id.to_proto(),
724                        avatar_url: format!(
725                            "https://github.com/{}.png?size=128",
726                            user.github_login
727                        ),
728                        github_login: user.github_login,
729                    })
730                }
731                proto::ChannelMember {
732                    role: member.role.into(),
733                    user_id: member.user_id.to_proto(),
734                    kind: if member.accepted {
735                        Kind::Member
736                    } else {
737                        Kind::Invitee
738                    }
739                    .into(),
740                }
741            })
742            .collect();
743
744        Ok((members, users))
745    }
746
747    /// Returns whether the given user is an admin in the specified channel.
748    pub async fn check_user_is_channel_admin(
749        &self,
750        channel: &channel::Model,
751        user_id: UserId,
752        tx: &DatabaseTransaction,
753    ) -> Result<ChannelRole> {
754        let role = self.channel_role_for_user(channel, user_id, tx).await?;
755        match role {
756            Some(ChannelRole::Admin) => Ok(role.unwrap()),
757            Some(ChannelRole::Member)
758            | Some(ChannelRole::Talker)
759            | Some(ChannelRole::Banned)
760            | Some(ChannelRole::Guest)
761            | None => Err(anyhow!(
762                "user is not a channel admin or channel does not exist"
763            ))?,
764        }
765    }
766
767    /// Returns whether the given user is a member of the specified channel.
768    pub async fn check_user_is_channel_member(
769        &self,
770        channel: &channel::Model,
771        user_id: UserId,
772        tx: &DatabaseTransaction,
773    ) -> Result<ChannelRole> {
774        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
775        match channel_role {
776            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
777            Some(ChannelRole::Banned)
778            | Some(ChannelRole::Guest)
779            | Some(ChannelRole::Talker)
780            | None => Err(anyhow!(
781                "user is not a channel member or channel does not exist"
782            ))?,
783        }
784    }
785
786    /// Returns whether the given user is a participant in the specified channel.
787    pub async fn check_user_is_channel_participant(
788        &self,
789        channel: &channel::Model,
790        user_id: UserId,
791        tx: &DatabaseTransaction,
792    ) -> Result<ChannelRole> {
793        let role = self.channel_role_for_user(channel, user_id, tx).await?;
794        match role {
795            Some(ChannelRole::Admin)
796            | Some(ChannelRole::Member)
797            | Some(ChannelRole::Guest)
798            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
799            Some(ChannelRole::Banned) | None => Err(anyhow!(
800                "user is not a channel participant or channel does not exist"
801            ))?,
802        }
803    }
804
805    /// Returns a user's pending invite for the given channel, if one exists.
806    pub async fn pending_invite_for_channel(
807        &self,
808        channel: &channel::Model,
809        user_id: UserId,
810        tx: &DatabaseTransaction,
811    ) -> Result<Option<channel_member::Model>> {
812        let row = channel_member::Entity::find()
813            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
814            .filter(channel_member::Column::UserId.eq(user_id))
815            .filter(channel_member::Column::Accepted.eq(false))
816            .one(tx)
817            .await?;
818
819        Ok(row)
820    }
821
822    /// Returns the role for a user in the given channel.
823    pub async fn channel_role_for_user(
824        &self,
825        channel: &channel::Model,
826        user_id: UserId,
827        tx: &DatabaseTransaction,
828    ) -> Result<Option<ChannelRole>> {
829        let membership = channel_member::Entity::find()
830            .filter(
831                channel_member::Column::ChannelId
832                    .eq(channel.root_id())
833                    .and(channel_member::Column::UserId.eq(user_id))
834                    .and(channel_member::Column::Accepted.eq(true)),
835            )
836            .one(tx)
837            .await?;
838
839        let Some(membership) = membership else {
840            return Ok(None);
841        };
842
843        if !membership.role.can_see_channel(channel.visibility) {
844            return Ok(None);
845        }
846
847        Ok(Some(membership.role))
848    }
849
850    // Get the descendants of the given set if channels, ordered by their
851    // path.
852    pub(crate) async fn get_channel_descendants_excluding_self(
853        &self,
854        channels: impl IntoIterator<Item = &channel::Model>,
855        tx: &DatabaseTransaction,
856    ) -> Result<Vec<channel::Model>> {
857        let mut filter = Condition::any();
858        for channel in channels.into_iter() {
859            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
860        }
861
862        if filter.is_empty() {
863            return Ok(vec![]);
864        }
865
866        Ok(channel::Entity::find()
867            .filter(filter)
868            .order_by_asc(Expr::cust("parent_path || id || '/'"))
869            .all(tx)
870            .await?)
871    }
872
873    /// Returns the channel with the given ID.
874    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
875        self.transaction(|tx| async move {
876            let channel = self.get_channel_internal(channel_id, &tx).await?;
877            self.check_user_is_channel_participant(&channel, user_id, &tx)
878                .await?;
879
880            Ok(Channel::from_model(channel))
881        })
882        .await
883    }
884
885    pub(crate) async fn get_channel_internal(
886        &self,
887        channel_id: ChannelId,
888        tx: &DatabaseTransaction,
889    ) -> Result<channel::Model> {
890        Ok(channel::Entity::find_by_id(channel_id)
891            .one(tx)
892            .await?
893            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
894    }
895
896    pub(crate) async fn get_or_create_channel_room(
897        &self,
898        channel_id: ChannelId,
899        live_kit_room: &str,
900        tx: &DatabaseTransaction,
901    ) -> Result<RoomId> {
902        let room = room::Entity::find()
903            .filter(room::Column::ChannelId.eq(channel_id))
904            .one(tx)
905            .await?;
906
907        let room_id = if let Some(room) = room {
908            room.id
909        } else {
910            let result = room::Entity::insert(room::ActiveModel {
911                channel_id: ActiveValue::Set(Some(channel_id)),
912                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
913                ..Default::default()
914            })
915            .exec(tx)
916            .await?;
917
918            result.last_insert_id
919        };
920
921        Ok(room_id)
922    }
923
924    /// Move a channel from one parent to another
925    pub async fn move_channel(
926        &self,
927        channel_id: ChannelId,
928        new_parent_id: ChannelId,
929        admin_id: UserId,
930    ) -> Result<(ChannelId, Vec<Channel>)> {
931        self.transaction(|tx| async move {
932            let channel = self.get_channel_internal(channel_id, &tx).await?;
933            self.check_user_is_channel_admin(&channel, admin_id, &tx)
934                .await?;
935            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
936
937            if new_parent.root_id() != channel.root_id() {
938                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
939            }
940
941            if new_parent
942                .ancestors_including_self()
943                .any(|id| id == channel.id)
944            {
945                Err(anyhow!(ErrorCode::CircularNesting))?;
946            }
947
948            if channel.visibility == ChannelVisibility::Public
949                && new_parent.visibility != ChannelVisibility::Public
950            {
951                Err(anyhow!(ErrorCode::BadPublicNesting))?;
952            }
953
954            let root_id = channel.root_id();
955            let old_path = format!("{}{}/", channel.parent_path, channel.id);
956            let new_path = format!("{}{}/", new_parent.path(), channel.id);
957
958            let mut model = channel.into_active_model();
959            model.parent_path = ActiveValue::Set(new_parent.path());
960            let channel = model.update(&*tx).await?;
961
962            let descendent_ids =
963                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
964                    self.pool.get_database_backend(),
965                    "
966                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
967                    WHERE parent_path LIKE $3 || '%'
968                    RETURNING id
969                ",
970                    [old_path.clone().into(), new_path.into(), old_path.into()],
971                ))
972                .all(&*tx)
973                .await?;
974
975            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
976
977            let channels = channel::Entity::find()
978                .filter(channel::Column::Id.is_in(all_moved_ids))
979                .all(&*tx)
980                .await?
981                .into_iter()
982                .map(Channel::from_model)
983                .collect::<Vec<_>>();
984
985            Ok((root_id, channels))
986        })
987        .await
988    }
989}
990
991#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
992enum QueryIds {
993    Id,
994}
995
996#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
997enum QueryUserIds {
998    UserId,
999}