channels.rs

   1use super::*;
   2use rpc::{
   3    proto::{channel_member::Kind, ChannelBufferVersion, VectorClockEntry},
   4    ErrorCode, ErrorCodeExt,
   5};
   6use sea_orm::TryGetableMany;
   7
   8impl Database {
   9    #[cfg(test)]
  10    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  11        self.transaction(move |tx| async move {
  12            let mut channels = Vec::new();
  13            let mut rows = channel::Entity::find().stream(&*tx).await?;
  14            while let Some(row) = rows.next().await {
  15                let row = row?;
  16                channels.push((row.id, row.name));
  17            }
  18            Ok(channels)
  19        })
  20        .await
  21    }
  22
  23    #[cfg(test)]
  24    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  25        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  26    }
  27
  28    #[cfg(test)]
  29    pub async fn create_sub_channel(
  30        &self,
  31        name: &str,
  32        parent: ChannelId,
  33        creator_id: UserId,
  34    ) -> Result<ChannelId> {
  35        Ok(self
  36            .create_channel(name, Some(parent), creator_id)
  37            .await?
  38            .0
  39            .id)
  40    }
  41
  42    /// Creates a new channel.
  43    pub async fn create_channel(
  44        &self,
  45        name: &str,
  46        parent_channel_id: Option<ChannelId>,
  47        admin_id: UserId,
  48    ) -> Result<(
  49        Channel,
  50        Option<channel_member::Model>,
  51        Vec<channel_member::Model>,
  52    )> {
  53        let name = Self::sanitize_channel_name(name)?;
  54        self.transaction(move |tx| async move {
  55            let mut parent = None;
  56            let mut membership = None;
  57
  58            if let Some(parent_channel_id) = parent_channel_id {
  59                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  60                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  61                    .await?;
  62                parent = Some(parent_channel);
  63            }
  64
  65            let channel = channel::ActiveModel {
  66                id: ActiveValue::NotSet,
  67                name: ActiveValue::Set(name.to_string()),
  68                visibility: ActiveValue::Set(ChannelVisibility::Members),
  69                parent_path: ActiveValue::Set(
  70                    parent
  71                        .as_ref()
  72                        .map_or(String::new(), |parent| parent.path()),
  73                ),
  74                requires_zed_cla: ActiveValue::NotSet,
  75            }
  76            .insert(&*tx)
  77            .await?;
  78
  79            if parent.is_none() {
  80                membership = Some(
  81                    channel_member::ActiveModel {
  82                        id: ActiveValue::NotSet,
  83                        channel_id: ActiveValue::Set(channel.id),
  84                        user_id: ActiveValue::Set(admin_id),
  85                        accepted: ActiveValue::Set(true),
  86                        role: ActiveValue::Set(ChannelRole::Admin),
  87                    }
  88                    .insert(&*tx)
  89                    .await?,
  90                );
  91            }
  92
  93            let channel_members = channel_member::Entity::find()
  94                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  95                .all(&*tx)
  96                .await?;
  97
  98            Ok((Channel::from_model(channel), membership, channel_members))
  99        })
 100        .await
 101    }
 102
 103    /// Adds a user to the specified channel.
 104    pub async fn join_channel(
 105        &self,
 106        channel_id: ChannelId,
 107        user_id: UserId,
 108        connection: ConnectionId,
 109    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 110        self.transaction(move |tx| async move {
 111            let channel = self.get_channel_internal(channel_id, &tx).await?;
 112            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 113
 114            let mut accept_invite_result = None;
 115
 116            if role.is_none() {
 117                if let Some(invitation) = self
 118                    .pending_invite_for_channel(&channel, user_id, &tx)
 119                    .await?
 120                {
 121                    // note, this may be a parent channel
 122                    role = Some(invitation.role);
 123                    channel_member::Entity::update(channel_member::ActiveModel {
 124                        accepted: ActiveValue::Set(true),
 125                        ..invitation.into_active_model()
 126                    })
 127                    .exec(&*tx)
 128                    .await?;
 129
 130                    accept_invite_result = Some(
 131                        self.calculate_membership_updated(&channel, user_id, &tx)
 132                            .await?,
 133                    );
 134
 135                    debug_assert!(
 136                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 137                    );
 138                } else if channel.visibility == ChannelVisibility::Public {
 139                    role = Some(ChannelRole::Guest);
 140                    channel_member::Entity::insert(channel_member::ActiveModel {
 141                        id: ActiveValue::NotSet,
 142                        channel_id: ActiveValue::Set(channel.root_id()),
 143                        user_id: ActiveValue::Set(user_id),
 144                        accepted: ActiveValue::Set(true),
 145                        role: ActiveValue::Set(ChannelRole::Guest),
 146                    })
 147                    .exec(&*tx)
 148                    .await?;
 149
 150                    accept_invite_result = Some(
 151                        self.calculate_membership_updated(&channel, user_id, &tx)
 152                            .await?,
 153                    );
 154
 155                    debug_assert!(
 156                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 157                    );
 158                }
 159            }
 160
 161            if role.is_none() || role == Some(ChannelRole::Banned) {
 162                Err(ErrorCode::Forbidden.anyhow())?
 163            }
 164            let role = role.unwrap();
 165
 166            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 167            let room_id = self
 168                .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
 169                .await?;
 170
 171            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 172                .await
 173                .map(|jr| (jr, accept_invite_result, role))
 174        })
 175        .await
 176    }
 177
 178    /// Sets the visibility of the given channel.
 179    pub async fn set_channel_visibility(
 180        &self,
 181        channel_id: ChannelId,
 182        visibility: ChannelVisibility,
 183        admin_id: UserId,
 184    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 185        self.transaction(move |tx| async move {
 186            let channel = self.get_channel_internal(channel_id, &tx).await?;
 187            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 188                .await?;
 189
 190            if visibility == ChannelVisibility::Public {
 191                if let Some(parent_id) = channel.parent_id() {
 192                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 193
 194                    if parent.visibility != ChannelVisibility::Public {
 195                        Err(ErrorCode::BadPublicNesting
 196                            .with_tag("direction", "parent")
 197                            .anyhow())?;
 198                    }
 199                }
 200            } else if visibility == ChannelVisibility::Members {
 201                if self
 202                    .get_channel_descendants_excluding_self([&channel], &tx)
 203                    .await?
 204                    .into_iter()
 205                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 206                {
 207                    Err(ErrorCode::BadPublicNesting
 208                        .with_tag("direction", "children")
 209                        .anyhow())?;
 210                }
 211            }
 212
 213            let mut model = channel.into_active_model();
 214            model.visibility = ActiveValue::Set(visibility);
 215            let channel = model.update(&*tx).await?;
 216
 217            let channel_members = channel_member::Entity::find()
 218                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 219                .all(&*tx)
 220                .await?;
 221
 222            Ok((Channel::from_model(channel), channel_members))
 223        })
 224        .await
 225    }
 226
 227    #[cfg(test)]
 228    pub async fn set_channel_requires_zed_cla(
 229        &self,
 230        channel_id: ChannelId,
 231        requires_zed_cla: bool,
 232    ) -> Result<()> {
 233        self.transaction(move |tx| async move {
 234            let channel = self.get_channel_internal(channel_id, &tx).await?;
 235            let mut model = channel.into_active_model();
 236            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 237            model.update(&*tx).await?;
 238            Ok(())
 239        })
 240        .await
 241    }
 242
 243    /// Deletes the channel with the specified ID.
 244    pub async fn delete_channel(
 245        &self,
 246        channel_id: ChannelId,
 247        user_id: UserId,
 248    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 249        self.transaction(move |tx| async move {
 250            let channel = self.get_channel_internal(channel_id, &tx).await?;
 251            self.check_user_is_channel_admin(&channel, user_id, &tx)
 252                .await?;
 253
 254            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 255                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 256                .select_only()
 257                .column(channel_member::Column::UserId)
 258                .distinct()
 259                .into_values::<_, QueryUserIds>()
 260                .all(&*tx)
 261                .await?;
 262
 263            let channels_to_remove = self
 264                .get_channel_descendants_excluding_self([&channel], &tx)
 265                .await?
 266                .into_iter()
 267                .map(|channel| channel.id)
 268                .chain(Some(channel_id))
 269                .collect::<Vec<_>>();
 270
 271            channel::Entity::delete_many()
 272                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 273                .exec(&*tx)
 274                .await?;
 275
 276            Ok((channels_to_remove, members_to_notify))
 277        })
 278        .await
 279    }
 280
 281    /// Invites a user to a channel as a member.
 282    pub async fn invite_channel_member(
 283        &self,
 284        channel_id: ChannelId,
 285        invitee_id: UserId,
 286        inviter_id: UserId,
 287        role: ChannelRole,
 288    ) -> Result<InviteMemberResult> {
 289        self.transaction(move |tx| async move {
 290            let channel = self.get_channel_internal(channel_id, &tx).await?;
 291            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 292                .await?;
 293            if !channel.is_root() {
 294                Err(ErrorCode::NotARootChannel.anyhow())?
 295            }
 296
 297            channel_member::ActiveModel {
 298                id: ActiveValue::NotSet,
 299                channel_id: ActiveValue::Set(channel_id),
 300                user_id: ActiveValue::Set(invitee_id),
 301                accepted: ActiveValue::Set(false),
 302                role: ActiveValue::Set(role),
 303            }
 304            .insert(&*tx)
 305            .await?;
 306
 307            let channel = Channel::from_model(channel);
 308
 309            let notifications = self
 310                .create_notification(
 311                    invitee_id,
 312                    rpc::Notification::ChannelInvitation {
 313                        channel_id: channel_id.to_proto(),
 314                        channel_name: channel.name.clone(),
 315                        inviter_id: inviter_id.to_proto(),
 316                    },
 317                    true,
 318                    &tx,
 319                )
 320                .await?
 321                .into_iter()
 322                .collect();
 323
 324            Ok(InviteMemberResult {
 325                channel,
 326                notifications,
 327            })
 328        })
 329        .await
 330    }
 331
 332    fn sanitize_channel_name(name: &str) -> Result<&str> {
 333        let new_name = name.trim().trim_start_matches('#');
 334        if new_name == "" {
 335            Err(anyhow!("channel name can't be blank"))?;
 336        }
 337        Ok(new_name)
 338    }
 339
 340    /// Renames the specified channel.
 341    pub async fn rename_channel(
 342        &self,
 343        channel_id: ChannelId,
 344        admin_id: UserId,
 345        new_name: &str,
 346    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 347        self.transaction(move |tx| async move {
 348            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 349
 350            let channel = self.get_channel_internal(channel_id, &tx).await?;
 351            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 352                .await?;
 353
 354            let mut model = channel.into_active_model();
 355            model.name = ActiveValue::Set(new_name.clone());
 356            let channel = model.update(&*tx).await?;
 357
 358            let channel_members = channel_member::Entity::find()
 359                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 360                .all(&*tx)
 361                .await?;
 362
 363            Ok((Channel::from_model(channel), channel_members))
 364        })
 365        .await
 366    }
 367
 368    /// accept or decline an invite to join a channel
 369    pub async fn respond_to_channel_invite(
 370        &self,
 371        channel_id: ChannelId,
 372        user_id: UserId,
 373        accept: bool,
 374    ) -> Result<RespondToChannelInvite> {
 375        self.transaction(move |tx| async move {
 376            let channel = self.get_channel_internal(channel_id, &tx).await?;
 377
 378            let membership_update = if accept {
 379                let rows_affected = channel_member::Entity::update_many()
 380                    .set(channel_member::ActiveModel {
 381                        accepted: ActiveValue::Set(accept),
 382                        ..Default::default()
 383                    })
 384                    .filter(
 385                        channel_member::Column::ChannelId
 386                            .eq(channel_id)
 387                            .and(channel_member::Column::UserId.eq(user_id))
 388                            .and(channel_member::Column::Accepted.eq(false)),
 389                    )
 390                    .exec(&*tx)
 391                    .await?
 392                    .rows_affected;
 393
 394                if rows_affected == 0 {
 395                    Err(anyhow!("no such invitation"))?;
 396                }
 397
 398                Some(
 399                    self.calculate_membership_updated(&channel, user_id, &tx)
 400                        .await?,
 401                )
 402            } else {
 403                let rows_affected = channel_member::Entity::delete_many()
 404                    .filter(
 405                        channel_member::Column::ChannelId
 406                            .eq(channel_id)
 407                            .and(channel_member::Column::UserId.eq(user_id))
 408                            .and(channel_member::Column::Accepted.eq(false)),
 409                    )
 410                    .exec(&*tx)
 411                    .await?
 412                    .rows_affected;
 413                if rows_affected == 0 {
 414                    Err(anyhow!("no such invitation"))?;
 415                }
 416
 417                None
 418            };
 419
 420            Ok(RespondToChannelInvite {
 421                membership_update,
 422                notifications: self
 423                    .mark_notification_as_read_with_response(
 424                        user_id,
 425                        &rpc::Notification::ChannelInvitation {
 426                            channel_id: channel_id.to_proto(),
 427                            channel_name: Default::default(),
 428                            inviter_id: Default::default(),
 429                        },
 430                        accept,
 431                        &tx,
 432                    )
 433                    .await?
 434                    .into_iter()
 435                    .collect(),
 436            })
 437        })
 438        .await
 439    }
 440
 441    async fn calculate_membership_updated(
 442        &self,
 443        channel: &channel::Model,
 444        user_id: UserId,
 445        tx: &DatabaseTransaction,
 446    ) -> Result<MembershipUpdated> {
 447        let new_channels = self.get_user_channels(user_id, Some(channel), tx).await?;
 448        let removed_channels = self
 449            .get_channel_descendants_excluding_self([channel], tx)
 450            .await?
 451            .into_iter()
 452            .map(|channel| channel.id)
 453            .chain([channel.id])
 454            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 455            .collect::<Vec<_>>();
 456
 457        Ok(MembershipUpdated {
 458            channel_id: channel.id,
 459            new_channels,
 460            removed_channels,
 461        })
 462    }
 463
 464    /// Removes a channel member.
 465    pub async fn remove_channel_member(
 466        &self,
 467        channel_id: ChannelId,
 468        member_id: UserId,
 469        admin_id: UserId,
 470    ) -> Result<RemoveChannelMemberResult> {
 471        self.transaction(|tx| async move {
 472            let channel = self.get_channel_internal(channel_id, &tx).await?;
 473
 474            if member_id != admin_id {
 475                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 476                    .await?;
 477            }
 478
 479            let result = channel_member::Entity::delete_many()
 480                .filter(
 481                    channel_member::Column::ChannelId
 482                        .eq(channel_id)
 483                        .and(channel_member::Column::UserId.eq(member_id)),
 484                )
 485                .exec(&*tx)
 486                .await?;
 487
 488            if result.rows_affected == 0 {
 489                Err(anyhow!("no such member"))?;
 490            }
 491
 492            Ok(RemoveChannelMemberResult {
 493                membership_update: self
 494                    .calculate_membership_updated(&channel, member_id, &tx)
 495                    .await?,
 496                notification_id: self
 497                    .remove_notification(
 498                        member_id,
 499                        rpc::Notification::ChannelInvitation {
 500                            channel_id: channel_id.to_proto(),
 501                            channel_name: Default::default(),
 502                            inviter_id: Default::default(),
 503                        },
 504                        &tx,
 505                    )
 506                    .await?,
 507            })
 508        })
 509        .await
 510    }
 511
 512    /// Returns all channel invites for the user with the given ID.
 513    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 514        self.transaction(|tx| async move {
 515            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 516
 517            let channel_invites = channel_member::Entity::find()
 518                .filter(
 519                    channel_member::Column::UserId
 520                        .eq(user_id)
 521                        .and(channel_member::Column::Accepted.eq(false)),
 522                )
 523                .all(&*tx)
 524                .await?;
 525
 526            for invite in channel_invites {
 527                role_for_channel.insert(invite.channel_id, invite.role);
 528            }
 529
 530            let channels = channel::Entity::find()
 531                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 532                .all(&*tx)
 533                .await?;
 534
 535            let channels = channels.into_iter().map(Channel::from_model).collect();
 536
 537            Ok(channels)
 538        })
 539        .await
 540    }
 541
 542    /// Returns all channels for the user with the given ID.
 543    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 544        self.transaction(|tx| async move {
 545            let tx = tx;
 546
 547            self.get_user_channels(user_id, None, &tx).await
 548        })
 549        .await
 550    }
 551
    /// Returns the channels visible to the user with the given ID, together with
    /// per-channel state (participants, buffer versions, messages, hosted projects).
    ///
    /// Only accepted memberships are considered. When `ancestor_channel` is given, the
    /// memberships are further restricted to the one on that channel's root.
    ///
    /// The result contains the channels the user has a membership row for plus their
    /// descendants, filtered down to those whose visibility the user's role on the
    /// channel's root allows them to see.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Accepted memberships of this user, optionally narrowed to the ancestor's root.
        let mut filter = channel_member::Column::UserId
            .eq(user_id)
            .and(channel_member::Column::Accepted.eq(true));

        if let Some(ancestor) = ancestor_channel {
            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
        }

        let channel_memberships = channel_member::Entity::find()
            .filter(filter)
            .all(tx)
            .await?;

        // The channels the user is directly a member of.
        let channels = channel::Entity::find()
            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
            .all(tx)
            .await?;

        let mut descendants = self
            .get_channel_descendants_excluding_self(channels.iter(), tx)
            .await?;

        // Merge the membership channels into the descendant list, skipping any whose
        // path is already present.
        // NOTE(review): the binary search assumes `descendants` is sorted by path —
        // confirm against `get_channel_descendants_excluding_self`.
        for channel in channels {
            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
                descendants.insert(ix, channel);
            }
        }

        let roles_by_channel_id = channel_memberships
            .iter()
            .map(|membership| (membership.channel_id, membership.role))
            .collect::<HashMap<_, _>>();

        // Keep only channels whose root the user has a role on, and whose visibility
        // that role is allowed to see.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
                if parent_role.can_see_channel(channel.visibility) {
                    Some(Channel::from_model(channel))
                } else {
                    None
                }
            })
            .collect();

        // Column selector for the (channel id, user id) pairs queried below.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Users currently in each visible channel's room, grouped by channel id.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();

        // For every buffer belonging to a visible channel, record which channel it
        // belongs to and the latest operation recorded on the buffer row.
        let mut channel_ids_by_buffer_id = HashMap::default();
        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
        let mut rows = buffer::Entity::find()
            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
            .stream(tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row = row?;
            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
            latest_buffer_versions.push(ChannelBufferVersion {
                channel_id: row.channel_id.0 as u64,
                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
                // The version has a single entry when both the lamport timestamp and
                // the replica id are present, and is empty otherwise.
                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
                    .latest_operation_lamport_timestamp
                    .zip(row.latest_operation_replica_id)
                {
                    vec![VectorClockEntry {
                        timestamp: latest_lamport_timestamp as u32,
                        replica_id: latest_replica_id as u32,
                    }]
                } else {
                    vec![]
                },
            });
        }
        // NOTE(review): presumably dropped explicitly so the stream no longer borrows
        // `tx` before the queries below run — confirm.
        drop(rows);

        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;

        let observed_buffer_versions = self
            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
            .await?;

        let observed_channel_messages = self
            .observed_channel_messages(&channel_ids, user_id, tx)
            .await?;

        let hosted_projects = self
            .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
            .await?;

        Ok(ChannelsForUser {
            channel_memberships,
            channels,
            hosted_projects,
            channel_participants,
            latest_buffer_versions,
            latest_channel_messages,
            observed_buffer_versions,
            observed_channel_messages,
        })
    }
 682
 683    /// Sets the role for the specified channel member.
 684    pub async fn set_channel_member_role(
 685        &self,
 686        channel_id: ChannelId,
 687        admin_id: UserId,
 688        for_user: UserId,
 689        role: ChannelRole,
 690    ) -> Result<SetMemberRoleResult> {
 691        self.transaction(|tx| async move {
 692            let channel = self.get_channel_internal(channel_id, &tx).await?;
 693            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 694                .await?;
 695
 696            let membership = channel_member::Entity::find()
 697                .filter(
 698                    channel_member::Column::ChannelId
 699                        .eq(channel_id)
 700                        .and(channel_member::Column::UserId.eq(for_user)),
 701                )
 702                .one(&*tx)
 703                .await?;
 704
 705            let Some(membership) = membership else {
 706                Err(anyhow!("no such member"))?
 707            };
 708
 709            let mut update = membership.into_active_model();
 710            update.role = ActiveValue::Set(role);
 711            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 712
 713            if updated.accepted {
 714                Ok(SetMemberRoleResult::MembershipUpdated(
 715                    self.calculate_membership_updated(&channel, for_user, &tx)
 716                        .await?,
 717                ))
 718            } else {
 719                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 720                    channel,
 721                )))
 722            }
 723        })
 724        .await
 725    }
 726
 727    /// Returns the details for the specified channel member.
 728    pub async fn get_channel_participant_details(
 729        &self,
 730        channel_id: ChannelId,
 731        user_id: UserId,
 732    ) -> Result<Vec<proto::ChannelMember>> {
 733        let (role, members) = self
 734            .transaction(move |tx| async move {
 735                let channel = self.get_channel_internal(channel_id, &tx).await?;
 736                let role = self
 737                    .check_user_is_channel_participant(&channel, user_id, &tx)
 738                    .await?;
 739                Ok((
 740                    role,
 741                    self.get_channel_participant_details_internal(&channel, &tx)
 742                        .await?,
 743                ))
 744            })
 745            .await?;
 746
 747        if role == ChannelRole::Admin {
 748            Ok(members
 749                .into_iter()
 750                .map(|channel_member| proto::ChannelMember {
 751                    role: channel_member.role.into(),
 752                    user_id: channel_member.user_id.to_proto(),
 753                    kind: if channel_member.accepted {
 754                        Kind::Member
 755                    } else {
 756                        Kind::Invitee
 757                    }
 758                    .into(),
 759                })
 760                .collect())
 761        } else {
 762            return Ok(members
 763                .into_iter()
 764                .filter_map(|member| {
 765                    if !member.accepted {
 766                        return None;
 767                    }
 768                    Some(proto::ChannelMember {
 769                        role: member.role.into(),
 770                        user_id: member.user_id.to_proto(),
 771                        kind: Kind::Member.into(),
 772                    })
 773                })
 774                .collect());
 775        }
 776    }
 777
 778    async fn get_channel_participant_details_internal(
 779        &self,
 780        channel: &channel::Model,
 781        tx: &DatabaseTransaction,
 782    ) -> Result<Vec<channel_member::Model>> {
 783        Ok(channel_member::Entity::find()
 784            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 785            .all(tx)
 786            .await?)
 787    }
 788
 789    /// Returns the participants in the given channel.
 790    pub async fn get_channel_participants(
 791        &self,
 792        channel: &channel::Model,
 793        tx: &DatabaseTransaction,
 794    ) -> Result<Vec<UserId>> {
 795        let participants = self
 796            .get_channel_participant_details_internal(channel, tx)
 797            .await?;
 798        Ok(participants
 799            .into_iter()
 800            .map(|member| member.user_id)
 801            .collect())
 802    }
 803
 804    /// Returns whether the given user is an admin in the specified channel.
 805    pub async fn check_user_is_channel_admin(
 806        &self,
 807        channel: &channel::Model,
 808        user_id: UserId,
 809        tx: &DatabaseTransaction,
 810    ) -> Result<ChannelRole> {
 811        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 812        match role {
 813            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 814            Some(ChannelRole::Member)
 815            | Some(ChannelRole::Talker)
 816            | Some(ChannelRole::Banned)
 817            | Some(ChannelRole::Guest)
 818            | None => Err(anyhow!(
 819                "user is not a channel admin or channel does not exist"
 820            ))?,
 821        }
 822    }
 823
 824    /// Returns whether the given user is a member of the specified channel.
 825    pub async fn check_user_is_channel_member(
 826        &self,
 827        channel: &channel::Model,
 828        user_id: UserId,
 829        tx: &DatabaseTransaction,
 830    ) -> Result<ChannelRole> {
 831        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 832        match channel_role {
 833            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 834            Some(ChannelRole::Banned)
 835            | Some(ChannelRole::Guest)
 836            | Some(ChannelRole::Talker)
 837            | None => Err(anyhow!(
 838                "user is not a channel member or channel does not exist"
 839            ))?,
 840        }
 841    }
 842
 843    /// Returns whether the given user is a participant in the specified channel.
 844    pub async fn check_user_is_channel_participant(
 845        &self,
 846        channel: &channel::Model,
 847        user_id: UserId,
 848        tx: &DatabaseTransaction,
 849    ) -> Result<ChannelRole> {
 850        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 851        match role {
 852            Some(ChannelRole::Admin)
 853            | Some(ChannelRole::Member)
 854            | Some(ChannelRole::Guest)
 855            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 856            Some(ChannelRole::Banned) | None => Err(anyhow!(
 857                "user is not a channel participant or channel does not exist"
 858            ))?,
 859        }
 860    }
 861
 862    /// Returns a user's pending invite for the given channel, if one exists.
 863    pub async fn pending_invite_for_channel(
 864        &self,
 865        channel: &channel::Model,
 866        user_id: UserId,
 867        tx: &DatabaseTransaction,
 868    ) -> Result<Option<channel_member::Model>> {
 869        let row = channel_member::Entity::find()
 870            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 871            .filter(channel_member::Column::UserId.eq(user_id))
 872            .filter(channel_member::Column::Accepted.eq(false))
 873            .one(tx)
 874            .await?;
 875
 876        Ok(row)
 877    }
 878
 879    /// Returns the role for a user in the given channel.
 880    pub async fn channel_role_for_user(
 881        &self,
 882        channel: &channel::Model,
 883        user_id: UserId,
 884        tx: &DatabaseTransaction,
 885    ) -> Result<Option<ChannelRole>> {
 886        let membership = channel_member::Entity::find()
 887            .filter(
 888                channel_member::Column::ChannelId
 889                    .eq(channel.root_id())
 890                    .and(channel_member::Column::UserId.eq(user_id))
 891                    .and(channel_member::Column::Accepted.eq(true)),
 892            )
 893            .one(tx)
 894            .await?;
 895
 896        let Some(membership) = membership else {
 897            return Ok(None);
 898        };
 899
 900        if !membership.role.can_see_channel(channel.visibility) {
 901            return Ok(None);
 902        }
 903
 904        Ok(Some(membership.role))
 905    }
 906
 907    // Get the descendants of the given set if channels, ordered by their
 908    // path.
 909    pub(crate) async fn get_channel_descendants_excluding_self(
 910        &self,
 911        channels: impl IntoIterator<Item = &channel::Model>,
 912        tx: &DatabaseTransaction,
 913    ) -> Result<Vec<channel::Model>> {
 914        let mut filter = Condition::any();
 915        for channel in channels.into_iter() {
 916            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 917        }
 918
 919        if filter.is_empty() {
 920            return Ok(vec![]);
 921        }
 922
 923        Ok(channel::Entity::find()
 924            .filter(filter)
 925            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 926            .all(tx)
 927            .await?)
 928    }
 929
 930    /// Returns the channel with the given ID.
 931    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 932        self.transaction(|tx| async move {
 933            let channel = self.get_channel_internal(channel_id, &tx).await?;
 934            self.check_user_is_channel_participant(&channel, user_id, &tx)
 935                .await?;
 936
 937            Ok(Channel::from_model(channel))
 938        })
 939        .await
 940    }
 941
 942    pub(crate) async fn get_channel_internal(
 943        &self,
 944        channel_id: ChannelId,
 945        tx: &DatabaseTransaction,
 946    ) -> Result<channel::Model> {
 947        Ok(channel::Entity::find_by_id(channel_id)
 948            .one(tx)
 949            .await?
 950            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 951    }
 952
 953    pub(crate) async fn get_or_create_channel_room(
 954        &self,
 955        channel_id: ChannelId,
 956        live_kit_room: &str,
 957        tx: &DatabaseTransaction,
 958    ) -> Result<RoomId> {
 959        let room = room::Entity::find()
 960            .filter(room::Column::ChannelId.eq(channel_id))
 961            .one(tx)
 962            .await?;
 963
 964        let room_id = if let Some(room) = room {
 965            room.id
 966        } else {
 967            let result = room::Entity::insert(room::ActiveModel {
 968                channel_id: ActiveValue::Set(Some(channel_id)),
 969                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 970                ..Default::default()
 971            })
 972            .exec(tx)
 973            .await?;
 974
 975            result.last_insert_id
 976        };
 977
 978        Ok(room_id)
 979    }
 980
 981    /// Move a channel from one parent to another
 982    pub async fn move_channel(
 983        &self,
 984        channel_id: ChannelId,
 985        new_parent_id: ChannelId,
 986        admin_id: UserId,
 987    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
 988        self.transaction(|tx| async move {
 989            let channel = self.get_channel_internal(channel_id, &tx).await?;
 990            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 991                .await?;
 992            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 993
 994            if new_parent.root_id() != channel.root_id() {
 995                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 996            }
 997
 998            if new_parent
 999                .ancestors_including_self()
1000                .any(|id| id == channel.id)
1001            {
1002                Err(anyhow!(ErrorCode::CircularNesting))?;
1003            }
1004
1005            if channel.visibility == ChannelVisibility::Public
1006                && new_parent.visibility != ChannelVisibility::Public
1007            {
1008                Err(anyhow!(ErrorCode::BadPublicNesting))?;
1009            }
1010
1011            let root_id = channel.root_id();
1012            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1013            let new_path = format!("{}{}/", new_parent.path(), channel.id);
1014
1015            let mut model = channel.into_active_model();
1016            model.parent_path = ActiveValue::Set(new_parent.path());
1017            let channel = model.update(&*tx).await?;
1018
1019            let descendent_ids =
1020                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1021                    self.pool.get_database_backend(),
1022                    "
1023                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1024                    WHERE parent_path LIKE $3 || '%'
1025                    RETURNING id
1026                ",
1027                    [old_path.clone().into(), new_path.into(), old_path.into()],
1028                ))
1029                .all(&*tx)
1030                .await?;
1031
1032            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1033
1034            let channels = channel::Entity::find()
1035                .filter(channel::Column::Id.is_in(all_moved_ids))
1036                .all(&*tx)
1037                .await?
1038                .into_iter()
1039                .map(|c| Channel::from_model(c))
1040                .collect::<Vec<_>>();
1041
1042            let channel_members = channel_member::Entity::find()
1043                .filter(channel_member::Column::ChannelId.eq(root_id))
1044                .all(&*tx)
1045                .await?;
1046
1047            Ok((channels, channel_members))
1048        })
1049        .await
1050    }
1051}
1052
1053#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1054enum QueryIds {
1055    Id,
1056}
1057
1058#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1059enum QueryUserIds {
1060    UserId,
1061}