channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .0
  36            .id)
  37    }
  38
  39    /// Creates a new channel.
  40    pub async fn create_channel(
  41        &self,
  42        name: &str,
  43        parent_channel_id: Option<ChannelId>,
  44        admin_id: UserId,
  45    ) -> Result<(
  46        Channel,
  47        Option<channel_member::Model>,
  48        Vec<channel_member::Model>,
  49    )> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let channel = channel::ActiveModel {
  63                id: ActiveValue::NotSet,
  64                name: ActiveValue::Set(name.to_string()),
  65                visibility: ActiveValue::Set(ChannelVisibility::Members),
  66                parent_path: ActiveValue::Set(
  67                    parent
  68                        .as_ref()
  69                        .map_or(String::new(), |parent| parent.path()),
  70                ),
  71                requires_zed_cla: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if parent.is_none() {
  77                membership = Some(
  78                    channel_member::ActiveModel {
  79                        id: ActiveValue::NotSet,
  80                        channel_id: ActiveValue::Set(channel.id),
  81                        user_id: ActiveValue::Set(admin_id),
  82                        accepted: ActiveValue::Set(true),
  83                        role: ActiveValue::Set(ChannelRole::Admin),
  84                    }
  85                    .insert(&*tx)
  86                    .await?,
  87                );
  88            }
  89
  90            let channel_members = channel_member::Entity::find()
  91                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  92                .all(&*tx)
  93                .await?;
  94
  95            Ok((Channel::from_model(channel), membership, channel_members))
  96        })
  97        .await
  98    }
  99
 100    /// Adds a user to the specified channel.
 101    pub async fn join_channel(
 102        &self,
 103        channel_id: ChannelId,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 107        self.transaction(move |tx| async move {
 108            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 109            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 110
 111            let mut accept_invite_result = None;
 112
 113            if role.is_none() {
 114                if let Some(invitation) = self
 115                    .pending_invite_for_channel(&channel, user_id, &*tx)
 116                    .await?
 117                {
 118                    // note, this may be a parent channel
 119                    role = Some(invitation.role);
 120                    channel_member::Entity::update(channel_member::ActiveModel {
 121                        accepted: ActiveValue::Set(true),
 122                        ..invitation.into_active_model()
 123                    })
 124                    .exec(&*tx)
 125                    .await?;
 126
 127                    accept_invite_result = Some(
 128                        self.calculate_membership_updated(&channel, user_id, &*tx)
 129                            .await?,
 130                    );
 131
 132                    debug_assert!(
 133                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 134                    );
 135                } else if channel.visibility == ChannelVisibility::Public {
 136                    role = Some(ChannelRole::Guest);
 137                    channel_member::Entity::insert(channel_member::ActiveModel {
 138                        id: ActiveValue::NotSet,
 139                        channel_id: ActiveValue::Set(channel.root_id()),
 140                        user_id: ActiveValue::Set(user_id),
 141                        accepted: ActiveValue::Set(true),
 142                        role: ActiveValue::Set(ChannelRole::Guest),
 143                    })
 144                    .exec(&*tx)
 145                    .await?;
 146
 147                    accept_invite_result = Some(
 148                        self.calculate_membership_updated(&channel, user_id, &*tx)
 149                            .await?,
 150                    );
 151
 152                    debug_assert!(
 153                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 154                    );
 155                }
 156            }
 157
 158            if role.is_none() || role == Some(ChannelRole::Banned) {
 159                Err(ErrorCode::Forbidden.anyhow())?
 160            }
 161            let role = role.unwrap();
 162
 163            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 164            let room_id = self
 165                .get_or_create_channel_room(channel_id, &live_kit_room, &*tx)
 166                .await?;
 167
 168            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 169                .await
 170                .map(|jr| (jr, accept_invite_result, role))
 171        })
 172        .await
 173    }
 174
 175    /// Sets the visibility of the given channel.
 176    pub async fn set_channel_visibility(
 177        &self,
 178        channel_id: ChannelId,
 179        visibility: ChannelVisibility,
 180        admin_id: UserId,
 181    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 182        self.transaction(move |tx| async move {
 183            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 184            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 185                .await?;
 186
 187            if visibility == ChannelVisibility::Public {
 188                if let Some(parent_id) = channel.parent_id() {
 189                    let parent = self.get_channel_internal(parent_id, &*tx).await?;
 190
 191                    if parent.visibility != ChannelVisibility::Public {
 192                        Err(ErrorCode::BadPublicNesting
 193                            .with_tag("direction", "parent")
 194                            .anyhow())?;
 195                    }
 196                }
 197            } else if visibility == ChannelVisibility::Members {
 198                if self
 199                    .get_channel_descendants_excluding_self([&channel], &*tx)
 200                    .await?
 201                    .into_iter()
 202                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 203                {
 204                    Err(ErrorCode::BadPublicNesting
 205                        .with_tag("direction", "children")
 206                        .anyhow())?;
 207                }
 208            }
 209
 210            let mut model = channel.into_active_model();
 211            model.visibility = ActiveValue::Set(visibility);
 212            let channel = model.update(&*tx).await?;
 213
 214            let channel_members = channel_member::Entity::find()
 215                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 216                .all(&*tx)
 217                .await?;
 218
 219            Ok((Channel::from_model(channel), channel_members))
 220        })
 221        .await
 222    }
 223
 224    #[cfg(test)]
 225    pub async fn set_channel_requires_zed_cla(
 226        &self,
 227        channel_id: ChannelId,
 228        requires_zed_cla: bool,
 229    ) -> Result<()> {
 230        self.transaction(move |tx| async move {
 231            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 232            let mut model = channel.into_active_model();
 233            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 234            model.update(&*tx).await?;
 235            Ok(())
 236        })
 237        .await
 238    }
 239
 240    /// Deletes the channel with the specified ID.
 241    pub async fn delete_channel(
 242        &self,
 243        channel_id: ChannelId,
 244        user_id: UserId,
 245    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 246        self.transaction(move |tx| async move {
 247            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 248            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 249                .await?;
 250
 251            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 252                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 253                .select_only()
 254                .column(channel_member::Column::UserId)
 255                .distinct()
 256                .into_values::<_, QueryUserIds>()
 257                .all(&*tx)
 258                .await?;
 259
 260            let channels_to_remove = self
 261                .get_channel_descendants_excluding_self([&channel], &*tx)
 262                .await?
 263                .into_iter()
 264                .map(|channel| channel.id)
 265                .chain(Some(channel_id))
 266                .collect::<Vec<_>>();
 267
 268            channel::Entity::delete_many()
 269                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 270                .exec(&*tx)
 271                .await?;
 272
 273            Ok((channels_to_remove, members_to_notify))
 274        })
 275        .await
 276    }
 277
 278    /// Invites a user to a channel as a member.
 279    pub async fn invite_channel_member(
 280        &self,
 281        channel_id: ChannelId,
 282        invitee_id: UserId,
 283        inviter_id: UserId,
 284        role: ChannelRole,
 285    ) -> Result<InviteMemberResult> {
 286        self.transaction(move |tx| async move {
 287            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 288            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 289                .await?;
 290            if !channel.is_root() {
 291                Err(ErrorCode::NotARootChannel.anyhow())?
 292            }
 293
 294            channel_member::ActiveModel {
 295                id: ActiveValue::NotSet,
 296                channel_id: ActiveValue::Set(channel_id),
 297                user_id: ActiveValue::Set(invitee_id),
 298                accepted: ActiveValue::Set(false),
 299                role: ActiveValue::Set(role),
 300            }
 301            .insert(&*tx)
 302            .await?;
 303
 304            let channel = Channel::from_model(channel);
 305
 306            let notifications = self
 307                .create_notification(
 308                    invitee_id,
 309                    rpc::Notification::ChannelInvitation {
 310                        channel_id: channel_id.to_proto(),
 311                        channel_name: channel.name.clone(),
 312                        inviter_id: inviter_id.to_proto(),
 313                    },
 314                    true,
 315                    &*tx,
 316                )
 317                .await?
 318                .into_iter()
 319                .collect();
 320
 321            Ok(InviteMemberResult {
 322                channel,
 323                notifications,
 324            })
 325        })
 326        .await
 327    }
 328
 329    fn sanitize_channel_name(name: &str) -> Result<&str> {
 330        let new_name = name.trim().trim_start_matches('#');
 331        if new_name == "" {
 332            Err(anyhow!("channel name can't be blank"))?;
 333        }
 334        Ok(new_name)
 335    }
 336
 337    /// Renames the specified channel.
 338    pub async fn rename_channel(
 339        &self,
 340        channel_id: ChannelId,
 341        admin_id: UserId,
 342        new_name: &str,
 343    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 344        self.transaction(move |tx| async move {
 345            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 346
 347            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 348            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 349                .await?;
 350
 351            let mut model = channel.into_active_model();
 352            model.name = ActiveValue::Set(new_name.clone());
 353            let channel = model.update(&*tx).await?;
 354
 355            let channel_members = channel_member::Entity::find()
 356                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 357                .all(&*tx)
 358                .await?;
 359
 360            Ok((Channel::from_model(channel), channel_members))
 361        })
 362        .await
 363    }
 364
 365    /// accept or decline an invite to join a channel
 366    pub async fn respond_to_channel_invite(
 367        &self,
 368        channel_id: ChannelId,
 369        user_id: UserId,
 370        accept: bool,
 371    ) -> Result<RespondToChannelInvite> {
 372        self.transaction(move |tx| async move {
 373            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 374
 375            let membership_update = if accept {
 376                let rows_affected = channel_member::Entity::update_many()
 377                    .set(channel_member::ActiveModel {
 378                        accepted: ActiveValue::Set(accept),
 379                        ..Default::default()
 380                    })
 381                    .filter(
 382                        channel_member::Column::ChannelId
 383                            .eq(channel_id)
 384                            .and(channel_member::Column::UserId.eq(user_id))
 385                            .and(channel_member::Column::Accepted.eq(false)),
 386                    )
 387                    .exec(&*tx)
 388                    .await?
 389                    .rows_affected;
 390
 391                if rows_affected == 0 {
 392                    Err(anyhow!("no such invitation"))?;
 393                }
 394
 395                Some(
 396                    self.calculate_membership_updated(&channel, user_id, &*tx)
 397                        .await?,
 398                )
 399            } else {
 400                let rows_affected = channel_member::Entity::delete_many()
 401                    .filter(
 402                        channel_member::Column::ChannelId
 403                            .eq(channel_id)
 404                            .and(channel_member::Column::UserId.eq(user_id))
 405                            .and(channel_member::Column::Accepted.eq(false)),
 406                    )
 407                    .exec(&*tx)
 408                    .await?
 409                    .rows_affected;
 410                if rows_affected == 0 {
 411                    Err(anyhow!("no such invitation"))?;
 412                }
 413
 414                None
 415            };
 416
 417            Ok(RespondToChannelInvite {
 418                membership_update,
 419                notifications: self
 420                    .mark_notification_as_read_with_response(
 421                        user_id,
 422                        &rpc::Notification::ChannelInvitation {
 423                            channel_id: channel_id.to_proto(),
 424                            channel_name: Default::default(),
 425                            inviter_id: Default::default(),
 426                        },
 427                        accept,
 428                        &*tx,
 429                    )
 430                    .await?
 431                    .into_iter()
 432                    .collect(),
 433            })
 434        })
 435        .await
 436    }
 437
 438    async fn calculate_membership_updated(
 439        &self,
 440        channel: &channel::Model,
 441        user_id: UserId,
 442        tx: &DatabaseTransaction,
 443    ) -> Result<MembershipUpdated> {
 444        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 445        let removed_channels = self
 446            .get_channel_descendants_excluding_self([channel], &*tx)
 447            .await?
 448            .into_iter()
 449            .map(|channel| channel.id)
 450            .chain([channel.id])
 451            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 452            .collect::<Vec<_>>();
 453
 454        Ok(MembershipUpdated {
 455            channel_id: channel.id,
 456            new_channels,
 457            removed_channels,
 458        })
 459    }
 460
 461    /// Removes a channel member.
 462    pub async fn remove_channel_member(
 463        &self,
 464        channel_id: ChannelId,
 465        member_id: UserId,
 466        admin_id: UserId,
 467    ) -> Result<RemoveChannelMemberResult> {
 468        self.transaction(|tx| async move {
 469            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 470
 471            if member_id != admin_id {
 472                self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 473                    .await?;
 474            }
 475
 476            let result = channel_member::Entity::delete_many()
 477                .filter(
 478                    channel_member::Column::ChannelId
 479                        .eq(channel_id)
 480                        .and(channel_member::Column::UserId.eq(member_id)),
 481                )
 482                .exec(&*tx)
 483                .await?;
 484
 485            if result.rows_affected == 0 {
 486                Err(anyhow!("no such member"))?;
 487            }
 488
 489            Ok(RemoveChannelMemberResult {
 490                membership_update: self
 491                    .calculate_membership_updated(&channel, member_id, &*tx)
 492                    .await?,
 493                notification_id: self
 494                    .remove_notification(
 495                        member_id,
 496                        rpc::Notification::ChannelInvitation {
 497                            channel_id: channel_id.to_proto(),
 498                            channel_name: Default::default(),
 499                            inviter_id: Default::default(),
 500                        },
 501                        &*tx,
 502                    )
 503                    .await?,
 504            })
 505        })
 506        .await
 507    }
 508
 509    /// Returns all channel invites for the user with the given ID.
 510    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 511        self.transaction(|tx| async move {
 512            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 513
 514            let channel_invites = channel_member::Entity::find()
 515                .filter(
 516                    channel_member::Column::UserId
 517                        .eq(user_id)
 518                        .and(channel_member::Column::Accepted.eq(false)),
 519                )
 520                .all(&*tx)
 521                .await?;
 522
 523            for invite in channel_invites {
 524                role_for_channel.insert(invite.channel_id, invite.role);
 525            }
 526
 527            let channels = channel::Entity::find()
 528                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 529                .all(&*tx)
 530                .await?;
 531
 532            let channels = channels
 533                .into_iter()
 534                .filter_map(|channel| Some(Channel::from_model(channel)))
 535                .collect();
 536
 537            Ok(channels)
 538        })
 539        .await
 540    }
 541
 542    /// Returns all channels for the user with the given ID.
 543    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 544        self.transaction(|tx| async move {
 545            let tx = tx;
 546
 547            self.get_user_channels(user_id, None, &tx).await
 548        })
 549        .await
 550    }
 551
    /// Returns the channels visible to the given user, together with related
    /// per-channel data (participants, buffer/message versions, hosted projects).
    ///
    /// Only accepted memberships are considered. When `ancestor_channel` is
    /// given, memberships are further restricted to the one on that channel's
    /// root (`ancestor.root_id()`).
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Accepted memberships of this user, optionally limited to one root channel.
        let mut filter = channel_member::Column::UserId
            .eq(user_id)
            .and(channel_member::Column::Accepted.eq(true));

        if let Some(ancestor) = ancestor_channel {
            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
        }

        let channel_memberships = channel_member::Entity::find()
            .filter(filter)
            .all(&*tx)
            .await?;

        // The channels the user is directly a member of.
        let channels = channel::Entity::find()
            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
            .all(&*tx)
            .await?;

        let mut descendants = self
            .get_channel_descendants_excluding_self(channels.iter(), &*tx)
            .await?;

        // Merge the membership channels into the descendant list, skipping any
        // whose path is already present.
        // NOTE(review): the binary search presumes `descendants` is sorted by
        // `path()` — confirm against `get_channel_descendants_excluding_self`.
        for channel in channels {
            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
                descendants.insert(ix, channel);
            }
        }

        // Role of the user on each channel they hold a membership for.
        let roles_by_channel_id = channel_memberships
            .iter()
            .map(|membership| (membership.channel_id, membership.role))
            .collect::<HashMap<_, _>>();

        // Keep only channels whose root has a known role for the user, and
        // whose visibility that role is allowed to see.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
                if parent_role.can_see_channel(channel.visibility) {
                    Some(Channel::from_model(channel))
                } else {
                    None
                }
            })
            .collect();

        // Column selector for the (channel id, user id) pairs queried below.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Users currently listed as room participants, grouped by channel.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(&*tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();

        // Map each buffer belonging to a visible channel to that channel's id.
        let mut channel_ids_by_buffer_id = HashMap::default();
        let mut rows = buffer::Entity::find()
            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
            .stream(&*tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row = row?;
            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
        }
        // The stream is dropped explicitly before `tx` is used for further queries.
        drop(rows);

        let latest_buffer_versions = self
            .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx)
            .await?;

        let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;

        let observed_buffer_versions = self
            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx)
            .await?;

        let observed_channel_messages = self
            .observed_channel_messages(&channel_ids, user_id, &*tx)
            .await?;

        let hosted_projects = self
            .get_hosted_projects(&channel_ids, &roles_by_channel_id, &*tx)
            .await?;

        Ok(ChannelsForUser {
            channel_memberships,
            channels,
            hosted_projects,
            channel_participants,
            latest_buffer_versions,
            latest_channel_messages,
            observed_buffer_versions,
            observed_channel_messages,
        })
    }
 670
 671    /// Sets the role for the specified channel member.
 672    pub async fn set_channel_member_role(
 673        &self,
 674        channel_id: ChannelId,
 675        admin_id: UserId,
 676        for_user: UserId,
 677        role: ChannelRole,
 678    ) -> Result<SetMemberRoleResult> {
 679        self.transaction(|tx| async move {
 680            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 681            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 682                .await?;
 683
 684            let membership = channel_member::Entity::find()
 685                .filter(
 686                    channel_member::Column::ChannelId
 687                        .eq(channel_id)
 688                        .and(channel_member::Column::UserId.eq(for_user)),
 689                )
 690                .one(&*tx)
 691                .await?;
 692
 693            let Some(membership) = membership else {
 694                Err(anyhow!("no such member"))?
 695            };
 696
 697            let mut update = membership.into_active_model();
 698            update.role = ActiveValue::Set(role);
 699            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 700
 701            if updated.accepted {
 702                Ok(SetMemberRoleResult::MembershipUpdated(
 703                    self.calculate_membership_updated(&channel, for_user, &*tx)
 704                        .await?,
 705                ))
 706            } else {
 707                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 708                    channel,
 709                )))
 710            }
 711        })
 712        .await
 713    }
 714
 715    /// Returns the details for the specified channel member.
 716    pub async fn get_channel_participant_details(
 717        &self,
 718        channel_id: ChannelId,
 719        user_id: UserId,
 720    ) -> Result<Vec<proto::ChannelMember>> {
 721        let (role, members) = self
 722            .transaction(move |tx| async move {
 723                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 724                let role = self
 725                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 726                    .await?;
 727                Ok((
 728                    role,
 729                    self.get_channel_participant_details_internal(&channel, &*tx)
 730                        .await?,
 731                ))
 732            })
 733            .await?;
 734
 735        if role == ChannelRole::Admin {
 736            Ok(members
 737                .into_iter()
 738                .map(|channel_member| proto::ChannelMember {
 739                    role: channel_member.role.into(),
 740                    user_id: channel_member.user_id.to_proto(),
 741                    kind: if channel_member.accepted {
 742                        Kind::Member
 743                    } else {
 744                        Kind::Invitee
 745                    }
 746                    .into(),
 747                })
 748                .collect())
 749        } else {
 750            return Ok(members
 751                .into_iter()
 752                .filter_map(|member| {
 753                    if !member.accepted {
 754                        return None;
 755                    }
 756                    Some(proto::ChannelMember {
 757                        role: member.role.into(),
 758                        user_id: member.user_id.to_proto(),
 759                        kind: Kind::Member.into(),
 760                    })
 761                })
 762                .collect());
 763        }
 764    }
 765
 766    async fn get_channel_participant_details_internal(
 767        &self,
 768        channel: &channel::Model,
 769        tx: &DatabaseTransaction,
 770    ) -> Result<Vec<channel_member::Model>> {
 771        Ok(channel_member::Entity::find()
 772            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 773            .all(tx)
 774            .await?)
 775    }
 776
 777    /// Returns the participants in the given channel.
 778    pub async fn get_channel_participants(
 779        &self,
 780        channel: &channel::Model,
 781        tx: &DatabaseTransaction,
 782    ) -> Result<Vec<UserId>> {
 783        let participants = self
 784            .get_channel_participant_details_internal(channel, &*tx)
 785            .await?;
 786        Ok(participants
 787            .into_iter()
 788            .map(|member| member.user_id)
 789            .collect())
 790    }
 791
 792    /// Returns whether the given user is an admin in the specified channel.
 793    pub async fn check_user_is_channel_admin(
 794        &self,
 795        channel: &channel::Model,
 796        user_id: UserId,
 797        tx: &DatabaseTransaction,
 798    ) -> Result<ChannelRole> {
 799        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 800        match role {
 801            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 802            Some(ChannelRole::Member)
 803            | Some(ChannelRole::Talker)
 804            | Some(ChannelRole::Banned)
 805            | Some(ChannelRole::Guest)
 806            | None => Err(anyhow!(
 807                "user is not a channel admin or channel does not exist"
 808            ))?,
 809        }
 810    }
 811
 812    /// Returns whether the given user is a member of the specified channel.
 813    pub async fn check_user_is_channel_member(
 814        &self,
 815        channel: &channel::Model,
 816        user_id: UserId,
 817        tx: &DatabaseTransaction,
 818    ) -> Result<ChannelRole> {
 819        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 820        match channel_role {
 821            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 822            Some(ChannelRole::Banned)
 823            | Some(ChannelRole::Guest)
 824            | Some(ChannelRole::Talker)
 825            | None => Err(anyhow!(
 826                "user is not a channel member or channel does not exist"
 827            ))?,
 828        }
 829    }
 830
 831    /// Returns whether the given user is a participant in the specified channel.
 832    pub async fn check_user_is_channel_participant(
 833        &self,
 834        channel: &channel::Model,
 835        user_id: UserId,
 836        tx: &DatabaseTransaction,
 837    ) -> Result<ChannelRole> {
 838        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 839        match role {
 840            Some(ChannelRole::Admin)
 841            | Some(ChannelRole::Member)
 842            | Some(ChannelRole::Guest)
 843            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 844            Some(ChannelRole::Banned) | None => Err(anyhow!(
 845                "user is not a channel participant or channel does not exist"
 846            ))?,
 847        }
 848    }
 849
 850    /// Returns a user's pending invite for the given channel, if one exists.
 851    pub async fn pending_invite_for_channel(
 852        &self,
 853        channel: &channel::Model,
 854        user_id: UserId,
 855        tx: &DatabaseTransaction,
 856    ) -> Result<Option<channel_member::Model>> {
 857        let row = channel_member::Entity::find()
 858            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 859            .filter(channel_member::Column::UserId.eq(user_id))
 860            .filter(channel_member::Column::Accepted.eq(false))
 861            .one(&*tx)
 862            .await?;
 863
 864        Ok(row)
 865    }
 866
 867    /// Returns the role for a user in the given channel.
 868    pub async fn channel_role_for_user(
 869        &self,
 870        channel: &channel::Model,
 871        user_id: UserId,
 872        tx: &DatabaseTransaction,
 873    ) -> Result<Option<ChannelRole>> {
 874        let membership = channel_member::Entity::find()
 875            .filter(
 876                channel_member::Column::ChannelId
 877                    .eq(channel.root_id())
 878                    .and(channel_member::Column::UserId.eq(user_id))
 879                    .and(channel_member::Column::Accepted.eq(true)),
 880            )
 881            .one(&*tx)
 882            .await?;
 883
 884        let Some(membership) = membership else {
 885            return Ok(None);
 886        };
 887
 888        if !membership.role.can_see_channel(channel.visibility) {
 889            return Ok(None);
 890        }
 891
 892        Ok(Some(membership.role))
 893    }
 894
 895    // Get the descendants of the given set if channels, ordered by their
 896    // path.
 897    pub(crate) async fn get_channel_descendants_excluding_self(
 898        &self,
 899        channels: impl IntoIterator<Item = &channel::Model>,
 900        tx: &DatabaseTransaction,
 901    ) -> Result<Vec<channel::Model>> {
 902        let mut filter = Condition::any();
 903        for channel in channels.into_iter() {
 904            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 905        }
 906
 907        if filter.is_empty() {
 908            return Ok(vec![]);
 909        }
 910
 911        Ok(channel::Entity::find()
 912            .filter(filter)
 913            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 914            .all(tx)
 915            .await?)
 916    }
 917
 918    /// Returns the channel with the given ID.
 919    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 920        self.transaction(|tx| async move {
 921            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 922            self.check_user_is_channel_participant(&channel, user_id, &*tx)
 923                .await?;
 924
 925            Ok(Channel::from_model(channel))
 926        })
 927        .await
 928    }
 929
 930    pub(crate) async fn get_channel_internal(
 931        &self,
 932        channel_id: ChannelId,
 933        tx: &DatabaseTransaction,
 934    ) -> Result<channel::Model> {
 935        Ok(channel::Entity::find_by_id(channel_id)
 936            .one(&*tx)
 937            .await?
 938            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 939    }
 940
 941    pub(crate) async fn get_or_create_channel_room(
 942        &self,
 943        channel_id: ChannelId,
 944        live_kit_room: &str,
 945        tx: &DatabaseTransaction,
 946    ) -> Result<RoomId> {
 947        let room = room::Entity::find()
 948            .filter(room::Column::ChannelId.eq(channel_id))
 949            .one(&*tx)
 950            .await?;
 951
 952        let room_id = if let Some(room) = room {
 953            room.id
 954        } else {
 955            let result = room::Entity::insert(room::ActiveModel {
 956                channel_id: ActiveValue::Set(Some(channel_id)),
 957                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 958                ..Default::default()
 959            })
 960            .exec(&*tx)
 961            .await?;
 962
 963            result.last_insert_id
 964        };
 965
 966        Ok(room_id)
 967    }
 968
 969    /// Move a channel from one parent to another
 970    pub async fn move_channel(
 971        &self,
 972        channel_id: ChannelId,
 973        new_parent_id: ChannelId,
 974        admin_id: UserId,
 975    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
 976        self.transaction(|tx| async move {
 977            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 978            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 979                .await?;
 980            let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
 981
 982            if new_parent.root_id() != channel.root_id() {
 983                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 984            }
 985
 986            if new_parent
 987                .ancestors_including_self()
 988                .any(|id| id == channel.id)
 989            {
 990                Err(anyhow!(ErrorCode::CircularNesting))?;
 991            }
 992
 993            if channel.visibility == ChannelVisibility::Public
 994                && new_parent.visibility != ChannelVisibility::Public
 995            {
 996                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 997            }
 998
 999            let root_id = channel.root_id();
1000            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1001            let new_path = format!("{}{}/", new_parent.path(), channel.id);
1002
1003            let mut model = channel.into_active_model();
1004            model.parent_path = ActiveValue::Set(new_parent.path());
1005            let channel = model.update(&*tx).await?;
1006
1007            let descendent_ids =
1008                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1009                    self.pool.get_database_backend(),
1010                    "
1011                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1012                    WHERE parent_path LIKE $3 || '%'
1013                    RETURNING id
1014                ",
1015                    [old_path.clone().into(), new_path.into(), old_path.into()],
1016                ))
1017                .all(&*tx)
1018                .await?;
1019
1020            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1021
1022            let channels = channel::Entity::find()
1023                .filter(channel::Column::Id.is_in(all_moved_ids))
1024                .all(&*tx)
1025                .await?
1026                .into_iter()
1027                .map(|c| Channel::from_model(c))
1028                .collect::<Vec<_>>();
1029
1030            let channel_members = channel_member::Entity::find()
1031                .filter(channel_member::Column::ChannelId.eq(root_id))
1032                .all(&*tx)
1033                .await?;
1034
1035            Ok((channels, channel_members))
1036        })
1037        .await
1038    }
1039}
1040
1041#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1042enum QueryIds {
1043    Id,
1044}
1045
1046#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1047enum QueryUserIds {
1048    UserId,
1049}