channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .0
  36            .id)
  37    }
  38
  39    /// Creates a new channel.
  40    pub async fn create_channel(
  41        &self,
  42        name: &str,
  43        parent_channel_id: Option<ChannelId>,
  44        admin_id: UserId,
  45    ) -> Result<(
  46        Channel,
  47        Option<channel_member::Model>,
  48        Vec<channel_member::Model>,
  49    )> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let channel = channel::ActiveModel {
  63                id: ActiveValue::NotSet,
  64                name: ActiveValue::Set(name.to_string()),
  65                visibility: ActiveValue::Set(ChannelVisibility::Members),
  66                parent_path: ActiveValue::Set(
  67                    parent
  68                        .as_ref()
  69                        .map_or(String::new(), |parent| parent.path()),
  70                ),
  71                requires_zed_cla: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if parent.is_none() {
  77                membership = Some(
  78                    channel_member::ActiveModel {
  79                        id: ActiveValue::NotSet,
  80                        channel_id: ActiveValue::Set(channel.id),
  81                        user_id: ActiveValue::Set(admin_id),
  82                        accepted: ActiveValue::Set(true),
  83                        role: ActiveValue::Set(ChannelRole::Admin),
  84                    }
  85                    .insert(&*tx)
  86                    .await?,
  87                );
  88            }
  89
  90            let channel_members = channel_member::Entity::find()
  91                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  92                .all(&*tx)
  93                .await?;
  94
  95            Ok((Channel::from_model(channel), membership, channel_members))
  96        })
  97        .await
  98    }
  99
 100    /// Adds a user to the specified channel.
 101    pub async fn join_channel(
 102        &self,
 103        channel_id: ChannelId,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106        environment: &str,
 107    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 108        self.transaction(move |tx| async move {
 109            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 110            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 111
 112            let mut accept_invite_result = None;
 113
 114            if role.is_none() {
 115                if let Some(invitation) = self
 116                    .pending_invite_for_channel(&channel, user_id, &*tx)
 117                    .await?
 118                {
 119                    // note, this may be a parent channel
 120                    role = Some(invitation.role);
 121                    channel_member::Entity::update(channel_member::ActiveModel {
 122                        accepted: ActiveValue::Set(true),
 123                        ..invitation.into_active_model()
 124                    })
 125                    .exec(&*tx)
 126                    .await?;
 127
 128                    accept_invite_result = Some(
 129                        self.calculate_membership_updated(&channel, user_id, &*tx)
 130                            .await?,
 131                    );
 132
 133                    debug_assert!(
 134                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 135                    );
 136                } else if channel.visibility == ChannelVisibility::Public {
 137                    role = Some(ChannelRole::Guest);
 138                    channel_member::Entity::insert(channel_member::ActiveModel {
 139                        id: ActiveValue::NotSet,
 140                        channel_id: ActiveValue::Set(channel.root_id()),
 141                        user_id: ActiveValue::Set(user_id),
 142                        accepted: ActiveValue::Set(true),
 143                        role: ActiveValue::Set(ChannelRole::Guest),
 144                    })
 145                    .exec(&*tx)
 146                    .await?;
 147
 148                    accept_invite_result = Some(
 149                        self.calculate_membership_updated(&channel, user_id, &*tx)
 150                            .await?,
 151                    );
 152
 153                    debug_assert!(
 154                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 155                    );
 156                }
 157            }
 158
 159            if role.is_none() || role == Some(ChannelRole::Banned) {
 160                Err(ErrorCode::Forbidden.anyhow())?
 161            }
 162            let role = role.unwrap();
 163
 164            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 165            let room_id = self
 166                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 167                .await?;
 168
 169            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 170                .await
 171                .map(|jr| (jr, accept_invite_result, role))
 172        })
 173        .await
 174    }
 175
 176    /// Sets the visibiltity of the given channel.
 177    pub async fn set_channel_visibility(
 178        &self,
 179        channel_id: ChannelId,
 180        visibility: ChannelVisibility,
 181        admin_id: UserId,
 182    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 183        self.transaction(move |tx| async move {
 184            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 185            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 186                .await?;
 187
 188            if visibility == ChannelVisibility::Public {
 189                if let Some(parent_id) = channel.parent_id() {
 190                    let parent = self.get_channel_internal(parent_id, &*tx).await?;
 191
 192                    if parent.visibility != ChannelVisibility::Public {
 193                        Err(ErrorCode::BadPublicNesting
 194                            .with_tag("direction", "parent")
 195                            .anyhow())?;
 196                    }
 197                }
 198            } else if visibility == ChannelVisibility::Members {
 199                if self
 200                    .get_channel_descendants_including_self(vec![channel_id], &*tx)
 201                    .await?
 202                    .into_iter()
 203                    .any(|channel| {
 204                        channel.id != channel_id && channel.visibility == ChannelVisibility::Public
 205                    })
 206                {
 207                    Err(ErrorCode::BadPublicNesting
 208                        .with_tag("direction", "children")
 209                        .anyhow())?;
 210                }
 211            }
 212
 213            let mut model = channel.into_active_model();
 214            model.visibility = ActiveValue::Set(visibility);
 215            let channel = model.update(&*tx).await?;
 216
 217            let channel_members = channel_member::Entity::find()
 218                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 219                .all(&*tx)
 220                .await?;
 221
 222            Ok((Channel::from_model(channel), channel_members))
 223        })
 224        .await
 225    }
 226
 227    #[cfg(test)]
 228    pub async fn set_channel_requires_zed_cla(
 229        &self,
 230        channel_id: ChannelId,
 231        requires_zed_cla: bool,
 232    ) -> Result<()> {
 233        self.transaction(move |tx| async move {
 234            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 235            let mut model = channel.into_active_model();
 236            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 237            model.update(&*tx).await?;
 238            Ok(())
 239        })
 240        .await
 241    }
 242
 243    /// Deletes the channel with the specified ID.
 244    pub async fn delete_channel(
 245        &self,
 246        channel_id: ChannelId,
 247        user_id: UserId,
 248    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 249        self.transaction(move |tx| async move {
 250            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 251            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 252                .await?;
 253
 254            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 255                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 256                .select_only()
 257                .column(channel_member::Column::UserId)
 258                .distinct()
 259                .into_values::<_, QueryUserIds>()
 260                .all(&*tx)
 261                .await?;
 262
 263            let channels_to_remove = self
 264                .get_channel_descendants_including_self(vec![channel.id], &*tx)
 265                .await?
 266                .into_iter()
 267                .map(|channel| channel.id)
 268                .collect::<Vec<_>>();
 269
 270            channel::Entity::delete_many()
 271                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 272                .exec(&*tx)
 273                .await?;
 274
 275            Ok((channels_to_remove, members_to_notify))
 276        })
 277        .await
 278    }
 279
 280    /// Invites a user to a channel as a member.
 281    pub async fn invite_channel_member(
 282        &self,
 283        channel_id: ChannelId,
 284        invitee_id: UserId,
 285        inviter_id: UserId,
 286        role: ChannelRole,
 287    ) -> Result<InviteMemberResult> {
 288        self.transaction(move |tx| async move {
 289            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 290            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 291                .await?;
 292            if !channel.is_root() {
 293                Err(ErrorCode::NotARootChannel.anyhow())?
 294            }
 295
 296            channel_member::ActiveModel {
 297                id: ActiveValue::NotSet,
 298                channel_id: ActiveValue::Set(channel_id),
 299                user_id: ActiveValue::Set(invitee_id),
 300                accepted: ActiveValue::Set(false),
 301                role: ActiveValue::Set(role),
 302            }
 303            .insert(&*tx)
 304            .await?;
 305
 306            let channel = Channel::from_model(channel);
 307
 308            let notifications = self
 309                .create_notification(
 310                    invitee_id,
 311                    rpc::Notification::ChannelInvitation {
 312                        channel_id: channel_id.to_proto(),
 313                        channel_name: channel.name.clone(),
 314                        inviter_id: inviter_id.to_proto(),
 315                    },
 316                    true,
 317                    &*tx,
 318                )
 319                .await?
 320                .into_iter()
 321                .collect();
 322
 323            Ok(InviteMemberResult {
 324                channel,
 325                notifications,
 326            })
 327        })
 328        .await
 329    }
 330
 331    fn sanitize_channel_name(name: &str) -> Result<&str> {
 332        let new_name = name.trim().trim_start_matches('#');
 333        if new_name == "" {
 334            Err(anyhow!("channel name can't be blank"))?;
 335        }
 336        Ok(new_name)
 337    }
 338
 339    /// Renames the specified channel.
 340    pub async fn rename_channel(
 341        &self,
 342        channel_id: ChannelId,
 343        admin_id: UserId,
 344        new_name: &str,
 345    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 346        self.transaction(move |tx| async move {
 347            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 348
 349            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 350            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 351                .await?;
 352
 353            let mut model = channel.into_active_model();
 354            model.name = ActiveValue::Set(new_name.clone());
 355            let channel = model.update(&*tx).await?;
 356
 357            let channel_members = channel_member::Entity::find()
 358                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 359                .all(&*tx)
 360                .await?;
 361
 362            Ok((Channel::from_model(channel), channel_members))
 363        })
 364        .await
 365    }
 366
 367    /// accept or decline an invite to join a channel
 368    pub async fn respond_to_channel_invite(
 369        &self,
 370        channel_id: ChannelId,
 371        user_id: UserId,
 372        accept: bool,
 373    ) -> Result<RespondToChannelInvite> {
 374        self.transaction(move |tx| async move {
 375            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 376
 377            let membership_update = if accept {
 378                let rows_affected = channel_member::Entity::update_many()
 379                    .set(channel_member::ActiveModel {
 380                        accepted: ActiveValue::Set(accept),
 381                        ..Default::default()
 382                    })
 383                    .filter(
 384                        channel_member::Column::ChannelId
 385                            .eq(channel_id)
 386                            .and(channel_member::Column::UserId.eq(user_id))
 387                            .and(channel_member::Column::Accepted.eq(false)),
 388                    )
 389                    .exec(&*tx)
 390                    .await?
 391                    .rows_affected;
 392
 393                if rows_affected == 0 {
 394                    Err(anyhow!("no such invitation"))?;
 395                }
 396
 397                Some(
 398                    self.calculate_membership_updated(&channel, user_id, &*tx)
 399                        .await?,
 400                )
 401            } else {
 402                let rows_affected = channel_member::Entity::delete_many()
 403                    .filter(
 404                        channel_member::Column::ChannelId
 405                            .eq(channel_id)
 406                            .and(channel_member::Column::UserId.eq(user_id))
 407                            .and(channel_member::Column::Accepted.eq(false)),
 408                    )
 409                    .exec(&*tx)
 410                    .await?
 411                    .rows_affected;
 412                if rows_affected == 0 {
 413                    Err(anyhow!("no such invitation"))?;
 414                }
 415
 416                None
 417            };
 418
 419            Ok(RespondToChannelInvite {
 420                membership_update,
 421                notifications: self
 422                    .mark_notification_as_read_with_response(
 423                        user_id,
 424                        &rpc::Notification::ChannelInvitation {
 425                            channel_id: channel_id.to_proto(),
 426                            channel_name: Default::default(),
 427                            inviter_id: Default::default(),
 428                        },
 429                        accept,
 430                        &*tx,
 431                    )
 432                    .await?
 433                    .into_iter()
 434                    .collect(),
 435            })
 436        })
 437        .await
 438    }
 439
 440    async fn calculate_membership_updated(
 441        &self,
 442        channel: &channel::Model,
 443        user_id: UserId,
 444        tx: &DatabaseTransaction,
 445    ) -> Result<MembershipUpdated> {
 446        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 447        let removed_channels = self
 448            .get_channel_descendants_including_self(vec![channel.id], &*tx)
 449            .await?
 450            .into_iter()
 451            .filter_map(|channel| {
 452                if !new_channels.channels.iter().any(|c| c.id == channel.id) {
 453                    Some(channel.id)
 454                } else {
 455                    None
 456                }
 457            })
 458            .collect::<Vec<_>>();
 459
 460        Ok(MembershipUpdated {
 461            channel_id: channel.id,
 462            new_channels,
 463            removed_channels,
 464        })
 465    }
 466
 467    /// Removes a channel member.
 468    pub async fn remove_channel_member(
 469        &self,
 470        channel_id: ChannelId,
 471        member_id: UserId,
 472        admin_id: UserId,
 473    ) -> Result<RemoveChannelMemberResult> {
 474        self.transaction(|tx| async move {
 475            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 476
 477            if member_id != admin_id {
 478                self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 479                    .await?;
 480            }
 481
 482            let result = channel_member::Entity::delete_many()
 483                .filter(
 484                    channel_member::Column::ChannelId
 485                        .eq(channel_id)
 486                        .and(channel_member::Column::UserId.eq(member_id)),
 487                )
 488                .exec(&*tx)
 489                .await?;
 490
 491            if result.rows_affected == 0 {
 492                Err(anyhow!("no such member"))?;
 493            }
 494
 495            Ok(RemoveChannelMemberResult {
 496                membership_update: self
 497                    .calculate_membership_updated(&channel, member_id, &*tx)
 498                    .await?,
 499                notification_id: self
 500                    .remove_notification(
 501                        member_id,
 502                        rpc::Notification::ChannelInvitation {
 503                            channel_id: channel_id.to_proto(),
 504                            channel_name: Default::default(),
 505                            inviter_id: Default::default(),
 506                        },
 507                        &*tx,
 508                    )
 509                    .await?,
 510            })
 511        })
 512        .await
 513    }
 514
 515    /// Returns all channel invites for the user with the given ID.
 516    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 517        self.transaction(|tx| async move {
 518            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 519
 520            let channel_invites = channel_member::Entity::find()
 521                .filter(
 522                    channel_member::Column::UserId
 523                        .eq(user_id)
 524                        .and(channel_member::Column::Accepted.eq(false)),
 525                )
 526                .all(&*tx)
 527                .await?;
 528
 529            for invite in channel_invites {
 530                role_for_channel.insert(invite.channel_id, invite.role);
 531            }
 532
 533            let channels = channel::Entity::find()
 534                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 535                .all(&*tx)
 536                .await?;
 537
 538            let channels = channels
 539                .into_iter()
 540                .filter_map(|channel| Some(Channel::from_model(channel)))
 541                .collect();
 542
 543            Ok(channels)
 544        })
 545        .await
 546    }
 547
 548    pub async fn get_channel_memberships(
 549        &self,
 550        user_id: UserId,
 551    ) -> Result<(Vec<channel_member::Model>, Vec<channel::Model>)> {
 552        self.transaction(|tx| async move {
 553            let memberships = channel_member::Entity::find()
 554                .filter(channel_member::Column::UserId.eq(user_id))
 555                .all(&*tx)
 556                .await?;
 557            let channels = self
 558                .get_channel_descendants_including_self(
 559                    memberships.iter().map(|m| m.channel_id),
 560                    &*tx,
 561                )
 562                .await?;
 563            Ok((memberships, channels))
 564        })
 565        .await
 566    }
 567
 568    /// Returns all channels for the user with the given ID.
 569    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 570        self.transaction(|tx| async move {
 571            let tx = tx;
 572
 573            self.get_user_channels(user_id, None, &tx).await
 574        })
 575        .await
 576    }
 577
 578    /// Returns all channels for the user with the given ID that are descendants
 579    /// of the specified ancestor channel.
 580    pub async fn get_user_channels(
 581        &self,
 582        user_id: UserId,
 583        ancestor_channel: Option<&channel::Model>,
 584        tx: &DatabaseTransaction,
 585    ) -> Result<ChannelsForUser> {
 586        let mut filter = channel_member::Column::UserId
 587            .eq(user_id)
 588            .and(channel_member::Column::Accepted.eq(true));
 589
 590        if let Some(ancestor) = ancestor_channel {
 591            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 592        }
 593
 594        let channel_memberships = channel_member::Entity::find()
 595            .filter(filter)
 596            .all(&*tx)
 597            .await?;
 598
 599        let descendants = self
 600            .get_channel_descendants_including_self(
 601                channel_memberships.iter().map(|m| m.channel_id),
 602                &*tx,
 603            )
 604            .await?;
 605
 606        let roles_by_channel_id = channel_memberships
 607            .iter()
 608            .map(|membership| (membership.channel_id, membership.role))
 609            .collect::<HashMap<_, _>>();
 610
 611        let channels: Vec<Channel> = descendants
 612            .into_iter()
 613            .filter_map(|channel| {
 614                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 615                if parent_role.can_see_channel(channel.visibility) {
 616                    Some(Channel::from_model(channel))
 617                } else {
 618                    None
 619                }
 620            })
 621            .collect();
 622
 623        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 624        enum QueryUserIdsAndChannelIds {
 625            ChannelId,
 626            UserId,
 627        }
 628
 629        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 630        {
 631            let mut rows = room_participant::Entity::find()
 632                .inner_join(room::Entity)
 633                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 634                .select_only()
 635                .column(room::Column::ChannelId)
 636                .column(room_participant::Column::UserId)
 637                .into_values::<_, QueryUserIdsAndChannelIds>()
 638                .stream(&*tx)
 639                .await?;
 640            while let Some(row) = rows.next().await {
 641                let row: (ChannelId, UserId) = row?;
 642                channel_participants.entry(row.0).or_default().push(row.1)
 643            }
 644        }
 645
 646        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 647        let latest_buffer_versions = self
 648            .latest_channel_buffer_changes(&channel_ids, &*tx)
 649            .await?;
 650
 651        let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
 652
 653        Ok(ChannelsForUser {
 654            channel_memberships,
 655            channels,
 656            channel_participants,
 657            latest_buffer_versions,
 658            latest_channel_messages: latest_messages,
 659        })
 660    }
 661
 662    /// Sets the role for the specified channel member.
 663    pub async fn set_channel_member_role(
 664        &self,
 665        channel_id: ChannelId,
 666        admin_id: UserId,
 667        for_user: UserId,
 668        role: ChannelRole,
 669    ) -> Result<SetMemberRoleResult> {
 670        self.transaction(|tx| async move {
 671            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 672            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 673                .await?;
 674
 675            let membership = channel_member::Entity::find()
 676                .filter(
 677                    channel_member::Column::ChannelId
 678                        .eq(channel_id)
 679                        .and(channel_member::Column::UserId.eq(for_user)),
 680                )
 681                .one(&*tx)
 682                .await?;
 683
 684            let Some(membership) = membership else {
 685                Err(anyhow!("no such member"))?
 686            };
 687
 688            let mut update = membership.into_active_model();
 689            update.role = ActiveValue::Set(role);
 690            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 691
 692            if updated.accepted {
 693                Ok(SetMemberRoleResult::MembershipUpdated(
 694                    self.calculate_membership_updated(&channel, for_user, &*tx)
 695                        .await?,
 696                ))
 697            } else {
 698                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 699                    channel,
 700                )))
 701            }
 702        })
 703        .await
 704    }
 705
 706    /// Returns the details for the specified channel member.
 707    pub async fn get_channel_participant_details(
 708        &self,
 709        channel_id: ChannelId,
 710        user_id: UserId,
 711    ) -> Result<Vec<proto::ChannelMember>> {
 712        let (role, members) = self
 713            .transaction(move |tx| async move {
 714                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 715                let role = self
 716                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 717                    .await?;
 718                Ok((
 719                    role,
 720                    self.get_channel_participant_details_internal(&channel, &*tx)
 721                        .await?,
 722                ))
 723            })
 724            .await?;
 725
 726        if role == ChannelRole::Admin {
 727            Ok(members
 728                .into_iter()
 729                .map(|channel_member| proto::ChannelMember {
 730                    role: channel_member.role.into(),
 731                    user_id: channel_member.user_id.to_proto(),
 732                    kind: if channel_member.accepted {
 733                        Kind::Member
 734                    } else {
 735                        Kind::Invitee
 736                    }
 737                    .into(),
 738                })
 739                .collect())
 740        } else {
 741            return Ok(members
 742                .into_iter()
 743                .filter_map(|member| {
 744                    if !member.accepted {
 745                        return None;
 746                    }
 747                    Some(proto::ChannelMember {
 748                        role: member.role.into(),
 749                        user_id: member.user_id.to_proto(),
 750                        kind: Kind::Member.into(),
 751                    })
 752                })
 753                .collect());
 754        }
 755    }
 756
 757    async fn get_channel_participant_details_internal(
 758        &self,
 759        channel: &channel::Model,
 760        tx: &DatabaseTransaction,
 761    ) -> Result<Vec<channel_member::Model>> {
 762        Ok(channel_member::Entity::find()
 763            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 764            .all(tx)
 765            .await?)
 766    }
 767
 768    /// Returns the participants in the given channel.
 769    pub async fn get_channel_participants(
 770        &self,
 771        channel: &channel::Model,
 772        tx: &DatabaseTransaction,
 773    ) -> Result<Vec<UserId>> {
 774        let participants = self
 775            .get_channel_participant_details_internal(channel, &*tx)
 776            .await?;
 777        Ok(participants
 778            .into_iter()
 779            .map(|member| member.user_id)
 780            .collect())
 781    }
 782
 783    /// Returns whether the given user is an admin in the specified channel.
 784    pub async fn check_user_is_channel_admin(
 785        &self,
 786        channel: &channel::Model,
 787        user_id: UserId,
 788        tx: &DatabaseTransaction,
 789    ) -> Result<ChannelRole> {
 790        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 791        match role {
 792            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 793            Some(ChannelRole::Member)
 794            | Some(ChannelRole::Banned)
 795            | Some(ChannelRole::Guest)
 796            | None => Err(anyhow!(
 797                "user is not a channel admin or channel does not exist"
 798            ))?,
 799        }
 800    }
 801
 802    /// Returns whether the given user is a member of the specified channel.
 803    pub async fn check_user_is_channel_member(
 804        &self,
 805        channel: &channel::Model,
 806        user_id: UserId,
 807        tx: &DatabaseTransaction,
 808    ) -> Result<ChannelRole> {
 809        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 810        match channel_role {
 811            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 812            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 813                "user is not a channel member or channel does not exist"
 814            ))?,
 815        }
 816    }
 817
 818    /// Returns whether the given user is a participant in the specified channel.
 819    pub async fn check_user_is_channel_participant(
 820        &self,
 821        channel: &channel::Model,
 822        user_id: UserId,
 823        tx: &DatabaseTransaction,
 824    ) -> Result<ChannelRole> {
 825        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 826        match role {
 827            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 828                Ok(role.unwrap())
 829            }
 830            Some(ChannelRole::Banned) | None => Err(anyhow!(
 831                "user is not a channel participant or channel does not exist"
 832            ))?,
 833        }
 834    }
 835
 836    /// Returns a user's pending invite for the given channel, if one exists.
 837    pub async fn pending_invite_for_channel(
 838        &self,
 839        channel: &channel::Model,
 840        user_id: UserId,
 841        tx: &DatabaseTransaction,
 842    ) -> Result<Option<channel_member::Model>> {
 843        let row = channel_member::Entity::find()
 844            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 845            .filter(channel_member::Column::UserId.eq(user_id))
 846            .filter(channel_member::Column::Accepted.eq(false))
 847            .one(&*tx)
 848            .await?;
 849
 850        Ok(row)
 851    }
 852
 853    /// Returns the role for a user in the given channel.
 854    pub async fn channel_role_for_user(
 855        &self,
 856        channel: &channel::Model,
 857        user_id: UserId,
 858        tx: &DatabaseTransaction,
 859    ) -> Result<Option<ChannelRole>> {
 860        let membership = channel_member::Entity::find()
 861            .filter(
 862                channel_member::Column::ChannelId
 863                    .eq(channel.root_id())
 864                    .and(channel_member::Column::UserId.eq(user_id))
 865                    .and(channel_member::Column::Accepted.eq(true)),
 866            )
 867            .one(&*tx)
 868            .await?;
 869
 870        let Some(membership) = membership else {
 871            return Ok(None);
 872        };
 873
 874        if !membership.role.can_see_channel(channel.visibility) {
 875            return Ok(None);
 876        }
 877
 878        Ok(Some(membership.role))
 879    }
 880
 881    // Get the descendants of the given set if channels, ordered by their
 882    // path.
 883    async fn get_channel_descendants_including_self(
 884        &self,
 885        channel_ids: impl IntoIterator<Item = ChannelId>,
 886        tx: &DatabaseTransaction,
 887    ) -> Result<Vec<channel::Model>> {
 888        let mut values = String::new();
 889        for id in channel_ids {
 890            if !values.is_empty() {
 891                values.push_str(", ");
 892            }
 893            write!(&mut values, "({})", id).unwrap();
 894        }
 895
 896        if values.is_empty() {
 897            return Ok(vec![]);
 898        }
 899
 900        let sql = format!(
 901            r#"
 902            SELECT DISTINCT
 903                descendant_channels.*,
 904                descendant_channels.parent_path || descendant_channels.id as full_path
 905            FROM
 906                channels parent_channels, channels descendant_channels
 907            WHERE
 908                descendant_channels.id IN ({values}) OR
 909                (
 910                    parent_channels.id IN ({values}) AND
 911                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
 912                )
 913            ORDER BY
 914                full_path ASC
 915            "#
 916        );
 917
 918        Ok(channel::Entity::find()
 919            .from_raw_sql(Statement::from_string(
 920                self.pool.get_database_backend(),
 921                sql,
 922            ))
 923            .all(tx)
 924            .await?)
 925    }
 926
 927    /// Returns the channel with the given ID.
 928    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 929        self.transaction(|tx| async move {
 930            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 931            self.check_user_is_channel_participant(&channel, user_id, &*tx)
 932                .await?;
 933
 934            Ok(Channel::from_model(channel))
 935        })
 936        .await
 937    }
 938
 939    pub(crate) async fn get_channel_internal(
 940        &self,
 941        channel_id: ChannelId,
 942        tx: &DatabaseTransaction,
 943    ) -> Result<channel::Model> {
 944        Ok(channel::Entity::find_by_id(channel_id)
 945            .one(&*tx)
 946            .await?
 947            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 948    }
 949
 950    pub(crate) async fn get_or_create_channel_room(
 951        &self,
 952        channel_id: ChannelId,
 953        live_kit_room: &str,
 954        environment: &str,
 955        tx: &DatabaseTransaction,
 956    ) -> Result<RoomId> {
 957        let room = room::Entity::find()
 958            .filter(room::Column::ChannelId.eq(channel_id))
 959            .one(&*tx)
 960            .await?;
 961
 962        let room_id = if let Some(room) = room {
 963            if let Some(env) = room.environment {
 964                if &env != environment {
 965                    Err(ErrorCode::WrongReleaseChannel
 966                        .with_tag("required", &env)
 967                        .anyhow())?;
 968                }
 969            }
 970            room.id
 971        } else {
 972            let result = room::Entity::insert(room::ActiveModel {
 973                channel_id: ActiveValue::Set(Some(channel_id)),
 974                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 975                environment: ActiveValue::Set(Some(environment.to_string())),
 976                ..Default::default()
 977            })
 978            .exec(&*tx)
 979            .await?;
 980
 981            result.last_insert_id
 982        };
 983
 984        Ok(room_id)
 985    }
 986
 987    /// Move a channel from one parent to another
 988    pub async fn move_channel(
 989        &self,
 990        channel_id: ChannelId,
 991        new_parent_id: ChannelId,
 992        admin_id: UserId,
 993    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
 994        self.transaction(|tx| async move {
 995            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 996            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 997                .await?;
 998            let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
 999
1000            if new_parent.root_id() != channel.root_id() {
1001                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
1002            }
1003
1004            if new_parent
1005                .ancestors_including_self()
1006                .any(|id| id == channel.id)
1007            {
1008                Err(anyhow!(ErrorCode::CircularNesting))?;
1009            }
1010
1011            if channel.visibility == ChannelVisibility::Public
1012                && new_parent.visibility != ChannelVisibility::Public
1013            {
1014                Err(anyhow!(ErrorCode::BadPublicNesting))?;
1015            }
1016
1017            let root_id = channel.root_id();
1018            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1019            let new_path = format!("{}{}/", new_parent.path(), channel.id);
1020
1021            let mut model = channel.into_active_model();
1022            model.parent_path = ActiveValue::Set(new_parent.path());
1023            let channel = model.update(&*tx).await?;
1024
1025            let descendent_ids =
1026                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1027                    self.pool.get_database_backend(),
1028                    "
1029                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1030                    WHERE parent_path LIKE $3 || '%'
1031                    RETURNING id
1032                ",
1033                    [old_path.clone().into(), new_path.into(), old_path.into()],
1034                ))
1035                .all(&*tx)
1036                .await?;
1037
1038            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1039
1040            let channels = channel::Entity::find()
1041                .filter(channel::Column::Id.is_in(all_moved_ids))
1042                .all(&*tx)
1043                .await?
1044                .into_iter()
1045                .map(|c| Channel::from_model(c))
1046                .collect::<Vec<_>>();
1047
1048            let channel_members = channel_member::Entity::find()
1049                .filter(channel_member::Column::ChannelId.eq(root_id))
1050                .all(&*tx)
1051                .await?;
1052
1053            Ok((channels, channel_members))
1054        })
1055        .await
1056    }
1057}
1058
1059#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1060enum QueryIds {
1061    Id,
1062}
1063
1064#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1065enum QueryUserIds {
1066    UserId,
1067}