channels.rs

   1use super::*;
   2use rpc::{
   3    proto::{channel_member::Kind, ChannelBufferVersion, VectorClockEntry},
   4    ErrorCode, ErrorCodeExt,
   5};
   6use sea_orm::TryGetableMany;
   7
   8impl Database {
   9    #[cfg(test)]
  10    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  11        self.transaction(move |tx| async move {
  12            let mut channels = Vec::new();
  13            let mut rows = channel::Entity::find().stream(&*tx).await?;
  14            while let Some(row) = rows.next().await {
  15                let row = row?;
  16                channels.push((row.id, row.name));
  17            }
  18            Ok(channels)
  19        })
  20        .await
  21    }
  22
  23    #[cfg(test)]
  24    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  25        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  26    }
  27
  28    #[cfg(test)]
  29    pub async fn create_sub_channel(
  30        &self,
  31        name: &str,
  32        parent: ChannelId,
  33        creator_id: UserId,
  34    ) -> Result<ChannelId> {
  35        Ok(self
  36            .create_channel(name, Some(parent), creator_id)
  37            .await?
  38            .0
  39            .id)
  40    }
  41
  42    /// Creates a new channel.
  43    pub async fn create_channel(
  44        &self,
  45        name: &str,
  46        parent_channel_id: Option<ChannelId>,
  47        admin_id: UserId,
  48    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
  49        let name = Self::sanitize_channel_name(name)?;
  50        self.transaction(move |tx| async move {
  51            let mut parent = None;
  52            let mut membership = None;
  53
  54            if let Some(parent_channel_id) = parent_channel_id {
  55                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  56                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  57                    .await?;
  58                parent = Some(parent_channel);
  59            }
  60
  61            let channel = channel::ActiveModel {
  62                id: ActiveValue::NotSet,
  63                name: ActiveValue::Set(name.to_string()),
  64                visibility: ActiveValue::Set(ChannelVisibility::Members),
  65                parent_path: ActiveValue::Set(
  66                    parent
  67                        .as_ref()
  68                        .map_or(String::new(), |parent| parent.path()),
  69                ),
  70                requires_zed_cla: ActiveValue::NotSet,
  71            }
  72            .insert(&*tx)
  73            .await?;
  74
  75            if parent.is_none() {
  76                membership = Some(
  77                    channel_member::ActiveModel {
  78                        id: ActiveValue::NotSet,
  79                        channel_id: ActiveValue::Set(channel.id),
  80                        user_id: ActiveValue::Set(admin_id),
  81                        accepted: ActiveValue::Set(true),
  82                        role: ActiveValue::Set(ChannelRole::Admin),
  83                    }
  84                    .insert(&*tx)
  85                    .await?,
  86                );
  87            }
  88
  89            Ok((channel, membership))
  90        })
  91        .await
  92    }
  93
  94    /// Adds a user to the specified channel.
  95    pub async fn join_channel(
  96        &self,
  97        channel_id: ChannelId,
  98        user_id: UserId,
  99        connection: ConnectionId,
 100    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 101        self.transaction(move |tx| async move {
 102            let channel = self.get_channel_internal(channel_id, &tx).await?;
 103            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 104
 105            let mut accept_invite_result = None;
 106
 107            if role.is_none() {
 108                if let Some(invitation) = self
 109                    .pending_invite_for_channel(&channel, user_id, &tx)
 110                    .await?
 111                {
 112                    // note, this may be a parent channel
 113                    role = Some(invitation.role);
 114                    channel_member::Entity::update(channel_member::ActiveModel {
 115                        accepted: ActiveValue::Set(true),
 116                        ..invitation.into_active_model()
 117                    })
 118                    .exec(&*tx)
 119                    .await?;
 120
 121                    accept_invite_result = Some(
 122                        self.calculate_membership_updated(&channel, user_id, &tx)
 123                            .await?,
 124                    );
 125
 126                    debug_assert!(
 127                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 128                    );
 129                } else if channel.visibility == ChannelVisibility::Public {
 130                    role = Some(ChannelRole::Guest);
 131                    channel_member::Entity::insert(channel_member::ActiveModel {
 132                        id: ActiveValue::NotSet,
 133                        channel_id: ActiveValue::Set(channel.root_id()),
 134                        user_id: ActiveValue::Set(user_id),
 135                        accepted: ActiveValue::Set(true),
 136                        role: ActiveValue::Set(ChannelRole::Guest),
 137                    })
 138                    .exec(&*tx)
 139                    .await?;
 140
 141                    accept_invite_result = Some(
 142                        self.calculate_membership_updated(&channel, user_id, &tx)
 143                            .await?,
 144                    );
 145
 146                    debug_assert!(
 147                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 148                    );
 149                }
 150            }
 151
 152            if role.is_none() || role == Some(ChannelRole::Banned) {
 153                Err(ErrorCode::Forbidden.anyhow())?
 154            }
 155            let role = role.unwrap();
 156
 157            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 158            let room_id = self
 159                .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
 160                .await?;
 161
 162            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 163                .await
 164                .map(|jr| (jr, accept_invite_result, role))
 165        })
 166        .await
 167    }
 168
 169    /// Sets the visibility of the given channel.
 170    pub async fn set_channel_visibility(
 171        &self,
 172        channel_id: ChannelId,
 173        visibility: ChannelVisibility,
 174        admin_id: UserId,
 175    ) -> Result<channel::Model> {
 176        self.transaction(move |tx| async move {
 177            let channel = self.get_channel_internal(channel_id, &tx).await?;
 178            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 179                .await?;
 180
 181            if visibility == ChannelVisibility::Public {
 182                if let Some(parent_id) = channel.parent_id() {
 183                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 184
 185                    if parent.visibility != ChannelVisibility::Public {
 186                        Err(ErrorCode::BadPublicNesting
 187                            .with_tag("direction", "parent")
 188                            .anyhow())?;
 189                    }
 190                }
 191            } else if visibility == ChannelVisibility::Members {
 192                if self
 193                    .get_channel_descendants_excluding_self([&channel], &tx)
 194                    .await?
 195                    .into_iter()
 196                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 197                {
 198                    Err(ErrorCode::BadPublicNesting
 199                        .with_tag("direction", "children")
 200                        .anyhow())?;
 201                }
 202            }
 203
 204            let mut model = channel.into_active_model();
 205            model.visibility = ActiveValue::Set(visibility);
 206            let channel = model.update(&*tx).await?;
 207
 208            Ok(channel)
 209        })
 210        .await
 211    }
 212
 213    #[cfg(test)]
 214    pub async fn set_channel_requires_zed_cla(
 215        &self,
 216        channel_id: ChannelId,
 217        requires_zed_cla: bool,
 218    ) -> Result<()> {
 219        self.transaction(move |tx| async move {
 220            let channel = self.get_channel_internal(channel_id, &tx).await?;
 221            let mut model = channel.into_active_model();
 222            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 223            model.update(&*tx).await?;
 224            Ok(())
 225        })
 226        .await
 227    }
 228
 229    /// Deletes the channel with the specified ID.
 230    pub async fn delete_channel(
 231        &self,
 232        channel_id: ChannelId,
 233        user_id: UserId,
 234    ) -> Result<(ChannelId, Vec<ChannelId>)> {
 235        self.transaction(move |tx| async move {
 236            let channel = self.get_channel_internal(channel_id, &tx).await?;
 237            self.check_user_is_channel_admin(&channel, user_id, &tx)
 238                .await?;
 239
 240            let channels_to_remove = self
 241                .get_channel_descendants_excluding_self([&channel], &tx)
 242                .await?
 243                .into_iter()
 244                .map(|channel| channel.id)
 245                .chain(Some(channel_id))
 246                .collect::<Vec<_>>();
 247
 248            channel::Entity::delete_many()
 249                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 250                .exec(&*tx)
 251                .await?;
 252
 253            Ok((channel.root_id(), channels_to_remove))
 254        })
 255        .await
 256    }
 257
 258    /// Invites a user to a channel as a member.
 259    pub async fn invite_channel_member(
 260        &self,
 261        channel_id: ChannelId,
 262        invitee_id: UserId,
 263        inviter_id: UserId,
 264        role: ChannelRole,
 265    ) -> Result<InviteMemberResult> {
 266        self.transaction(move |tx| async move {
 267            let channel = self.get_channel_internal(channel_id, &tx).await?;
 268            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 269                .await?;
 270            if !channel.is_root() {
 271                Err(ErrorCode::NotARootChannel.anyhow())?
 272            }
 273
 274            channel_member::ActiveModel {
 275                id: ActiveValue::NotSet,
 276                channel_id: ActiveValue::Set(channel_id),
 277                user_id: ActiveValue::Set(invitee_id),
 278                accepted: ActiveValue::Set(false),
 279                role: ActiveValue::Set(role),
 280            }
 281            .insert(&*tx)
 282            .await?;
 283
 284            let channel = Channel::from_model(channel);
 285
 286            let notifications = self
 287                .create_notification(
 288                    invitee_id,
 289                    rpc::Notification::ChannelInvitation {
 290                        channel_id: channel_id.to_proto(),
 291                        channel_name: channel.name.clone(),
 292                        inviter_id: inviter_id.to_proto(),
 293                    },
 294                    true,
 295                    &tx,
 296                )
 297                .await?
 298                .into_iter()
 299                .collect();
 300
 301            Ok(InviteMemberResult {
 302                channel,
 303                notifications,
 304            })
 305        })
 306        .await
 307    }
 308
 309    fn sanitize_channel_name(name: &str) -> Result<&str> {
 310        let new_name = name.trim().trim_start_matches('#');
 311        if new_name == "" {
 312            Err(anyhow!("channel name can't be blank"))?;
 313        }
 314        Ok(new_name)
 315    }
 316
 317    /// Renames the specified channel.
 318    pub async fn rename_channel(
 319        &self,
 320        channel_id: ChannelId,
 321        admin_id: UserId,
 322        new_name: &str,
 323    ) -> Result<channel::Model> {
 324        self.transaction(move |tx| async move {
 325            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 326
 327            let channel = self.get_channel_internal(channel_id, &tx).await?;
 328            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 329                .await?;
 330
 331            let mut model = channel.into_active_model();
 332            model.name = ActiveValue::Set(new_name.clone());
 333            let channel = model.update(&*tx).await?;
 334
 335            Ok(channel)
 336        })
 337        .await
 338    }
 339
 340    /// accept or decline an invite to join a channel
 341    pub async fn respond_to_channel_invite(
 342        &self,
 343        channel_id: ChannelId,
 344        user_id: UserId,
 345        accept: bool,
 346    ) -> Result<RespondToChannelInvite> {
 347        self.transaction(move |tx| async move {
 348            let channel = self.get_channel_internal(channel_id, &tx).await?;
 349
 350            let membership_update = if accept {
 351                let rows_affected = channel_member::Entity::update_many()
 352                    .set(channel_member::ActiveModel {
 353                        accepted: ActiveValue::Set(accept),
 354                        ..Default::default()
 355                    })
 356                    .filter(
 357                        channel_member::Column::ChannelId
 358                            .eq(channel_id)
 359                            .and(channel_member::Column::UserId.eq(user_id))
 360                            .and(channel_member::Column::Accepted.eq(false)),
 361                    )
 362                    .exec(&*tx)
 363                    .await?
 364                    .rows_affected;
 365
 366                if rows_affected == 0 {
 367                    Err(anyhow!("no such invitation"))?;
 368                }
 369
 370                Some(
 371                    self.calculate_membership_updated(&channel, user_id, &tx)
 372                        .await?,
 373                )
 374            } else {
 375                let rows_affected = channel_member::Entity::delete_many()
 376                    .filter(
 377                        channel_member::Column::ChannelId
 378                            .eq(channel_id)
 379                            .and(channel_member::Column::UserId.eq(user_id))
 380                            .and(channel_member::Column::Accepted.eq(false)),
 381                    )
 382                    .exec(&*tx)
 383                    .await?
 384                    .rows_affected;
 385                if rows_affected == 0 {
 386                    Err(anyhow!("no such invitation"))?;
 387                }
 388
 389                None
 390            };
 391
 392            Ok(RespondToChannelInvite {
 393                membership_update,
 394                notifications: self
 395                    .mark_notification_as_read_with_response(
 396                        user_id,
 397                        &rpc::Notification::ChannelInvitation {
 398                            channel_id: channel_id.to_proto(),
 399                            channel_name: Default::default(),
 400                            inviter_id: Default::default(),
 401                        },
 402                        accept,
 403                        &tx,
 404                    )
 405                    .await?
 406                    .into_iter()
 407                    .collect(),
 408            })
 409        })
 410        .await
 411    }
 412
 413    async fn calculate_membership_updated(
 414        &self,
 415        channel: &channel::Model,
 416        user_id: UserId,
 417        tx: &DatabaseTransaction,
 418    ) -> Result<MembershipUpdated> {
 419        let new_channels = self.get_user_channels(user_id, Some(channel), tx).await?;
 420        let removed_channels = self
 421            .get_channel_descendants_excluding_self([channel], tx)
 422            .await?
 423            .into_iter()
 424            .map(|channel| channel.id)
 425            .chain([channel.id])
 426            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 427            .collect::<Vec<_>>();
 428
 429        Ok(MembershipUpdated {
 430            channel_id: channel.id,
 431            new_channels,
 432            removed_channels,
 433        })
 434    }
 435
 436    /// Removes a channel member.
 437    pub async fn remove_channel_member(
 438        &self,
 439        channel_id: ChannelId,
 440        member_id: UserId,
 441        admin_id: UserId,
 442    ) -> Result<RemoveChannelMemberResult> {
 443        self.transaction(|tx| async move {
 444            let channel = self.get_channel_internal(channel_id, &tx).await?;
 445
 446            if member_id != admin_id {
 447                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 448                    .await?;
 449            }
 450
 451            let result = channel_member::Entity::delete_many()
 452                .filter(
 453                    channel_member::Column::ChannelId
 454                        .eq(channel_id)
 455                        .and(channel_member::Column::UserId.eq(member_id)),
 456                )
 457                .exec(&*tx)
 458                .await?;
 459
 460            if result.rows_affected == 0 {
 461                Err(anyhow!("no such member"))?;
 462            }
 463
 464            Ok(RemoveChannelMemberResult {
 465                membership_update: self
 466                    .calculate_membership_updated(&channel, member_id, &tx)
 467                    .await?,
 468                notification_id: self
 469                    .remove_notification(
 470                        member_id,
 471                        rpc::Notification::ChannelInvitation {
 472                            channel_id: channel_id.to_proto(),
 473                            channel_name: Default::default(),
 474                            inviter_id: Default::default(),
 475                        },
 476                        &tx,
 477                    )
 478                    .await?,
 479            })
 480        })
 481        .await
 482    }
 483
 484    /// Returns all channel invites for the user with the given ID.
 485    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 486        self.transaction(|tx| async move {
 487            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 488
 489            let channel_invites = channel_member::Entity::find()
 490                .filter(
 491                    channel_member::Column::UserId
 492                        .eq(user_id)
 493                        .and(channel_member::Column::Accepted.eq(false)),
 494                )
 495                .all(&*tx)
 496                .await?;
 497
 498            for invite in channel_invites {
 499                role_for_channel.insert(invite.channel_id, invite.role);
 500            }
 501
 502            let channels = channel::Entity::find()
 503                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 504                .all(&*tx)
 505                .await?;
 506
 507            let channels = channels.into_iter().map(Channel::from_model).collect();
 508
 509            Ok(channels)
 510        })
 511        .await
 512    }
 513
 514    /// Returns all channels for the user with the given ID.
 515    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 516        self.transaction(|tx| async move {
 517            let tx = tx;
 518
 519            self.get_user_channels(user_id, None, &tx).await
 520        })
 521        .await
 522    }
 523
 524    /// Returns all channels for the user with the given ID that are descendants
 525    /// of the specified ancestor channel.
 526    pub async fn get_user_channels(
 527        &self,
 528        user_id: UserId,
 529        ancestor_channel: Option<&channel::Model>,
 530        tx: &DatabaseTransaction,
 531    ) -> Result<ChannelsForUser> {
 532        let mut filter = channel_member::Column::UserId
 533            .eq(user_id)
 534            .and(channel_member::Column::Accepted.eq(true));
 535
 536        if let Some(ancestor) = ancestor_channel {
 537            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 538        }
 539
 540        let channel_memberships = channel_member::Entity::find()
 541            .filter(filter)
 542            .all(tx)
 543            .await?;
 544
 545        let channels = channel::Entity::find()
 546            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
 547            .all(tx)
 548            .await?;
 549
 550        let mut descendants = self
 551            .get_channel_descendants_excluding_self(channels.iter(), tx)
 552            .await?;
 553
 554        for channel in channels {
 555            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
 556                descendants.insert(ix, channel);
 557            }
 558        }
 559
 560        let roles_by_channel_id = channel_memberships
 561            .iter()
 562            .map(|membership| (membership.channel_id, membership.role))
 563            .collect::<HashMap<_, _>>();
 564
 565        let channels: Vec<Channel> = descendants
 566            .into_iter()
 567            .filter_map(|channel| {
 568                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 569                if parent_role.can_see_channel(channel.visibility) {
 570                    Some(Channel::from_model(channel))
 571                } else {
 572                    None
 573                }
 574            })
 575            .collect();
 576
 577        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 578        enum QueryUserIdsAndChannelIds {
 579            ChannelId,
 580            UserId,
 581        }
 582
 583        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 584        {
 585            let mut rows = room_participant::Entity::find()
 586                .inner_join(room::Entity)
 587                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 588                .select_only()
 589                .column(room::Column::ChannelId)
 590                .column(room_participant::Column::UserId)
 591                .into_values::<_, QueryUserIdsAndChannelIds>()
 592                .stream(tx)
 593                .await?;
 594            while let Some(row) = rows.next().await {
 595                let row: (ChannelId, UserId) = row?;
 596                channel_participants.entry(row.0).or_default().push(row.1)
 597            }
 598        }
 599
 600        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 601
 602        let mut channel_ids_by_buffer_id = HashMap::default();
 603        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
 604        let mut rows = buffer::Entity::find()
 605            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 606            .stream(tx)
 607            .await?;
 608        while let Some(row) = rows.next().await {
 609            let row = row?;
 610            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 611            latest_buffer_versions.push(ChannelBufferVersion {
 612                channel_id: row.channel_id.0 as u64,
 613                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
 614                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
 615                    .latest_operation_lamport_timestamp
 616                    .zip(row.latest_operation_replica_id)
 617                {
 618                    vec![VectorClockEntry {
 619                        timestamp: latest_lamport_timestamp as u32,
 620                        replica_id: latest_replica_id as u32,
 621                    }]
 622                } else {
 623                    vec![]
 624                },
 625            });
 626        }
 627        drop(rows);
 628
 629        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
 630
 631        let observed_buffer_versions = self
 632            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
 633            .await?;
 634
 635        let observed_channel_messages = self
 636            .observed_channel_messages(&channel_ids, user_id, tx)
 637            .await?;
 638
 639        let hosted_projects = self
 640            .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
 641            .await?;
 642
 643        let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
 644        let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;
 645
 646        Ok(ChannelsForUser {
 647            channel_memberships,
 648            channels,
 649            hosted_projects,
 650            dev_servers,
 651            remote_projects,
 652            channel_participants,
 653            latest_buffer_versions,
 654            latest_channel_messages,
 655            observed_buffer_versions,
 656            observed_channel_messages,
 657        })
 658    }
 659
 660    /// Sets the role for the specified channel member.
 661    pub async fn set_channel_member_role(
 662        &self,
 663        channel_id: ChannelId,
 664        admin_id: UserId,
 665        for_user: UserId,
 666        role: ChannelRole,
 667    ) -> Result<SetMemberRoleResult> {
 668        self.transaction(|tx| async move {
 669            let channel = self.get_channel_internal(channel_id, &tx).await?;
 670            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 671                .await?;
 672
 673            let membership = channel_member::Entity::find()
 674                .filter(
 675                    channel_member::Column::ChannelId
 676                        .eq(channel_id)
 677                        .and(channel_member::Column::UserId.eq(for_user)),
 678                )
 679                .one(&*tx)
 680                .await?;
 681
 682            let Some(membership) = membership else {
 683                Err(anyhow!("no such member"))?
 684            };
 685
 686            let mut update = membership.into_active_model();
 687            update.role = ActiveValue::Set(role);
 688            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 689
 690            if updated.accepted {
 691                Ok(SetMemberRoleResult::MembershipUpdated(
 692                    self.calculate_membership_updated(&channel, for_user, &tx)
 693                        .await?,
 694                ))
 695            } else {
 696                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 697                    channel,
 698                )))
 699            }
 700        })
 701        .await
 702    }
 703
 704    /// Returns the details for the specified channel member.
 705    pub async fn get_channel_participant_details(
 706        &self,
 707        channel_id: ChannelId,
 708        user_id: UserId,
 709    ) -> Result<Vec<proto::ChannelMember>> {
 710        let (role, members) = self
 711            .transaction(move |tx| async move {
 712                let channel = self.get_channel_internal(channel_id, &tx).await?;
 713                let role = self
 714                    .check_user_is_channel_participant(&channel, user_id, &tx)
 715                    .await?;
 716                Ok((
 717                    role,
 718                    self.get_channel_participant_details_internal(&channel, &tx)
 719                        .await?,
 720                ))
 721            })
 722            .await?;
 723
 724        if role == ChannelRole::Admin {
 725            Ok(members
 726                .into_iter()
 727                .map(|channel_member| proto::ChannelMember {
 728                    role: channel_member.role.into(),
 729                    user_id: channel_member.user_id.to_proto(),
 730                    kind: if channel_member.accepted {
 731                        Kind::Member
 732                    } else {
 733                        Kind::Invitee
 734                    }
 735                    .into(),
 736                })
 737                .collect())
 738        } else {
 739            return Ok(members
 740                .into_iter()
 741                .filter_map(|member| {
 742                    if !member.accepted {
 743                        return None;
 744                    }
 745                    Some(proto::ChannelMember {
 746                        role: member.role.into(),
 747                        user_id: member.user_id.to_proto(),
 748                        kind: Kind::Member.into(),
 749                    })
 750                })
 751                .collect());
 752        }
 753    }
 754
 755    async fn get_channel_participant_details_internal(
 756        &self,
 757        channel: &channel::Model,
 758        tx: &DatabaseTransaction,
 759    ) -> Result<Vec<channel_member::Model>> {
 760        Ok(channel_member::Entity::find()
 761            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 762            .all(tx)
 763            .await?)
 764    }
 765
 766    /// Returns the participants in the given channel.
 767    pub async fn get_channel_participants(
 768        &self,
 769        channel: &channel::Model,
 770        tx: &DatabaseTransaction,
 771    ) -> Result<Vec<UserId>> {
 772        let participants = self
 773            .get_channel_participant_details_internal(channel, tx)
 774            .await?;
 775        Ok(participants
 776            .into_iter()
 777            .map(|member| member.user_id)
 778            .collect())
 779    }
 780
 781    /// Returns whether the given user is an admin in the specified channel.
 782    pub async fn check_user_is_channel_admin(
 783        &self,
 784        channel: &channel::Model,
 785        user_id: UserId,
 786        tx: &DatabaseTransaction,
 787    ) -> Result<ChannelRole> {
 788        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 789        match role {
 790            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 791            Some(ChannelRole::Member)
 792            | Some(ChannelRole::Talker)
 793            | Some(ChannelRole::Banned)
 794            | Some(ChannelRole::Guest)
 795            | None => Err(anyhow!(
 796                "user is not a channel admin or channel does not exist"
 797            ))?,
 798        }
 799    }
 800
 801    /// Returns whether the given user is a member of the specified channel.
 802    pub async fn check_user_is_channel_member(
 803        &self,
 804        channel: &channel::Model,
 805        user_id: UserId,
 806        tx: &DatabaseTransaction,
 807    ) -> Result<ChannelRole> {
 808        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 809        match channel_role {
 810            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 811            Some(ChannelRole::Banned)
 812            | Some(ChannelRole::Guest)
 813            | Some(ChannelRole::Talker)
 814            | None => Err(anyhow!(
 815                "user is not a channel member or channel does not exist"
 816            ))?,
 817        }
 818    }
 819
 820    /// Returns whether the given user is a participant in the specified channel.
 821    pub async fn check_user_is_channel_participant(
 822        &self,
 823        channel: &channel::Model,
 824        user_id: UserId,
 825        tx: &DatabaseTransaction,
 826    ) -> Result<ChannelRole> {
 827        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 828        match role {
 829            Some(ChannelRole::Admin)
 830            | Some(ChannelRole::Member)
 831            | Some(ChannelRole::Guest)
 832            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 833            Some(ChannelRole::Banned) | None => Err(anyhow!(
 834                "user is not a channel participant or channel does not exist"
 835            ))?,
 836        }
 837    }
 838
 839    /// Returns a user's pending invite for the given channel, if one exists.
 840    pub async fn pending_invite_for_channel(
 841        &self,
 842        channel: &channel::Model,
 843        user_id: UserId,
 844        tx: &DatabaseTransaction,
 845    ) -> Result<Option<channel_member::Model>> {
 846        let row = channel_member::Entity::find()
 847            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 848            .filter(channel_member::Column::UserId.eq(user_id))
 849            .filter(channel_member::Column::Accepted.eq(false))
 850            .one(tx)
 851            .await?;
 852
 853        Ok(row)
 854    }
 855
 856    /// Returns the role for a user in the given channel.
 857    pub async fn channel_role_for_user(
 858        &self,
 859        channel: &channel::Model,
 860        user_id: UserId,
 861        tx: &DatabaseTransaction,
 862    ) -> Result<Option<ChannelRole>> {
 863        let membership = channel_member::Entity::find()
 864            .filter(
 865                channel_member::Column::ChannelId
 866                    .eq(channel.root_id())
 867                    .and(channel_member::Column::UserId.eq(user_id))
 868                    .and(channel_member::Column::Accepted.eq(true)),
 869            )
 870            .one(tx)
 871            .await?;
 872
 873        let Some(membership) = membership else {
 874            return Ok(None);
 875        };
 876
 877        if !membership.role.can_see_channel(channel.visibility) {
 878            return Ok(None);
 879        }
 880
 881        Ok(Some(membership.role))
 882    }
 883
 884    // Get the descendants of the given set if channels, ordered by their
 885    // path.
 886    pub(crate) async fn get_channel_descendants_excluding_self(
 887        &self,
 888        channels: impl IntoIterator<Item = &channel::Model>,
 889        tx: &DatabaseTransaction,
 890    ) -> Result<Vec<channel::Model>> {
 891        let mut filter = Condition::any();
 892        for channel in channels.into_iter() {
 893            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 894        }
 895
 896        if filter.is_empty() {
 897            return Ok(vec![]);
 898        }
 899
 900        Ok(channel::Entity::find()
 901            .filter(filter)
 902            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 903            .all(tx)
 904            .await?)
 905    }
 906
 907    /// Returns the channel with the given ID.
 908    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 909        self.transaction(|tx| async move {
 910            let channel = self.get_channel_internal(channel_id, &tx).await?;
 911            self.check_user_is_channel_participant(&channel, user_id, &tx)
 912                .await?;
 913
 914            Ok(Channel::from_model(channel))
 915        })
 916        .await
 917    }
 918
 919    pub(crate) async fn get_channel_internal(
 920        &self,
 921        channel_id: ChannelId,
 922        tx: &DatabaseTransaction,
 923    ) -> Result<channel::Model> {
 924        Ok(channel::Entity::find_by_id(channel_id)
 925            .one(tx)
 926            .await?
 927            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 928    }
 929
 930    pub(crate) async fn get_or_create_channel_room(
 931        &self,
 932        channel_id: ChannelId,
 933        live_kit_room: &str,
 934        tx: &DatabaseTransaction,
 935    ) -> Result<RoomId> {
 936        let room = room::Entity::find()
 937            .filter(room::Column::ChannelId.eq(channel_id))
 938            .one(tx)
 939            .await?;
 940
 941        let room_id = if let Some(room) = room {
 942            room.id
 943        } else {
 944            let result = room::Entity::insert(room::ActiveModel {
 945                channel_id: ActiveValue::Set(Some(channel_id)),
 946                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 947                ..Default::default()
 948            })
 949            .exec(tx)
 950            .await?;
 951
 952            result.last_insert_id
 953        };
 954
 955        Ok(room_id)
 956    }
 957
 958    /// Move a channel from one parent to another
 959    pub async fn move_channel(
 960        &self,
 961        channel_id: ChannelId,
 962        new_parent_id: ChannelId,
 963        admin_id: UserId,
 964    ) -> Result<(ChannelId, Vec<Channel>)> {
 965        self.transaction(|tx| async move {
 966            let channel = self.get_channel_internal(channel_id, &tx).await?;
 967            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 968                .await?;
 969            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 970
 971            if new_parent.root_id() != channel.root_id() {
 972                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 973            }
 974
 975            if new_parent
 976                .ancestors_including_self()
 977                .any(|id| id == channel.id)
 978            {
 979                Err(anyhow!(ErrorCode::CircularNesting))?;
 980            }
 981
 982            if channel.visibility == ChannelVisibility::Public
 983                && new_parent.visibility != ChannelVisibility::Public
 984            {
 985                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 986            }
 987
 988            let root_id = channel.root_id();
 989            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 990            let new_path = format!("{}{}/", new_parent.path(), channel.id);
 991
 992            let mut model = channel.into_active_model();
 993            model.parent_path = ActiveValue::Set(new_parent.path());
 994            let channel = model.update(&*tx).await?;
 995
 996            let descendent_ids =
 997                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 998                    self.pool.get_database_backend(),
 999                    "
1000                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1001                    WHERE parent_path LIKE $3 || '%'
1002                    RETURNING id
1003                ",
1004                    [old_path.clone().into(), new_path.into(), old_path.into()],
1005                ))
1006                .all(&*tx)
1007                .await?;
1008
1009            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1010
1011            let channels = channel::Entity::find()
1012                .filter(channel::Column::Id.is_in(all_moved_ids))
1013                .all(&*tx)
1014                .await?
1015                .into_iter()
1016                .map(|c| Channel::from_model(c))
1017                .collect::<Vec<_>>();
1018
1019            Ok((root_id, channels))
1020        })
1021        .await
1022    }
1023}
1024
1025#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1026enum QueryIds {
1027    Id,
1028}
1029
1030#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1031enum QueryUserIds {
1032    UserId,
1033}