channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .0
  36            .id)
  37    }
  38
  39    /// Creates a new channel.
  40    pub async fn create_channel(
  41        &self,
  42        name: &str,
  43        parent_channel_id: Option<ChannelId>,
  44        admin_id: UserId,
  45    ) -> Result<(
  46        Channel,
  47        Option<channel_member::Model>,
  48        Vec<channel_member::Model>,
  49    )> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let channel = channel::ActiveModel {
  63                id: ActiveValue::NotSet,
  64                name: ActiveValue::Set(name.to_string()),
  65                visibility: ActiveValue::Set(ChannelVisibility::Members),
  66                parent_path: ActiveValue::Set(
  67                    parent
  68                        .as_ref()
  69                        .map_or(String::new(), |parent| parent.path()),
  70                ),
  71                requires_zed_cla: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if parent.is_none() {
  77                membership = Some(
  78                    channel_member::ActiveModel {
  79                        id: ActiveValue::NotSet,
  80                        channel_id: ActiveValue::Set(channel.id),
  81                        user_id: ActiveValue::Set(admin_id),
  82                        accepted: ActiveValue::Set(true),
  83                        role: ActiveValue::Set(ChannelRole::Admin),
  84                    }
  85                    .insert(&*tx)
  86                    .await?,
  87                );
  88            }
  89
  90            let channel_members = channel_member::Entity::find()
  91                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  92                .all(&*tx)
  93                .await?;
  94
  95            Ok((Channel::from_model(channel), membership, channel_members))
  96        })
  97        .await
  98    }
  99
 100    pub async fn set_in_channel_call(
 101        &self,
 102        channel_id: ChannelId,
 103        user_id: UserId,
 104        in_call: bool,
 105    ) -> Result<(proto::Room, ChannelRole)> {
 106        self.transaction(move |tx| async move {
 107            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 108            let role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 109            if role.is_none() || role == Some(ChannelRole::Banned) {
 110                Err(ErrorCode::Forbidden.anyhow())?
 111            }
 112            let role = role.unwrap();
 113
 114            let Some(room) = room::Entity::find()
 115                .filter(room::Column::ChannelId.eq(channel_id))
 116                .one(&*tx)
 117                .await?
 118            else {
 119                Err(anyhow!("no room exists"))?
 120            };
 121
 122            let result = room_participant::Entity::update_many()
 123                .filter(
 124                    Condition::all()
 125                        .add(room_participant::Column::RoomId.eq(room.id))
 126                        .add(room_participant::Column::UserId.eq(user_id)),
 127                )
 128                .set(room_participant::ActiveModel {
 129                    in_call: ActiveValue::Set(in_call),
 130                    ..Default::default()
 131                })
 132                .exec(&*tx)
 133                .await?;
 134
 135            if result.rows_affected != 1 {
 136                Err(anyhow!("not in channel"))?
 137            }
 138
 139            let room = self.get_room(room.id, &*tx).await?;
 140            Ok((room, role))
 141        })
 142        .await
 143    }
 144
 145    /// Adds a user to the specified channel.
 146    pub async fn join_channel(
 147        &self,
 148        channel_id: ChannelId,
 149        user_id: UserId,
 150        autojoin: bool,
 151        connection: ConnectionId,
 152        environment: &str,
 153    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 154        self.transaction(move |tx| async move {
 155            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 156            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 157
 158            let mut accept_invite_result = None;
 159
 160            if role.is_none() {
 161                if let Some(invitation) = self
 162                    .pending_invite_for_channel(&channel, user_id, &*tx)
 163                    .await?
 164                {
 165                    // note, this may be a parent channel
 166                    role = Some(invitation.role);
 167                    channel_member::Entity::update(channel_member::ActiveModel {
 168                        accepted: ActiveValue::Set(true),
 169                        ..invitation.into_active_model()
 170                    })
 171                    .exec(&*tx)
 172                    .await?;
 173
 174                    accept_invite_result = Some(
 175                        self.calculate_membership_updated(&channel, user_id, &*tx)
 176                            .await?,
 177                    );
 178
 179                    debug_assert!(
 180                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 181                    );
 182                } else if channel.visibility == ChannelVisibility::Public {
 183                    role = Some(ChannelRole::Guest);
 184                    channel_member::Entity::insert(channel_member::ActiveModel {
 185                        id: ActiveValue::NotSet,
 186                        channel_id: ActiveValue::Set(channel.root_id()),
 187                        user_id: ActiveValue::Set(user_id),
 188                        accepted: ActiveValue::Set(true),
 189                        role: ActiveValue::Set(ChannelRole::Guest),
 190                    })
 191                    .exec(&*tx)
 192                    .await?;
 193
 194                    accept_invite_result = Some(
 195                        self.calculate_membership_updated(&channel, user_id, &*tx)
 196                            .await?,
 197                    );
 198
 199                    debug_assert!(
 200                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 201                    );
 202                }
 203            }
 204
 205            if role.is_none() || role == Some(ChannelRole::Banned) {
 206                Err(ErrorCode::Forbidden.anyhow())?
 207            }
 208            let role = role.unwrap();
 209
 210            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 211            let room_id = self
 212                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 213                .await?;
 214
 215            self.join_channel_room_internal(room_id, user_id, autojoin, connection, role, &*tx)
 216                .await
 217                .map(|jr| (jr, accept_invite_result, role))
 218        })
 219        .await
 220    }
 221
 222    /// Sets the visibility of the given channel.
 223    pub async fn set_channel_visibility(
 224        &self,
 225        channel_id: ChannelId,
 226        visibility: ChannelVisibility,
 227        admin_id: UserId,
 228    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 229        self.transaction(move |tx| async move {
 230            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 231            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 232                .await?;
 233
 234            if visibility == ChannelVisibility::Public {
 235                if let Some(parent_id) = channel.parent_id() {
 236                    let parent = self.get_channel_internal(parent_id, &*tx).await?;
 237
 238                    if parent.visibility != ChannelVisibility::Public {
 239                        Err(ErrorCode::BadPublicNesting
 240                            .with_tag("direction", "parent")
 241                            .anyhow())?;
 242                    }
 243                }
 244            } else if visibility == ChannelVisibility::Members {
 245                if self
 246                    .get_channel_descendants_excluding_self([&channel], &*tx)
 247                    .await?
 248                    .into_iter()
 249                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 250                {
 251                    Err(ErrorCode::BadPublicNesting
 252                        .with_tag("direction", "children")
 253                        .anyhow())?;
 254                }
 255            }
 256
 257            let mut model = channel.into_active_model();
 258            model.visibility = ActiveValue::Set(visibility);
 259            let channel = model.update(&*tx).await?;
 260
 261            let channel_members = channel_member::Entity::find()
 262                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 263                .all(&*tx)
 264                .await?;
 265
 266            Ok((Channel::from_model(channel), channel_members))
 267        })
 268        .await
 269    }
 270
 271    #[cfg(test)]
 272    pub async fn set_channel_requires_zed_cla(
 273        &self,
 274        channel_id: ChannelId,
 275        requires_zed_cla: bool,
 276    ) -> Result<()> {
 277        self.transaction(move |tx| async move {
 278            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 279            let mut model = channel.into_active_model();
 280            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 281            model.update(&*tx).await?;
 282            Ok(())
 283        })
 284        .await
 285    }
 286
 287    /// Deletes the channel with the specified ID.
 288    pub async fn delete_channel(
 289        &self,
 290        channel_id: ChannelId,
 291        user_id: UserId,
 292    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 293        self.transaction(move |tx| async move {
 294            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 295            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 296                .await?;
 297
 298            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 299                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 300                .select_only()
 301                .column(channel_member::Column::UserId)
 302                .distinct()
 303                .into_values::<_, QueryUserIds>()
 304                .all(&*tx)
 305                .await?;
 306
 307            let channels_to_remove = self
 308                .get_channel_descendants_excluding_self([&channel], &*tx)
 309                .await?
 310                .into_iter()
 311                .map(|channel| channel.id)
 312                .chain(Some(channel_id))
 313                .collect::<Vec<_>>();
 314
 315            channel::Entity::delete_many()
 316                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 317                .exec(&*tx)
 318                .await?;
 319
 320            Ok((channels_to_remove, members_to_notify))
 321        })
 322        .await
 323    }
 324
 325    /// Invites a user to a channel as a member.
 326    pub async fn invite_channel_member(
 327        &self,
 328        channel_id: ChannelId,
 329        invitee_id: UserId,
 330        inviter_id: UserId,
 331        role: ChannelRole,
 332    ) -> Result<InviteMemberResult> {
 333        self.transaction(move |tx| async move {
 334            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 335            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 336                .await?;
 337            if !channel.is_root() {
 338                Err(ErrorCode::NotARootChannel.anyhow())?
 339            }
 340
 341            channel_member::ActiveModel {
 342                id: ActiveValue::NotSet,
 343                channel_id: ActiveValue::Set(channel_id),
 344                user_id: ActiveValue::Set(invitee_id),
 345                accepted: ActiveValue::Set(false),
 346                role: ActiveValue::Set(role),
 347            }
 348            .insert(&*tx)
 349            .await?;
 350
 351            let channel = Channel::from_model(channel);
 352
 353            let notifications = self
 354                .create_notification(
 355                    invitee_id,
 356                    rpc::Notification::ChannelInvitation {
 357                        channel_id: channel_id.to_proto(),
 358                        channel_name: channel.name.clone(),
 359                        inviter_id: inviter_id.to_proto(),
 360                    },
 361                    true,
 362                    &*tx,
 363                )
 364                .await?
 365                .into_iter()
 366                .collect();
 367
 368            Ok(InviteMemberResult {
 369                channel,
 370                notifications,
 371            })
 372        })
 373        .await
 374    }
 375
 376    fn sanitize_channel_name(name: &str) -> Result<&str> {
 377        let new_name = name.trim().trim_start_matches('#');
 378        if new_name == "" {
 379            Err(anyhow!("channel name can't be blank"))?;
 380        }
 381        Ok(new_name)
 382    }
 383
 384    /// Renames the specified channel.
 385    pub async fn rename_channel(
 386        &self,
 387        channel_id: ChannelId,
 388        admin_id: UserId,
 389        new_name: &str,
 390    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 391        self.transaction(move |tx| async move {
 392            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 393
 394            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 395            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 396                .await?;
 397
 398            let mut model = channel.into_active_model();
 399            model.name = ActiveValue::Set(new_name.clone());
 400            let channel = model.update(&*tx).await?;
 401
 402            let channel_members = channel_member::Entity::find()
 403                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 404                .all(&*tx)
 405                .await?;
 406
 407            Ok((Channel::from_model(channel), channel_members))
 408        })
 409        .await
 410    }
 411
 412    /// accept or decline an invite to join a channel
 413    pub async fn respond_to_channel_invite(
 414        &self,
 415        channel_id: ChannelId,
 416        user_id: UserId,
 417        accept: bool,
 418    ) -> Result<RespondToChannelInvite> {
 419        self.transaction(move |tx| async move {
 420            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 421
 422            let membership_update = if accept {
 423                let rows_affected = channel_member::Entity::update_many()
 424                    .set(channel_member::ActiveModel {
 425                        accepted: ActiveValue::Set(accept),
 426                        ..Default::default()
 427                    })
 428                    .filter(
 429                        channel_member::Column::ChannelId
 430                            .eq(channel_id)
 431                            .and(channel_member::Column::UserId.eq(user_id))
 432                            .and(channel_member::Column::Accepted.eq(false)),
 433                    )
 434                    .exec(&*tx)
 435                    .await?
 436                    .rows_affected;
 437
 438                if rows_affected == 0 {
 439                    Err(anyhow!("no such invitation"))?;
 440                }
 441
 442                Some(
 443                    self.calculate_membership_updated(&channel, user_id, &*tx)
 444                        .await?,
 445                )
 446            } else {
 447                let rows_affected = channel_member::Entity::delete_many()
 448                    .filter(
 449                        channel_member::Column::ChannelId
 450                            .eq(channel_id)
 451                            .and(channel_member::Column::UserId.eq(user_id))
 452                            .and(channel_member::Column::Accepted.eq(false)),
 453                    )
 454                    .exec(&*tx)
 455                    .await?
 456                    .rows_affected;
 457                if rows_affected == 0 {
 458                    Err(anyhow!("no such invitation"))?;
 459                }
 460
 461                None
 462            };
 463
 464            Ok(RespondToChannelInvite {
 465                membership_update,
 466                notifications: self
 467                    .mark_notification_as_read_with_response(
 468                        user_id,
 469                        &rpc::Notification::ChannelInvitation {
 470                            channel_id: channel_id.to_proto(),
 471                            channel_name: Default::default(),
 472                            inviter_id: Default::default(),
 473                        },
 474                        accept,
 475                        &*tx,
 476                    )
 477                    .await?
 478                    .into_iter()
 479                    .collect(),
 480            })
 481        })
 482        .await
 483    }
 484
 485    async fn calculate_membership_updated(
 486        &self,
 487        channel: &channel::Model,
 488        user_id: UserId,
 489        tx: &DatabaseTransaction,
 490    ) -> Result<MembershipUpdated> {
 491        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 492        let removed_channels = self
 493            .get_channel_descendants_excluding_self([channel], &*tx)
 494            .await?
 495            .into_iter()
 496            .map(|channel| channel.id)
 497            .chain([channel.id])
 498            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 499            .collect::<Vec<_>>();
 500
 501        Ok(MembershipUpdated {
 502            channel_id: channel.id,
 503            new_channels,
 504            removed_channels,
 505        })
 506    }
 507
 508    /// Removes a channel member.
 509    pub async fn remove_channel_member(
 510        &self,
 511        channel_id: ChannelId,
 512        member_id: UserId,
 513        admin_id: UserId,
 514    ) -> Result<RemoveChannelMemberResult> {
 515        self.transaction(|tx| async move {
 516            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 517
 518            if member_id != admin_id {
 519                self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 520                    .await?;
 521            }
 522
 523            let result = channel_member::Entity::delete_many()
 524                .filter(
 525                    channel_member::Column::ChannelId
 526                        .eq(channel_id)
 527                        .and(channel_member::Column::UserId.eq(member_id)),
 528                )
 529                .exec(&*tx)
 530                .await?;
 531
 532            if result.rows_affected == 0 {
 533                Err(anyhow!("no such member"))?;
 534            }
 535
 536            Ok(RemoveChannelMemberResult {
 537                membership_update: self
 538                    .calculate_membership_updated(&channel, member_id, &*tx)
 539                    .await?,
 540                notification_id: self
 541                    .remove_notification(
 542                        member_id,
 543                        rpc::Notification::ChannelInvitation {
 544                            channel_id: channel_id.to_proto(),
 545                            channel_name: Default::default(),
 546                            inviter_id: Default::default(),
 547                        },
 548                        &*tx,
 549                    )
 550                    .await?,
 551            })
 552        })
 553        .await
 554    }
 555
 556    /// Returns all channel invites for the user with the given ID.
 557    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 558        self.transaction(|tx| async move {
 559            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 560
 561            let channel_invites = channel_member::Entity::find()
 562                .filter(
 563                    channel_member::Column::UserId
 564                        .eq(user_id)
 565                        .and(channel_member::Column::Accepted.eq(false)),
 566                )
 567                .all(&*tx)
 568                .await?;
 569
 570            for invite in channel_invites {
 571                role_for_channel.insert(invite.channel_id, invite.role);
 572            }
 573
 574            let channels = channel::Entity::find()
 575                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 576                .all(&*tx)
 577                .await?;
 578
 579            let channels = channels
 580                .into_iter()
 581                .filter_map(|channel| Some(Channel::from_model(channel)))
 582                .collect();
 583
 584            Ok(channels)
 585        })
 586        .await
 587    }
 588
 589    /// Returns all channels for the user with the given ID.
 590    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 591        self.transaction(|tx| async move {
 592            let tx = tx;
 593
 594            self.get_user_channels(user_id, None, &tx).await
 595        })
 596        .await
 597    }
 598
 599    /// Returns all channels for the user with the given ID that are descendants
 600    /// of the specified ancestor channel.
 601    pub async fn get_user_channels(
 602        &self,
 603        user_id: UserId,
 604        ancestor_channel: Option<&channel::Model>,
 605        tx: &DatabaseTransaction,
 606    ) -> Result<ChannelsForUser> {
 607        let mut filter = channel_member::Column::UserId
 608            .eq(user_id)
 609            .and(channel_member::Column::Accepted.eq(true));
 610
 611        if let Some(ancestor) = ancestor_channel {
 612            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 613        }
 614
 615        let channel_memberships = channel_member::Entity::find()
 616            .filter(filter)
 617            .all(&*tx)
 618            .await?;
 619
 620        let channels = channel::Entity::find()
 621            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
 622            .all(&*tx)
 623            .await?;
 624
 625        let mut descendants = self
 626            .get_channel_descendants_excluding_self(channels.iter(), &*tx)
 627            .await?;
 628
 629        for channel in channels {
 630            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
 631                descendants.insert(ix, channel);
 632            }
 633        }
 634
 635        let roles_by_channel_id = channel_memberships
 636            .iter()
 637            .map(|membership| (membership.channel_id, membership.role))
 638            .collect::<HashMap<_, _>>();
 639
 640        let channels: Vec<Channel> = descendants
 641            .into_iter()
 642            .filter_map(|channel| {
 643                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 644                if parent_role.can_see_channel(channel.visibility) {
 645                    Some(Channel::from_model(channel))
 646                } else {
 647                    None
 648                }
 649            })
 650            .collect();
 651
 652        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 653        enum QueryUserIdsAndChannelIds {
 654            ChannelId,
 655            UserId,
 656        }
 657
 658        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 659        {
 660            let mut rows = room_participant::Entity::find()
 661                .inner_join(room::Entity)
 662                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 663                .select_only()
 664                .column(room::Column::ChannelId)
 665                .column(room_participant::Column::UserId)
 666                .into_values::<_, QueryUserIdsAndChannelIds>()
 667                .stream(&*tx)
 668                .await?;
 669            while let Some(row) = rows.next().await {
 670                let row: (ChannelId, UserId) = row?;
 671                channel_participants.entry(row.0).or_default().push(row.1)
 672            }
 673        }
 674
 675        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 676
 677        let mut channel_ids_by_buffer_id = HashMap::default();
 678        let mut rows = buffer::Entity::find()
 679            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 680            .stream(&*tx)
 681            .await?;
 682        while let Some(row) = rows.next().await {
 683            let row = row?;
 684            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 685        }
 686        drop(rows);
 687
 688        let latest_buffer_versions = self
 689            .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx)
 690            .await?;
 691
 692        let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
 693
 694        let observed_buffer_versions = self
 695            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx)
 696            .await?;
 697
 698        let observed_channel_messages = self
 699            .observed_channel_messages(&channel_ids, user_id, &*tx)
 700            .await?;
 701
 702        Ok(ChannelsForUser {
 703            channel_memberships,
 704            channels,
 705            channel_participants,
 706            latest_buffer_versions,
 707            latest_channel_messages,
 708            observed_buffer_versions,
 709            observed_channel_messages,
 710        })
 711    }
 712
 713    /// Sets the role for the specified channel member.
 714    pub async fn set_channel_member_role(
 715        &self,
 716        channel_id: ChannelId,
 717        admin_id: UserId,
 718        for_user: UserId,
 719        role: ChannelRole,
 720    ) -> Result<SetMemberRoleResult> {
 721        self.transaction(|tx| async move {
 722            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 723            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 724                .await?;
 725
 726            let membership = channel_member::Entity::find()
 727                .filter(
 728                    channel_member::Column::ChannelId
 729                        .eq(channel_id)
 730                        .and(channel_member::Column::UserId.eq(for_user)),
 731                )
 732                .one(&*tx)
 733                .await?;
 734
 735            let Some(membership) = membership else {
 736                Err(anyhow!("no such member"))?
 737            };
 738
 739            let mut update = membership.into_active_model();
 740            update.role = ActiveValue::Set(role);
 741            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 742
 743            if updated.accepted {
 744                Ok(SetMemberRoleResult::MembershipUpdated(
 745                    self.calculate_membership_updated(&channel, for_user, &*tx)
 746                        .await?,
 747                ))
 748            } else {
 749                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 750                    channel,
 751                )))
 752            }
 753        })
 754        .await
 755    }
 756
 757    /// Returns the details for the specified channel member.
 758    pub async fn get_channel_participant_details(
 759        &self,
 760        channel_id: ChannelId,
 761        user_id: UserId,
 762    ) -> Result<Vec<proto::ChannelMember>> {
 763        let (role, members) = self
 764            .transaction(move |tx| async move {
 765                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 766                let role = self
 767                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 768                    .await?;
 769                Ok((
 770                    role,
 771                    self.get_channel_participant_details_internal(&channel, &*tx)
 772                        .await?,
 773                ))
 774            })
 775            .await?;
 776
 777        if role == ChannelRole::Admin {
 778            Ok(members
 779                .into_iter()
 780                .map(|channel_member| proto::ChannelMember {
 781                    role: channel_member.role.into(),
 782                    user_id: channel_member.user_id.to_proto(),
 783                    kind: if channel_member.accepted {
 784                        Kind::Member
 785                    } else {
 786                        Kind::Invitee
 787                    }
 788                    .into(),
 789                })
 790                .collect())
 791        } else {
 792            return Ok(members
 793                .into_iter()
 794                .filter_map(|member| {
 795                    if !member.accepted {
 796                        return None;
 797                    }
 798                    Some(proto::ChannelMember {
 799                        role: member.role.into(),
 800                        user_id: member.user_id.to_proto(),
 801                        kind: Kind::Member.into(),
 802                    })
 803                })
 804                .collect());
 805        }
 806    }
 807
 808    async fn get_channel_participant_details_internal(
 809        &self,
 810        channel: &channel::Model,
 811        tx: &DatabaseTransaction,
 812    ) -> Result<Vec<channel_member::Model>> {
 813        Ok(channel_member::Entity::find()
 814            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 815            .all(tx)
 816            .await?)
 817    }
 818
 819    /// Returns the participants in the given channel.
 820    pub async fn get_channel_participants(
 821        &self,
 822        channel: &channel::Model,
 823        tx: &DatabaseTransaction,
 824    ) -> Result<Vec<UserId>> {
 825        let participants = self
 826            .get_channel_participant_details_internal(channel, &*tx)
 827            .await?;
 828        Ok(participants
 829            .into_iter()
 830            .map(|member| member.user_id)
 831            .collect())
 832    }
 833
 834    /// Returns whether the given user is an admin in the specified channel.
 835    pub async fn check_user_is_channel_admin(
 836        &self,
 837        channel: &channel::Model,
 838        user_id: UserId,
 839        tx: &DatabaseTransaction,
 840    ) -> Result<ChannelRole> {
 841        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 842        match role {
 843            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 844            Some(ChannelRole::Member)
 845            | Some(ChannelRole::Banned)
 846            | Some(ChannelRole::Guest)
 847            | None => Err(anyhow!(
 848                "user is not a channel admin or channel does not exist"
 849            ))?,
 850        }
 851    }
 852
 853    /// Returns whether the given user is a member of the specified channel.
 854    pub async fn check_user_is_channel_member(
 855        &self,
 856        channel: &channel::Model,
 857        user_id: UserId,
 858        tx: &DatabaseTransaction,
 859    ) -> Result<ChannelRole> {
 860        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 861        match channel_role {
 862            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 863            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 864                "user is not a channel member or channel does not exist"
 865            ))?,
 866        }
 867    }
 868
 869    /// Returns whether the given user is a participant in the specified channel.
 870    pub async fn check_user_is_channel_participant(
 871        &self,
 872        channel: &channel::Model,
 873        user_id: UserId,
 874        tx: &DatabaseTransaction,
 875    ) -> Result<ChannelRole> {
 876        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 877        match role {
 878            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 879                Ok(role.unwrap())
 880            }
 881            Some(ChannelRole::Banned) | None => Err(anyhow!(
 882                "user is not a channel participant or channel does not exist"
 883            ))?,
 884        }
 885    }
 886
 887    /// Returns a user's pending invite for the given channel, if one exists.
 888    pub async fn pending_invite_for_channel(
 889        &self,
 890        channel: &channel::Model,
 891        user_id: UserId,
 892        tx: &DatabaseTransaction,
 893    ) -> Result<Option<channel_member::Model>> {
 894        let row = channel_member::Entity::find()
 895            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 896            .filter(channel_member::Column::UserId.eq(user_id))
 897            .filter(channel_member::Column::Accepted.eq(false))
 898            .one(&*tx)
 899            .await?;
 900
 901        Ok(row)
 902    }
 903
 904    /// Returns the role for a user in the given channel.
 905    pub async fn channel_role_for_user(
 906        &self,
 907        channel: &channel::Model,
 908        user_id: UserId,
 909        tx: &DatabaseTransaction,
 910    ) -> Result<Option<ChannelRole>> {
 911        let membership = channel_member::Entity::find()
 912            .filter(
 913                channel_member::Column::ChannelId
 914                    .eq(channel.root_id())
 915                    .and(channel_member::Column::UserId.eq(user_id))
 916                    .and(channel_member::Column::Accepted.eq(true)),
 917            )
 918            .one(&*tx)
 919            .await?;
 920
 921        let Some(membership) = membership else {
 922            return Ok(None);
 923        };
 924
 925        if !membership.role.can_see_channel(channel.visibility) {
 926            return Ok(None);
 927        }
 928
 929        Ok(Some(membership.role))
 930    }
 931
 932    // Get the descendants of the given set if channels, ordered by their
 933    // path.
 934    pub(crate) async fn get_channel_descendants_excluding_self(
 935        &self,
 936        channels: impl IntoIterator<Item = &channel::Model>,
 937        tx: &DatabaseTransaction,
 938    ) -> Result<Vec<channel::Model>> {
 939        let mut filter = Condition::any();
 940        for channel in channels.into_iter() {
 941            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 942        }
 943
 944        if filter.is_empty() {
 945            return Ok(vec![]);
 946        }
 947
 948        Ok(channel::Entity::find()
 949            .filter(filter)
 950            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 951            .all(tx)
 952            .await?)
 953    }
 954
 955    /// Returns the channel with the given ID.
 956    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 957        self.transaction(|tx| async move {
 958            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 959            self.check_user_is_channel_participant(&channel, user_id, &*tx)
 960                .await?;
 961
 962            Ok(Channel::from_model(channel))
 963        })
 964        .await
 965    }
 966
 967    pub(crate) async fn get_channel_internal(
 968        &self,
 969        channel_id: ChannelId,
 970        tx: &DatabaseTransaction,
 971    ) -> Result<channel::Model> {
 972        Ok(channel::Entity::find_by_id(channel_id)
 973            .one(&*tx)
 974            .await?
 975            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 976    }
 977
 978    pub(crate) async fn get_or_create_channel_room(
 979        &self,
 980        channel_id: ChannelId,
 981        live_kit_room: &str,
 982        environment: &str,
 983        tx: &DatabaseTransaction,
 984    ) -> Result<RoomId> {
 985        let room = room::Entity::find()
 986            .filter(room::Column::ChannelId.eq(channel_id))
 987            .one(&*tx)
 988            .await?;
 989
 990        let room_id = if let Some(room) = room {
 991            if let Some(env) = room.environment {
 992                if &env != environment {
 993                    Err(ErrorCode::WrongReleaseChannel
 994                        .with_tag("required", &env)
 995                        .anyhow())?;
 996                }
 997            }
 998            room.id
 999        } else {
1000            let result = room::Entity::insert(room::ActiveModel {
1001                channel_id: ActiveValue::Set(Some(channel_id)),
1002                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1003                environment: ActiveValue::Set(Some(environment.to_string())),
1004                ..Default::default()
1005            })
1006            .exec(&*tx)
1007            .await?;
1008
1009            result.last_insert_id
1010        };
1011
1012        Ok(room_id)
1013    }
1014
1015    /// Move a channel from one parent to another
1016    pub async fn move_channel(
1017        &self,
1018        channel_id: ChannelId,
1019        new_parent_id: ChannelId,
1020        admin_id: UserId,
1021    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
1022        self.transaction(|tx| async move {
1023            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1024            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1025                .await?;
1026            let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1027
1028            if new_parent.root_id() != channel.root_id() {
1029                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
1030            }
1031
1032            if new_parent
1033                .ancestors_including_self()
1034                .any(|id| id == channel.id)
1035            {
1036                Err(anyhow!(ErrorCode::CircularNesting))?;
1037            }
1038
1039            if channel.visibility == ChannelVisibility::Public
1040                && new_parent.visibility != ChannelVisibility::Public
1041            {
1042                Err(anyhow!(ErrorCode::BadPublicNesting))?;
1043            }
1044
1045            let root_id = channel.root_id();
1046            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1047            let new_path = format!("{}{}/", new_parent.path(), channel.id);
1048
1049            let mut model = channel.into_active_model();
1050            model.parent_path = ActiveValue::Set(new_parent.path());
1051            let channel = model.update(&*tx).await?;
1052
1053            let descendent_ids =
1054                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1055                    self.pool.get_database_backend(),
1056                    "
1057                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1058                    WHERE parent_path LIKE $3 || '%'
1059                    RETURNING id
1060                ",
1061                    [old_path.clone().into(), new_path.into(), old_path.into()],
1062                ))
1063                .all(&*tx)
1064                .await?;
1065
1066            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1067
1068            let channels = channel::Entity::find()
1069                .filter(channel::Column::Id.is_in(all_moved_ids))
1070                .all(&*tx)
1071                .await?
1072                .into_iter()
1073                .map(|c| Channel::from_model(c))
1074                .collect::<Vec<_>>();
1075
1076            let channel_members = channel_member::Entity::find()
1077                .filter(channel_member::Column::ChannelId.eq(root_id))
1078                .all(&*tx)
1079                .await?;
1080
1081            Ok((channels, channel_members))
1082        })
1083        .await
1084    }
1085}
1086
/// Column selector for raw statements that return a single `id` column, such
/// as the `UPDATE ... RETURNING id` statement issued by `move_channel`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1091
/// Column selector exposing a single `UserId` column.
///
/// NOTE(review): not referenced in this part of the file; presumably used by
/// queries elsewhere that read back only user ids. Confirm before removing.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}