channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .0
  36            .id)
  37    }
  38
  39    /// Creates a new channel.
  40    pub async fn create_channel(
  41        &self,
  42        name: &str,
  43        parent_channel_id: Option<ChannelId>,
  44        admin_id: UserId,
  45    ) -> Result<(
  46        Channel,
  47        Option<channel_member::Model>,
  48        Vec<channel_member::Model>,
  49    )> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let channel = channel::ActiveModel {
  63                id: ActiveValue::NotSet,
  64                name: ActiveValue::Set(name.to_string()),
  65                visibility: ActiveValue::Set(ChannelVisibility::Members),
  66                parent_path: ActiveValue::Set(
  67                    parent
  68                        .as_ref()
  69                        .map_or(String::new(), |parent| parent.path()),
  70                ),
  71                requires_zed_cla: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if parent.is_none() {
  77                membership = Some(
  78                    channel_member::ActiveModel {
  79                        id: ActiveValue::NotSet,
  80                        channel_id: ActiveValue::Set(channel.id),
  81                        user_id: ActiveValue::Set(admin_id),
  82                        accepted: ActiveValue::Set(true),
  83                        role: ActiveValue::Set(ChannelRole::Admin),
  84                    }
  85                    .insert(&*tx)
  86                    .await?,
  87                );
  88            }
  89
  90            let channel_members = channel_member::Entity::find()
  91                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  92                .all(&*tx)
  93                .await?;
  94
  95            Ok((Channel::from_model(channel), membership, channel_members))
  96        })
  97        .await
  98    }
  99
 100    /// Adds a user to the specified channel.
 101    pub async fn join_channel(
 102        &self,
 103        channel_id: ChannelId,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 107        self.transaction(move |tx| async move {
 108            let channel = self.get_channel_internal(channel_id, &tx).await?;
 109            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 110
 111            let mut accept_invite_result = None;
 112
 113            if role.is_none() {
 114                if let Some(invitation) = self
 115                    .pending_invite_for_channel(&channel, user_id, &tx)
 116                    .await?
 117                {
 118                    // note, this may be a parent channel
 119                    role = Some(invitation.role);
 120                    channel_member::Entity::update(channel_member::ActiveModel {
 121                        accepted: ActiveValue::Set(true),
 122                        ..invitation.into_active_model()
 123                    })
 124                    .exec(&*tx)
 125                    .await?;
 126
 127                    accept_invite_result = Some(
 128                        self.calculate_membership_updated(&channel, user_id, &tx)
 129                            .await?,
 130                    );
 131
 132                    debug_assert!(
 133                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 134                    );
 135                } else if channel.visibility == ChannelVisibility::Public {
 136                    role = Some(ChannelRole::Guest);
 137                    channel_member::Entity::insert(channel_member::ActiveModel {
 138                        id: ActiveValue::NotSet,
 139                        channel_id: ActiveValue::Set(channel.root_id()),
 140                        user_id: ActiveValue::Set(user_id),
 141                        accepted: ActiveValue::Set(true),
 142                        role: ActiveValue::Set(ChannelRole::Guest),
 143                    })
 144                    .exec(&*tx)
 145                    .await?;
 146
 147                    accept_invite_result = Some(
 148                        self.calculate_membership_updated(&channel, user_id, &tx)
 149                            .await?,
 150                    );
 151
 152                    debug_assert!(
 153                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 154                    );
 155                }
 156            }
 157
 158            if role.is_none() || role == Some(ChannelRole::Banned) {
 159                Err(ErrorCode::Forbidden.anyhow())?
 160            }
 161            let role = role.unwrap();
 162
 163            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 164            let room_id = self
 165                .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
 166                .await?;
 167
 168            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 169                .await
 170                .map(|jr| (jr, accept_invite_result, role))
 171        })
 172        .await
 173    }
 174
 175    /// Sets the visibility of the given channel.
 176    pub async fn set_channel_visibility(
 177        &self,
 178        channel_id: ChannelId,
 179        visibility: ChannelVisibility,
 180        admin_id: UserId,
 181    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 182        self.transaction(move |tx| async move {
 183            let channel = self.get_channel_internal(channel_id, &tx).await?;
 184            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 185                .await?;
 186
 187            if visibility == ChannelVisibility::Public {
 188                if let Some(parent_id) = channel.parent_id() {
 189                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 190
 191                    if parent.visibility != ChannelVisibility::Public {
 192                        Err(ErrorCode::BadPublicNesting
 193                            .with_tag("direction", "parent")
 194                            .anyhow())?;
 195                    }
 196                }
 197            } else if visibility == ChannelVisibility::Members {
 198                if self
 199                    .get_channel_descendants_excluding_self([&channel], &tx)
 200                    .await?
 201                    .into_iter()
 202                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 203                {
 204                    Err(ErrorCode::BadPublicNesting
 205                        .with_tag("direction", "children")
 206                        .anyhow())?;
 207                }
 208            }
 209
 210            let mut model = channel.into_active_model();
 211            model.visibility = ActiveValue::Set(visibility);
 212            let channel = model.update(&*tx).await?;
 213
 214            let channel_members = channel_member::Entity::find()
 215                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 216                .all(&*tx)
 217                .await?;
 218
 219            Ok((Channel::from_model(channel), channel_members))
 220        })
 221        .await
 222    }
 223
 224    #[cfg(test)]
 225    pub async fn set_channel_requires_zed_cla(
 226        &self,
 227        channel_id: ChannelId,
 228        requires_zed_cla: bool,
 229    ) -> Result<()> {
 230        self.transaction(move |tx| async move {
 231            let channel = self.get_channel_internal(channel_id, &tx).await?;
 232            let mut model = channel.into_active_model();
 233            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 234            model.update(&*tx).await?;
 235            Ok(())
 236        })
 237        .await
 238    }
 239
 240    /// Deletes the channel with the specified ID.
 241    pub async fn delete_channel(
 242        &self,
 243        channel_id: ChannelId,
 244        user_id: UserId,
 245    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 246        self.transaction(move |tx| async move {
 247            let channel = self.get_channel_internal(channel_id, &tx).await?;
 248            self.check_user_is_channel_admin(&channel, user_id, &tx)
 249                .await?;
 250
 251            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 252                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 253                .select_only()
 254                .column(channel_member::Column::UserId)
 255                .distinct()
 256                .into_values::<_, QueryUserIds>()
 257                .all(&*tx)
 258                .await?;
 259
 260            let channels_to_remove = self
 261                .get_channel_descendants_excluding_self([&channel], &tx)
 262                .await?
 263                .into_iter()
 264                .map(|channel| channel.id)
 265                .chain(Some(channel_id))
 266                .collect::<Vec<_>>();
 267
 268            channel::Entity::delete_many()
 269                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 270                .exec(&*tx)
 271                .await?;
 272
 273            Ok((channels_to_remove, members_to_notify))
 274        })
 275        .await
 276    }
 277
 278    /// Invites a user to a channel as a member.
 279    pub async fn invite_channel_member(
 280        &self,
 281        channel_id: ChannelId,
 282        invitee_id: UserId,
 283        inviter_id: UserId,
 284        role: ChannelRole,
 285    ) -> Result<InviteMemberResult> {
 286        self.transaction(move |tx| async move {
 287            let channel = self.get_channel_internal(channel_id, &tx).await?;
 288            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 289                .await?;
 290            if !channel.is_root() {
 291                Err(ErrorCode::NotARootChannel.anyhow())?
 292            }
 293
 294            channel_member::ActiveModel {
 295                id: ActiveValue::NotSet,
 296                channel_id: ActiveValue::Set(channel_id),
 297                user_id: ActiveValue::Set(invitee_id),
 298                accepted: ActiveValue::Set(false),
 299                role: ActiveValue::Set(role),
 300            }
 301            .insert(&*tx)
 302            .await?;
 303
 304            let channel = Channel::from_model(channel);
 305
 306            let notifications = self
 307                .create_notification(
 308                    invitee_id,
 309                    rpc::Notification::ChannelInvitation {
 310                        channel_id: channel_id.to_proto(),
 311                        channel_name: channel.name.clone(),
 312                        inviter_id: inviter_id.to_proto(),
 313                    },
 314                    true,
 315                    &tx,
 316                )
 317                .await?
 318                .into_iter()
 319                .collect();
 320
 321            Ok(InviteMemberResult {
 322                channel,
 323                notifications,
 324            })
 325        })
 326        .await
 327    }
 328
 329    fn sanitize_channel_name(name: &str) -> Result<&str> {
 330        let new_name = name.trim().trim_start_matches('#');
 331        if new_name == "" {
 332            Err(anyhow!("channel name can't be blank"))?;
 333        }
 334        Ok(new_name)
 335    }
 336
 337    /// Renames the specified channel.
 338    pub async fn rename_channel(
 339        &self,
 340        channel_id: ChannelId,
 341        admin_id: UserId,
 342        new_name: &str,
 343    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 344        self.transaction(move |tx| async move {
 345            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 346
 347            let channel = self.get_channel_internal(channel_id, &tx).await?;
 348            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 349                .await?;
 350
 351            let mut model = channel.into_active_model();
 352            model.name = ActiveValue::Set(new_name.clone());
 353            let channel = model.update(&*tx).await?;
 354
 355            let channel_members = channel_member::Entity::find()
 356                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 357                .all(&*tx)
 358                .await?;
 359
 360            Ok((Channel::from_model(channel), channel_members))
 361        })
 362        .await
 363    }
 364
 365    /// accept or decline an invite to join a channel
 366    pub async fn respond_to_channel_invite(
 367        &self,
 368        channel_id: ChannelId,
 369        user_id: UserId,
 370        accept: bool,
 371    ) -> Result<RespondToChannelInvite> {
 372        self.transaction(move |tx| async move {
 373            let channel = self.get_channel_internal(channel_id, &tx).await?;
 374
 375            let membership_update = if accept {
 376                let rows_affected = channel_member::Entity::update_many()
 377                    .set(channel_member::ActiveModel {
 378                        accepted: ActiveValue::Set(accept),
 379                        ..Default::default()
 380                    })
 381                    .filter(
 382                        channel_member::Column::ChannelId
 383                            .eq(channel_id)
 384                            .and(channel_member::Column::UserId.eq(user_id))
 385                            .and(channel_member::Column::Accepted.eq(false)),
 386                    )
 387                    .exec(&*tx)
 388                    .await?
 389                    .rows_affected;
 390
 391                if rows_affected == 0 {
 392                    Err(anyhow!("no such invitation"))?;
 393                }
 394
 395                Some(
 396                    self.calculate_membership_updated(&channel, user_id, &tx)
 397                        .await?,
 398                )
 399            } else {
 400                let rows_affected = channel_member::Entity::delete_many()
 401                    .filter(
 402                        channel_member::Column::ChannelId
 403                            .eq(channel_id)
 404                            .and(channel_member::Column::UserId.eq(user_id))
 405                            .and(channel_member::Column::Accepted.eq(false)),
 406                    )
 407                    .exec(&*tx)
 408                    .await?
 409                    .rows_affected;
 410                if rows_affected == 0 {
 411                    Err(anyhow!("no such invitation"))?;
 412                }
 413
 414                None
 415            };
 416
 417            Ok(RespondToChannelInvite {
 418                membership_update,
 419                notifications: self
 420                    .mark_notification_as_read_with_response(
 421                        user_id,
 422                        &rpc::Notification::ChannelInvitation {
 423                            channel_id: channel_id.to_proto(),
 424                            channel_name: Default::default(),
 425                            inviter_id: Default::default(),
 426                        },
 427                        accept,
 428                        &tx,
 429                    )
 430                    .await?
 431                    .into_iter()
 432                    .collect(),
 433            })
 434        })
 435        .await
 436    }
 437
 438    async fn calculate_membership_updated(
 439        &self,
 440        channel: &channel::Model,
 441        user_id: UserId,
 442        tx: &DatabaseTransaction,
 443    ) -> Result<MembershipUpdated> {
 444        let new_channels = self.get_user_channels(user_id, Some(channel), tx).await?;
 445        let removed_channels = self
 446            .get_channel_descendants_excluding_self([channel], tx)
 447            .await?
 448            .into_iter()
 449            .map(|channel| channel.id)
 450            .chain([channel.id])
 451            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 452            .collect::<Vec<_>>();
 453
 454        Ok(MembershipUpdated {
 455            channel_id: channel.id,
 456            new_channels,
 457            removed_channels,
 458        })
 459    }
 460
 461    /// Removes a channel member.
 462    pub async fn remove_channel_member(
 463        &self,
 464        channel_id: ChannelId,
 465        member_id: UserId,
 466        admin_id: UserId,
 467    ) -> Result<RemoveChannelMemberResult> {
 468        self.transaction(|tx| async move {
 469            let channel = self.get_channel_internal(channel_id, &tx).await?;
 470
 471            if member_id != admin_id {
 472                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 473                    .await?;
 474            }
 475
 476            let result = channel_member::Entity::delete_many()
 477                .filter(
 478                    channel_member::Column::ChannelId
 479                        .eq(channel_id)
 480                        .and(channel_member::Column::UserId.eq(member_id)),
 481                )
 482                .exec(&*tx)
 483                .await?;
 484
 485            if result.rows_affected == 0 {
 486                Err(anyhow!("no such member"))?;
 487            }
 488
 489            Ok(RemoveChannelMemberResult {
 490                membership_update: self
 491                    .calculate_membership_updated(&channel, member_id, &tx)
 492                    .await?,
 493                notification_id: self
 494                    .remove_notification(
 495                        member_id,
 496                        rpc::Notification::ChannelInvitation {
 497                            channel_id: channel_id.to_proto(),
 498                            channel_name: Default::default(),
 499                            inviter_id: Default::default(),
 500                        },
 501                        &tx,
 502                    )
 503                    .await?,
 504            })
 505        })
 506        .await
 507    }
 508
 509    /// Returns all channel invites for the user with the given ID.
 510    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 511        self.transaction(|tx| async move {
 512            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 513
 514            let channel_invites = channel_member::Entity::find()
 515                .filter(
 516                    channel_member::Column::UserId
 517                        .eq(user_id)
 518                        .and(channel_member::Column::Accepted.eq(false)),
 519                )
 520                .all(&*tx)
 521                .await?;
 522
 523            for invite in channel_invites {
 524                role_for_channel.insert(invite.channel_id, invite.role);
 525            }
 526
 527            let channels = channel::Entity::find()
 528                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 529                .all(&*tx)
 530                .await?;
 531
 532            let channels = channels.into_iter().map(Channel::from_model).collect();
 533
 534            Ok(channels)
 535        })
 536        .await
 537    }
 538
 539    /// Returns all channels for the user with the given ID.
 540    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 541        self.transaction(|tx| async move {
 542            let tx = tx;
 543
 544            self.get_user_channels(user_id, None, &tx).await
 545        })
 546        .await
 547    }
 548
 549    /// Returns all channels for the user with the given ID that are descendants
 550    /// of the specified ancestor channel.
 551    pub async fn get_user_channels(
 552        &self,
 553        user_id: UserId,
 554        ancestor_channel: Option<&channel::Model>,
 555        tx: &DatabaseTransaction,
 556    ) -> Result<ChannelsForUser> {
 557        let mut filter = channel_member::Column::UserId
 558            .eq(user_id)
 559            .and(channel_member::Column::Accepted.eq(true));
 560
 561        if let Some(ancestor) = ancestor_channel {
 562            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 563        }
 564
 565        let channel_memberships = channel_member::Entity::find()
 566            .filter(filter)
 567            .all(tx)
 568            .await?;
 569
 570        let channels = channel::Entity::find()
 571            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
 572            .all(tx)
 573            .await?;
 574
 575        let mut descendants = self
 576            .get_channel_descendants_excluding_self(channels.iter(), tx)
 577            .await?;
 578
 579        for channel in channels {
 580            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
 581                descendants.insert(ix, channel);
 582            }
 583        }
 584
 585        let roles_by_channel_id = channel_memberships
 586            .iter()
 587            .map(|membership| (membership.channel_id, membership.role))
 588            .collect::<HashMap<_, _>>();
 589
 590        let channels: Vec<Channel> = descendants
 591            .into_iter()
 592            .filter_map(|channel| {
 593                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 594                if parent_role.can_see_channel(channel.visibility) {
 595                    Some(Channel::from_model(channel))
 596                } else {
 597                    None
 598                }
 599            })
 600            .collect();
 601
 602        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 603        enum QueryUserIdsAndChannelIds {
 604            ChannelId,
 605            UserId,
 606        }
 607
 608        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 609        {
 610            let mut rows = room_participant::Entity::find()
 611                .inner_join(room::Entity)
 612                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 613                .select_only()
 614                .column(room::Column::ChannelId)
 615                .column(room_participant::Column::UserId)
 616                .into_values::<_, QueryUserIdsAndChannelIds>()
 617                .stream(tx)
 618                .await?;
 619            while let Some(row) = rows.next().await {
 620                let row: (ChannelId, UserId) = row?;
 621                channel_participants.entry(row.0).or_default().push(row.1)
 622            }
 623        }
 624
 625        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 626
 627        let mut channel_ids_by_buffer_id = HashMap::default();
 628        let mut rows = buffer::Entity::find()
 629            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 630            .stream(tx)
 631            .await?;
 632        while let Some(row) = rows.next().await {
 633            let row = row?;
 634            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 635        }
 636        drop(rows);
 637
 638        let latest_buffer_versions = self
 639            .latest_channel_buffer_changes(&channel_ids_by_buffer_id, tx)
 640            .await?;
 641
 642        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
 643
 644        let observed_buffer_versions = self
 645            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
 646            .await?;
 647
 648        let observed_channel_messages = self
 649            .observed_channel_messages(&channel_ids, user_id, tx)
 650            .await?;
 651
 652        let hosted_projects = self
 653            .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
 654            .await?;
 655
 656        Ok(ChannelsForUser {
 657            channel_memberships,
 658            channels,
 659            hosted_projects,
 660            channel_participants,
 661            latest_buffer_versions,
 662            latest_channel_messages,
 663            observed_buffer_versions,
 664            observed_channel_messages,
 665        })
 666    }
 667
 668    /// Sets the role for the specified channel member.
 669    pub async fn set_channel_member_role(
 670        &self,
 671        channel_id: ChannelId,
 672        admin_id: UserId,
 673        for_user: UserId,
 674        role: ChannelRole,
 675    ) -> Result<SetMemberRoleResult> {
 676        self.transaction(|tx| async move {
 677            let channel = self.get_channel_internal(channel_id, &tx).await?;
 678            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 679                .await?;
 680
 681            let membership = channel_member::Entity::find()
 682                .filter(
 683                    channel_member::Column::ChannelId
 684                        .eq(channel_id)
 685                        .and(channel_member::Column::UserId.eq(for_user)),
 686                )
 687                .one(&*tx)
 688                .await?;
 689
 690            let Some(membership) = membership else {
 691                Err(anyhow!("no such member"))?
 692            };
 693
 694            let mut update = membership.into_active_model();
 695            update.role = ActiveValue::Set(role);
 696            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 697
 698            if updated.accepted {
 699                Ok(SetMemberRoleResult::MembershipUpdated(
 700                    self.calculate_membership_updated(&channel, for_user, &tx)
 701                        .await?,
 702                ))
 703            } else {
 704                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 705                    channel,
 706                )))
 707            }
 708        })
 709        .await
 710    }
 711
 712    /// Returns the details for the specified channel member.
 713    pub async fn get_channel_participant_details(
 714        &self,
 715        channel_id: ChannelId,
 716        user_id: UserId,
 717    ) -> Result<Vec<proto::ChannelMember>> {
 718        let (role, members) = self
 719            .transaction(move |tx| async move {
 720                let channel = self.get_channel_internal(channel_id, &tx).await?;
 721                let role = self
 722                    .check_user_is_channel_participant(&channel, user_id, &tx)
 723                    .await?;
 724                Ok((
 725                    role,
 726                    self.get_channel_participant_details_internal(&channel, &tx)
 727                        .await?,
 728                ))
 729            })
 730            .await?;
 731
 732        if role == ChannelRole::Admin {
 733            Ok(members
 734                .into_iter()
 735                .map(|channel_member| proto::ChannelMember {
 736                    role: channel_member.role.into(),
 737                    user_id: channel_member.user_id.to_proto(),
 738                    kind: if channel_member.accepted {
 739                        Kind::Member
 740                    } else {
 741                        Kind::Invitee
 742                    }
 743                    .into(),
 744                })
 745                .collect())
 746        } else {
 747            return Ok(members
 748                .into_iter()
 749                .filter_map(|member| {
 750                    if !member.accepted {
 751                        return None;
 752                    }
 753                    Some(proto::ChannelMember {
 754                        role: member.role.into(),
 755                        user_id: member.user_id.to_proto(),
 756                        kind: Kind::Member.into(),
 757                    })
 758                })
 759                .collect());
 760        }
 761    }
 762
 763    async fn get_channel_participant_details_internal(
 764        &self,
 765        channel: &channel::Model,
 766        tx: &DatabaseTransaction,
 767    ) -> Result<Vec<channel_member::Model>> {
 768        Ok(channel_member::Entity::find()
 769            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 770            .all(tx)
 771            .await?)
 772    }
 773
 774    /// Returns the participants in the given channel.
 775    pub async fn get_channel_participants(
 776        &self,
 777        channel: &channel::Model,
 778        tx: &DatabaseTransaction,
 779    ) -> Result<Vec<UserId>> {
 780        let participants = self
 781            .get_channel_participant_details_internal(channel, tx)
 782            .await?;
 783        Ok(participants
 784            .into_iter()
 785            .map(|member| member.user_id)
 786            .collect())
 787    }
 788
 789    /// Returns whether the given user is an admin in the specified channel.
 790    pub async fn check_user_is_channel_admin(
 791        &self,
 792        channel: &channel::Model,
 793        user_id: UserId,
 794        tx: &DatabaseTransaction,
 795    ) -> Result<ChannelRole> {
 796        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 797        match role {
 798            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 799            Some(ChannelRole::Member)
 800            | Some(ChannelRole::Talker)
 801            | Some(ChannelRole::Banned)
 802            | Some(ChannelRole::Guest)
 803            | None => Err(anyhow!(
 804                "user is not a channel admin or channel does not exist"
 805            ))?,
 806        }
 807    }
 808
 809    /// Returns whether the given user is a member of the specified channel.
 810    pub async fn check_user_is_channel_member(
 811        &self,
 812        channel: &channel::Model,
 813        user_id: UserId,
 814        tx: &DatabaseTransaction,
 815    ) -> Result<ChannelRole> {
 816        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 817        match channel_role {
 818            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 819            Some(ChannelRole::Banned)
 820            | Some(ChannelRole::Guest)
 821            | Some(ChannelRole::Talker)
 822            | None => Err(anyhow!(
 823                "user is not a channel member or channel does not exist"
 824            ))?,
 825        }
 826    }
 827
 828    /// Returns whether the given user is a participant in the specified channel.
 829    pub async fn check_user_is_channel_participant(
 830        &self,
 831        channel: &channel::Model,
 832        user_id: UserId,
 833        tx: &DatabaseTransaction,
 834    ) -> Result<ChannelRole> {
 835        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 836        match role {
 837            Some(ChannelRole::Admin)
 838            | Some(ChannelRole::Member)
 839            | Some(ChannelRole::Guest)
 840            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 841            Some(ChannelRole::Banned) | None => Err(anyhow!(
 842                "user is not a channel participant or channel does not exist"
 843            ))?,
 844        }
 845    }
 846
 847    /// Returns a user's pending invite for the given channel, if one exists.
 848    pub async fn pending_invite_for_channel(
 849        &self,
 850        channel: &channel::Model,
 851        user_id: UserId,
 852        tx: &DatabaseTransaction,
 853    ) -> Result<Option<channel_member::Model>> {
 854        let row = channel_member::Entity::find()
 855            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 856            .filter(channel_member::Column::UserId.eq(user_id))
 857            .filter(channel_member::Column::Accepted.eq(false))
 858            .one(tx)
 859            .await?;
 860
 861        Ok(row)
 862    }
 863
 864    /// Returns the role for a user in the given channel.
 865    pub async fn channel_role_for_user(
 866        &self,
 867        channel: &channel::Model,
 868        user_id: UserId,
 869        tx: &DatabaseTransaction,
 870    ) -> Result<Option<ChannelRole>> {
 871        let membership = channel_member::Entity::find()
 872            .filter(
 873                channel_member::Column::ChannelId
 874                    .eq(channel.root_id())
 875                    .and(channel_member::Column::UserId.eq(user_id))
 876                    .and(channel_member::Column::Accepted.eq(true)),
 877            )
 878            .one(tx)
 879            .await?;
 880
 881        let Some(membership) = membership else {
 882            return Ok(None);
 883        };
 884
 885        if !membership.role.can_see_channel(channel.visibility) {
 886            return Ok(None);
 887        }
 888
 889        Ok(Some(membership.role))
 890    }
 891
 892    // Get the descendants of the given set if channels, ordered by their
 893    // path.
 894    pub(crate) async fn get_channel_descendants_excluding_self(
 895        &self,
 896        channels: impl IntoIterator<Item = &channel::Model>,
 897        tx: &DatabaseTransaction,
 898    ) -> Result<Vec<channel::Model>> {
 899        let mut filter = Condition::any();
 900        for channel in channels.into_iter() {
 901            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 902        }
 903
 904        if filter.is_empty() {
 905            return Ok(vec![]);
 906        }
 907
 908        Ok(channel::Entity::find()
 909            .filter(filter)
 910            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 911            .all(tx)
 912            .await?)
 913    }
 914
 915    /// Returns the channel with the given ID.
 916    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 917        self.transaction(|tx| async move {
 918            let channel = self.get_channel_internal(channel_id, &tx).await?;
 919            self.check_user_is_channel_participant(&channel, user_id, &tx)
 920                .await?;
 921
 922            Ok(Channel::from_model(channel))
 923        })
 924        .await
 925    }
 926
 927    pub(crate) async fn get_channel_internal(
 928        &self,
 929        channel_id: ChannelId,
 930        tx: &DatabaseTransaction,
 931    ) -> Result<channel::Model> {
 932        Ok(channel::Entity::find_by_id(channel_id)
 933            .one(tx)
 934            .await?
 935            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 936    }
 937
 938    pub(crate) async fn get_or_create_channel_room(
 939        &self,
 940        channel_id: ChannelId,
 941        live_kit_room: &str,
 942        tx: &DatabaseTransaction,
 943    ) -> Result<RoomId> {
 944        let room = room::Entity::find()
 945            .filter(room::Column::ChannelId.eq(channel_id))
 946            .one(tx)
 947            .await?;
 948
 949        let room_id = if let Some(room) = room {
 950            room.id
 951        } else {
 952            let result = room::Entity::insert(room::ActiveModel {
 953                channel_id: ActiveValue::Set(Some(channel_id)),
 954                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 955                ..Default::default()
 956            })
 957            .exec(tx)
 958            .await?;
 959
 960            result.last_insert_id
 961        };
 962
 963        Ok(room_id)
 964    }
 965
 966    /// Move a channel from one parent to another
 967    pub async fn move_channel(
 968        &self,
 969        channel_id: ChannelId,
 970        new_parent_id: ChannelId,
 971        admin_id: UserId,
 972    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
 973        self.transaction(|tx| async move {
 974            let channel = self.get_channel_internal(channel_id, &tx).await?;
 975            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 976                .await?;
 977            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 978
 979            if new_parent.root_id() != channel.root_id() {
 980                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 981            }
 982
 983            if new_parent
 984                .ancestors_including_self()
 985                .any(|id| id == channel.id)
 986            {
 987                Err(anyhow!(ErrorCode::CircularNesting))?;
 988            }
 989
 990            if channel.visibility == ChannelVisibility::Public
 991                && new_parent.visibility != ChannelVisibility::Public
 992            {
 993                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 994            }
 995
 996            let root_id = channel.root_id();
 997            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 998            let new_path = format!("{}{}/", new_parent.path(), channel.id);
 999
1000            let mut model = channel.into_active_model();
1001            model.parent_path = ActiveValue::Set(new_parent.path());
1002            let channel = model.update(&*tx).await?;
1003
1004            let descendent_ids =
1005                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1006                    self.pool.get_database_backend(),
1007                    "
1008                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1009                    WHERE parent_path LIKE $3 || '%'
1010                    RETURNING id
1011                ",
1012                    [old_path.clone().into(), new_path.into(), old_path.into()],
1013                ))
1014                .all(&*tx)
1015                .await?;
1016
1017            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1018
1019            let channels = channel::Entity::find()
1020                .filter(channel::Column::Id.is_in(all_moved_ids))
1021                .all(&*tx)
1022                .await?
1023                .into_iter()
1024                .map(|c| Channel::from_model(c))
1025                .collect::<Vec<_>>();
1026
1027            let channel_members = channel_member::Entity::find()
1028                .filter(channel_member::Column::ChannelId.eq(root_id))
1029                .all(&*tx)
1030                .await?;
1031
1032            Ok((channels, channel_members))
1033        })
1034        .await
1035    }
1036}
1037
/// Column selector for raw statements that return a single `id` column.
///
/// Used with `ChannelId::find_by_statement` in `move_channel` to read back the
/// ids produced by an `UPDATE ... RETURNING id` statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    /// The `id` column of the result row.
    Id,
}
1042
/// Column selector with a single `UserId` column.
///
/// NOTE(review): its usage is not visible in this part of the file; presumably
/// it is used by queries that select only a `user_id` column — confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    /// The `user_id` column of the result row (presumed mapping — confirm).
    UserId,
}