channels.rs

   1use super::*;
   2use anyhow::Context as _;
   3use rpc::{
   4    ErrorCode, ErrorCodeExt,
   5    proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
   6};
   7use sea_orm::{ActiveValue, DbBackend, TryGetableMany};
   8
   9impl Database {
  10    #[cfg(test)]
  11    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  12        self.transaction(move |tx| async move {
  13            let mut channels = Vec::new();
  14            let mut rows = channel::Entity::find().stream(&*tx).await?;
  15            while let Some(row) = rows.next().await {
  16                let row = row?;
  17                channels.push((row.id, row.name));
  18            }
  19            Ok(channels)
  20        })
  21        .await
  22    }
  23
  24    #[cfg(test)]
  25    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  26        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .0
  40            .id)
  41    }
  42
  43    /// Creates a new channel.
  44    pub async fn create_channel(
  45        &self,
  46        name: &str,
  47        parent_channel_id: Option<ChannelId>,
  48        admin_id: UserId,
  49    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let parent_path = parent
  63                .as_ref()
  64                .map_or(String::new(), |parent| parent.path());
  65
  66            // Find the maximum channel_order among siblings to set the new channel at the end
  67            let max_order = if parent_path.is_empty() {
  68                0
  69            } else {
  70                max_order(&parent_path, &tx).await?
  71            };
  72
  73            log::info!(
  74                "Creating channel '{}' with parent_path='{}', max_order={}, new_order={}",
  75                name,
  76                parent_path,
  77                max_order,
  78                max_order + 1
  79            );
  80
  81            let channel = channel::ActiveModel {
  82                id: ActiveValue::NotSet,
  83                name: ActiveValue::Set(name.to_string()),
  84                visibility: ActiveValue::Set(ChannelVisibility::Members),
  85                parent_path: ActiveValue::Set(parent_path),
  86                requires_zed_cla: ActiveValue::NotSet,
  87                channel_order: ActiveValue::Set(max_order + 1),
  88            }
  89            .insert(&*tx)
  90            .await?;
  91
  92            if parent.is_none() {
  93                membership = Some(
  94                    channel_member::ActiveModel {
  95                        id: ActiveValue::NotSet,
  96                        channel_id: ActiveValue::Set(channel.id),
  97                        user_id: ActiveValue::Set(admin_id),
  98                        accepted: ActiveValue::Set(true),
  99                        role: ActiveValue::Set(ChannelRole::Admin),
 100                    }
 101                    .insert(&*tx)
 102                    .await?,
 103                );
 104            }
 105
 106            Ok((channel, membership))
 107        })
 108        .await
 109    }
 110
 111    /// Adds a user to the specified channel.
 112    pub async fn join_channel(
 113        &self,
 114        channel_id: ChannelId,
 115        user_id: UserId,
 116        connection: ConnectionId,
 117    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 118        self.transaction(move |tx| async move {
 119            let channel = self.get_channel_internal(channel_id, &tx).await?;
 120            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 121
 122            let mut accept_invite_result = None;
 123
 124            if role.is_none() {
 125                if let Some(invitation) = self
 126                    .pending_invite_for_channel(&channel, user_id, &tx)
 127                    .await?
 128                {
 129                    // note, this may be a parent channel
 130                    role = Some(invitation.role);
 131                    channel_member::Entity::update(channel_member::ActiveModel {
 132                        accepted: ActiveValue::Set(true),
 133                        ..invitation.into_active_model()
 134                    })
 135                    .exec(&*tx)
 136                    .await?;
 137
 138                    accept_invite_result = Some(
 139                        self.calculate_membership_updated(&channel, user_id, &tx)
 140                            .await?,
 141                    );
 142
 143                    debug_assert!(
 144                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 145                    );
 146                } else if channel.visibility == ChannelVisibility::Public {
 147                    role = Some(ChannelRole::Guest);
 148                    channel_member::Entity::insert(channel_member::ActiveModel {
 149                        id: ActiveValue::NotSet,
 150                        channel_id: ActiveValue::Set(channel.root_id()),
 151                        user_id: ActiveValue::Set(user_id),
 152                        accepted: ActiveValue::Set(true),
 153                        role: ActiveValue::Set(ChannelRole::Guest),
 154                    })
 155                    .exec(&*tx)
 156                    .await?;
 157
 158                    accept_invite_result = Some(
 159                        self.calculate_membership_updated(&channel, user_id, &tx)
 160                            .await?,
 161                    );
 162
 163                    debug_assert!(
 164                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 165                    );
 166                }
 167            }
 168
 169            if role.is_none() || role == Some(ChannelRole::Banned) {
 170                Err(ErrorCode::Forbidden.anyhow())?
 171            }
 172            let role = role.unwrap();
 173
 174            let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
 175            let room_id = self
 176                .get_or_create_channel_room(channel_id, &livekit_room, &tx)
 177                .await?;
 178
 179            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 180                .await
 181                .map(|jr| (jr, accept_invite_result, role))
 182        })
 183        .await
 184    }
 185
 186    /// Sets the visibility of the given channel.
 187    pub async fn set_channel_visibility(
 188        &self,
 189        channel_id: ChannelId,
 190        visibility: ChannelVisibility,
 191        admin_id: UserId,
 192    ) -> Result<channel::Model> {
 193        self.transaction(move |tx| async move {
 194            let channel = self.get_channel_internal(channel_id, &tx).await?;
 195            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 196                .await?;
 197
 198            if visibility == ChannelVisibility::Public {
 199                if let Some(parent_id) = channel.parent_id() {
 200                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 201
 202                    if parent.visibility != ChannelVisibility::Public {
 203                        Err(ErrorCode::BadPublicNesting
 204                            .with_tag("direction", "parent")
 205                            .anyhow())?;
 206                    }
 207                }
 208            } else if visibility == ChannelVisibility::Members
 209                && self
 210                    .get_channel_descendants_excluding_self([&channel], &tx)
 211                    .await?
 212                    .into_iter()
 213                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 214            {
 215                Err(ErrorCode::BadPublicNesting
 216                    .with_tag("direction", "children")
 217                    .anyhow())?;
 218            }
 219
 220            let mut model = channel.into_active_model();
 221            model.visibility = ActiveValue::Set(visibility);
 222            let channel = model.update(&*tx).await?;
 223
 224            Ok(channel)
 225        })
 226        .await
 227    }
 228
 229    #[cfg(test)]
 230    pub async fn set_channel_requires_zed_cla(
 231        &self,
 232        channel_id: ChannelId,
 233        requires_zed_cla: bool,
 234    ) -> Result<()> {
 235        self.transaction(move |tx| async move {
 236            let channel = self.get_channel_internal(channel_id, &tx).await?;
 237            let mut model = channel.into_active_model();
 238            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 239            model.update(&*tx).await?;
 240            Ok(())
 241        })
 242        .await
 243    }
 244
 245    /// Deletes the channel with the specified ID.
 246    pub async fn delete_channel(
 247        &self,
 248        channel_id: ChannelId,
 249        user_id: UserId,
 250    ) -> Result<(ChannelId, Vec<ChannelId>)> {
 251        self.transaction(move |tx| async move {
 252            let channel = self.get_channel_internal(channel_id, &tx).await?;
 253            self.check_user_is_channel_admin(&channel, user_id, &tx)
 254                .await?;
 255
 256            let channels_to_remove = self
 257                .get_channel_descendants_excluding_self([&channel], &tx)
 258                .await?
 259                .into_iter()
 260                .map(|channel| channel.id)
 261                .chain(Some(channel_id))
 262                .collect::<Vec<_>>();
 263
 264            channel::Entity::delete_many()
 265                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 266                .exec(&*tx)
 267                .await?;
 268
 269            Ok((channel.root_id(), channels_to_remove))
 270        })
 271        .await
 272    }
 273
 274    /// Invites a user to a channel as a member.
 275    pub async fn invite_channel_member(
 276        &self,
 277        channel_id: ChannelId,
 278        invitee_id: UserId,
 279        inviter_id: UserId,
 280        role: ChannelRole,
 281    ) -> Result<InviteMemberResult> {
 282        self.transaction(move |tx| async move {
 283            let channel = self.get_channel_internal(channel_id, &tx).await?;
 284            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 285                .await?;
 286            if !channel.is_root() {
 287                Err(ErrorCode::NotARootChannel.anyhow())?
 288            }
 289
 290            channel_member::ActiveModel {
 291                id: ActiveValue::NotSet,
 292                channel_id: ActiveValue::Set(channel_id),
 293                user_id: ActiveValue::Set(invitee_id),
 294                accepted: ActiveValue::Set(false),
 295                role: ActiveValue::Set(role),
 296            }
 297            .insert(&*tx)
 298            .await?;
 299
 300            let channel = Channel::from_model(channel);
 301
 302            let notifications = self
 303                .create_notification(
 304                    invitee_id,
 305                    rpc::Notification::ChannelInvitation {
 306                        channel_id: channel_id.to_proto(),
 307                        channel_name: channel.name.clone(),
 308                        inviter_id: inviter_id.to_proto(),
 309                    },
 310                    true,
 311                    &tx,
 312                )
 313                .await?
 314                .into_iter()
 315                .collect();
 316
 317            Ok(InviteMemberResult {
 318                channel,
 319                notifications,
 320            })
 321        })
 322        .await
 323    }
 324
 325    fn sanitize_channel_name(name: &str) -> Result<&str> {
 326        let new_name = name.trim().trim_start_matches('#');
 327        if new_name.is_empty() {
 328            Err(anyhow!("channel name can't be blank"))?;
 329        }
 330        Ok(new_name)
 331    }
 332
 333    /// Renames the specified channel.
 334    pub async fn rename_channel(
 335        &self,
 336        channel_id: ChannelId,
 337        admin_id: UserId,
 338        new_name: &str,
 339    ) -> Result<channel::Model> {
 340        self.transaction(move |tx| async move {
 341            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 342
 343            let channel = self.get_channel_internal(channel_id, &tx).await?;
 344            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 345                .await?;
 346
 347            let mut model = channel.into_active_model();
 348            model.name = ActiveValue::Set(new_name.clone());
 349            let channel = model.update(&*tx).await?;
 350
 351            Ok(channel)
 352        })
 353        .await
 354    }
 355
 356    /// accept or decline an invite to join a channel
 357    pub async fn respond_to_channel_invite(
 358        &self,
 359        channel_id: ChannelId,
 360        user_id: UserId,
 361        accept: bool,
 362    ) -> Result<RespondToChannelInvite> {
 363        self.transaction(move |tx| async move {
 364            let channel = self.get_channel_internal(channel_id, &tx).await?;
 365
 366            let membership_update = if accept {
 367                let rows_affected = channel_member::Entity::update_many()
 368                    .set(channel_member::ActiveModel {
 369                        accepted: ActiveValue::Set(accept),
 370                        ..Default::default()
 371                    })
 372                    .filter(
 373                        channel_member::Column::ChannelId
 374                            .eq(channel_id)
 375                            .and(channel_member::Column::UserId.eq(user_id))
 376                            .and(channel_member::Column::Accepted.eq(false)),
 377                    )
 378                    .exec(&*tx)
 379                    .await?
 380                    .rows_affected;
 381
 382                if rows_affected == 0 {
 383                    Err(anyhow!("no such invitation"))?;
 384                }
 385
 386                Some(
 387                    self.calculate_membership_updated(&channel, user_id, &tx)
 388                        .await?,
 389                )
 390            } else {
 391                let rows_affected = channel_member::Entity::delete_many()
 392                    .filter(
 393                        channel_member::Column::ChannelId
 394                            .eq(channel_id)
 395                            .and(channel_member::Column::UserId.eq(user_id))
 396                            .and(channel_member::Column::Accepted.eq(false)),
 397                    )
 398                    .exec(&*tx)
 399                    .await?
 400                    .rows_affected;
 401                if rows_affected == 0 {
 402                    Err(anyhow!("no such invitation"))?;
 403                }
 404
 405                None
 406            };
 407
 408            Ok(RespondToChannelInvite {
 409                membership_update,
 410                notifications: self
 411                    .mark_notification_as_read_with_response(
 412                        user_id,
 413                        &rpc::Notification::ChannelInvitation {
 414                            channel_id: channel_id.to_proto(),
 415                            channel_name: Default::default(),
 416                            inviter_id: Default::default(),
 417                        },
 418                        accept,
 419                        &tx,
 420                    )
 421                    .await?
 422                    .into_iter()
 423                    .collect(),
 424            })
 425        })
 426        .await
 427    }
 428
 429    async fn calculate_membership_updated(
 430        &self,
 431        channel: &channel::Model,
 432        user_id: UserId,
 433        tx: &DatabaseTransaction,
 434    ) -> Result<MembershipUpdated> {
 435        let new_channels = self
 436            .get_user_channels(user_id, Some(channel), false, tx)
 437            .await?;
 438        let removed_channels = self
 439            .get_channel_descendants_excluding_self([channel], tx)
 440            .await?
 441            .into_iter()
 442            .map(|channel| channel.id)
 443            .chain([channel.id])
 444            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 445            .collect::<Vec<_>>();
 446
 447        Ok(MembershipUpdated {
 448            channel_id: channel.id,
 449            new_channels,
 450            removed_channels,
 451        })
 452    }
 453
 454    /// Removes a channel member.
 455    pub async fn remove_channel_member(
 456        &self,
 457        channel_id: ChannelId,
 458        member_id: UserId,
 459        admin_id: UserId,
 460    ) -> Result<RemoveChannelMemberResult> {
 461        self.transaction(|tx| async move {
 462            let channel = self.get_channel_internal(channel_id, &tx).await?;
 463
 464            if member_id != admin_id {
 465                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 466                    .await?;
 467            }
 468
 469            let result = channel_member::Entity::delete_many()
 470                .filter(
 471                    channel_member::Column::ChannelId
 472                        .eq(channel_id)
 473                        .and(channel_member::Column::UserId.eq(member_id)),
 474                )
 475                .exec(&*tx)
 476                .await?;
 477
 478            if result.rows_affected == 0 {
 479                Err(anyhow!("no such member"))?;
 480            }
 481
 482            Ok(RemoveChannelMemberResult {
 483                membership_update: self
 484                    .calculate_membership_updated(&channel, member_id, &tx)
 485                    .await?,
 486                notification_id: self
 487                    .remove_notification(
 488                        member_id,
 489                        rpc::Notification::ChannelInvitation {
 490                            channel_id: channel_id.to_proto(),
 491                            channel_name: Default::default(),
 492                            inviter_id: Default::default(),
 493                        },
 494                        &tx,
 495                    )
 496                    .await?,
 497            })
 498        })
 499        .await
 500    }
 501
 502    /// Returns all channels for the user with the given ID.
 503    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 504        self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
 505            .await
 506    }
 507
 508    /// Returns all channels for the user with the given ID that are descendants
 509    /// of the specified ancestor channel.
 510    pub async fn get_user_channels(
 511        &self,
 512        user_id: UserId,
 513        ancestor_channel: Option<&channel::Model>,
 514        include_invites: bool,
 515        tx: &DatabaseTransaction,
 516    ) -> Result<ChannelsForUser> {
 517        let mut filter = channel_member::Column::UserId.eq(user_id);
 518        if !include_invites {
 519            filter = filter.and(channel_member::Column::Accepted.eq(true))
 520        }
 521        if let Some(ancestor) = ancestor_channel {
 522            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 523        }
 524
 525        let mut channels = Vec::<channel::Model>::new();
 526        let mut invited_channels = Vec::<Channel>::new();
 527        let mut channel_memberships = Vec::<channel_member::Model>::new();
 528        let mut rows = channel_member::Entity::find()
 529            .filter(filter)
 530            .inner_join(channel::Entity)
 531            .select_also(channel::Entity)
 532            .stream(tx)
 533            .await?;
 534        while let Some(row) = rows.next().await {
 535            if let (membership, Some(channel)) = row? {
 536                if membership.accepted {
 537                    channel_memberships.push(membership);
 538                    channels.push(channel);
 539                } else {
 540                    invited_channels.push(Channel::from_model(channel));
 541                }
 542            }
 543        }
 544        drop(rows);
 545
 546        let mut descendants = self
 547            .get_channel_descendants_excluding_self(channels.iter(), tx)
 548            .await?;
 549
 550        descendants.extend(channels);
 551
 552        let roles_by_channel_id = channel_memberships
 553            .iter()
 554            .map(|membership| (membership.channel_id, membership.role))
 555            .collect::<HashMap<_, _>>();
 556
 557        let channels: Vec<Channel> = descendants
 558            .into_iter()
 559            .filter_map(|channel| {
 560                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 561                if parent_role.can_see_channel(channel.visibility) {
 562                    Some(Channel::from_model(channel))
 563                } else {
 564                    None
 565                }
 566            })
 567            .collect();
 568
 569        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 570        enum QueryUserIdsAndChannelIds {
 571            ChannelId,
 572            UserId,
 573        }
 574
 575        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 576        {
 577            let mut rows = room_participant::Entity::find()
 578                .inner_join(room::Entity)
 579                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 580                .select_only()
 581                .column(room::Column::ChannelId)
 582                .column(room_participant::Column::UserId)
 583                .into_values::<_, QueryUserIdsAndChannelIds>()
 584                .stream(tx)
 585                .await?;
 586            while let Some(row) = rows.next().await {
 587                let row: (ChannelId, UserId) = row?;
 588                channel_participants.entry(row.0).or_default().push(row.1)
 589            }
 590        }
 591
 592        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 593
 594        let mut channel_ids_by_buffer_id = HashMap::default();
 595        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
 596        let mut rows = buffer::Entity::find()
 597            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 598            .stream(tx)
 599            .await?;
 600        while let Some(row) = rows.next().await {
 601            let row = row?;
 602            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 603            latest_buffer_versions.push(ChannelBufferVersion {
 604                channel_id: row.channel_id.0 as u64,
 605                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
 606                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
 607                    .latest_operation_lamport_timestamp
 608                    .zip(row.latest_operation_replica_id)
 609                {
 610                    vec![VectorClockEntry {
 611                        timestamp: latest_lamport_timestamp as u32,
 612                        replica_id: latest_replica_id as u32,
 613                    }]
 614                } else {
 615                    vec![]
 616                },
 617            });
 618        }
 619        drop(rows);
 620
 621        let observed_buffer_versions = self
 622            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
 623            .await?;
 624
 625        Ok(ChannelsForUser {
 626            channel_memberships,
 627            channels,
 628            invited_channels,
 629            channel_participants,
 630            latest_buffer_versions,
 631            observed_buffer_versions,
 632        })
 633    }
 634
 635    /// Sets the role for the specified channel member.
 636    pub async fn set_channel_member_role(
 637        &self,
 638        channel_id: ChannelId,
 639        admin_id: UserId,
 640        for_user: UserId,
 641        role: ChannelRole,
 642    ) -> Result<SetMemberRoleResult> {
 643        self.transaction(|tx| async move {
 644            let channel = self.get_channel_internal(channel_id, &tx).await?;
 645            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 646                .await?;
 647
 648            let membership = channel_member::Entity::find()
 649                .filter(
 650                    channel_member::Column::ChannelId
 651                        .eq(channel_id)
 652                        .and(channel_member::Column::UserId.eq(for_user)),
 653                )
 654                .one(&*tx)
 655                .await?
 656                .context("no such member")?;
 657
 658            let mut update = membership.into_active_model();
 659            update.role = ActiveValue::Set(role);
 660            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 661
 662            if updated.accepted {
 663                Ok(SetMemberRoleResult::MembershipUpdated(
 664                    self.calculate_membership_updated(&channel, for_user, &tx)
 665                        .await?,
 666                ))
 667            } else {
 668                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 669                    channel,
 670                )))
 671            }
 672        })
 673        .await
 674    }
 675
 676    /// Returns the details for the specified channel member.
 677    pub async fn get_channel_participant_details(
 678        &self,
 679        channel_id: ChannelId,
 680        filter: &str,
 681        limit: u64,
 682        user_id: UserId,
 683    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
 684        let members = self
 685            .transaction(move |tx| async move {
 686                let channel = self.get_channel_internal(channel_id, &tx).await?;
 687                self.check_user_is_channel_participant(&channel, user_id, &tx)
 688                    .await?;
 689                let mut query = channel_member::Entity::find()
 690                    .find_also_related(user::Entity)
 691                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
 692
 693                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
 694                    query = query.filter(Expr::cust_with_values(
 695                        "UPPER(github_login) LIKE ?",
 696                        [Self::fuzzy_like_string(&filter.to_uppercase())],
 697                    ))
 698                } else {
 699                    query = query.filter(Expr::cust_with_values(
 700                        "github_login ILIKE $1",
 701                        [Self::fuzzy_like_string(filter)],
 702                    ))
 703                }
 704                let members = query.order_by(
 705                        Expr::cust(
 706                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
 707                        ),
 708                        sea_orm::Order::Asc,
 709                    )
 710                    .limit(limit)
 711                    .all(&*tx)
 712                    .await?;
 713
 714                Ok(members)
 715            })
 716            .await?;
 717
 718        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
 719
 720        let members = members
 721            .into_iter()
 722            .map(|(member, user)| {
 723                if let Some(user) = user {
 724                    users.push(proto::User {
 725                        id: user.id.to_proto(),
 726                        avatar_url: format!(
 727                            "https://avatars.githubusercontent.com/u/{}?s=128&v=4",
 728                            user.github_user_id
 729                        ),
 730                        github_login: user.github_login,
 731                        name: user.name,
 732                    })
 733                }
 734                proto::ChannelMember {
 735                    role: member.role.into(),
 736                    user_id: member.user_id.to_proto(),
 737                    kind: if member.accepted {
 738                        Kind::Member
 739                    } else {
 740                        Kind::Invitee
 741                    }
 742                    .into(),
 743                }
 744            })
 745            .collect();
 746
 747        Ok((members, users))
 748    }
 749
 750    /// Returns whether the given user is an admin in the specified channel.
 751    pub async fn check_user_is_channel_admin(
 752        &self,
 753        channel: &channel::Model,
 754        user_id: UserId,
 755        tx: &DatabaseTransaction,
 756    ) -> Result<ChannelRole> {
 757        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 758        match role {
 759            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 760            Some(ChannelRole::Member)
 761            | Some(ChannelRole::Talker)
 762            | Some(ChannelRole::Banned)
 763            | Some(ChannelRole::Guest)
 764            | None => Err(anyhow!(
 765                "user is not a channel admin or channel does not exist"
 766            ))?,
 767        }
 768    }
 769
 770    /// Returns whether the given user is a member of the specified channel.
 771    pub async fn check_user_is_channel_member(
 772        &self,
 773        channel: &channel::Model,
 774        user_id: UserId,
 775        tx: &DatabaseTransaction,
 776    ) -> Result<ChannelRole> {
 777        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 778        match channel_role {
 779            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 780            Some(ChannelRole::Banned)
 781            | Some(ChannelRole::Guest)
 782            | Some(ChannelRole::Talker)
 783            | None => Err(anyhow!(
 784                "user is not a channel member or channel does not exist"
 785            ))?,
 786        }
 787    }
 788
 789    /// Returns whether the given user is a participant in the specified channel.
 790    pub async fn check_user_is_channel_participant(
 791        &self,
 792        channel: &channel::Model,
 793        user_id: UserId,
 794        tx: &DatabaseTransaction,
 795    ) -> Result<ChannelRole> {
 796        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 797        match role {
 798            Some(ChannelRole::Admin)
 799            | Some(ChannelRole::Member)
 800            | Some(ChannelRole::Guest)
 801            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 802            Some(ChannelRole::Banned) | None => Err(anyhow!(
 803                "user is not a channel participant or channel does not exist"
 804            ))?,
 805        }
 806    }
 807
 808    /// Returns a user's pending invite for the given channel, if one exists.
 809    pub async fn pending_invite_for_channel(
 810        &self,
 811        channel: &channel::Model,
 812        user_id: UserId,
 813        tx: &DatabaseTransaction,
 814    ) -> Result<Option<channel_member::Model>> {
 815        let row = channel_member::Entity::find()
 816            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 817            .filter(channel_member::Column::UserId.eq(user_id))
 818            .filter(channel_member::Column::Accepted.eq(false))
 819            .one(tx)
 820            .await?;
 821
 822        Ok(row)
 823    }
 824
 825    /// Returns the role for a user in the given channel.
 826    pub async fn channel_role_for_user(
 827        &self,
 828        channel: &channel::Model,
 829        user_id: UserId,
 830        tx: &DatabaseTransaction,
 831    ) -> Result<Option<ChannelRole>> {
 832        let membership = channel_member::Entity::find()
 833            .filter(
 834                channel_member::Column::ChannelId
 835                    .eq(channel.root_id())
 836                    .and(channel_member::Column::UserId.eq(user_id))
 837                    .and(channel_member::Column::Accepted.eq(true)),
 838            )
 839            .one(tx)
 840            .await?;
 841
 842        let Some(membership) = membership else {
 843            return Ok(None);
 844        };
 845
 846        if !membership.role.can_see_channel(channel.visibility) {
 847            return Ok(None);
 848        }
 849
 850        Ok(Some(membership.role))
 851    }
 852
 853    // Get the descendants of the given set if channels, ordered by their
 854    // path.
 855    pub(crate) async fn get_channel_descendants_excluding_self(
 856        &self,
 857        channels: impl IntoIterator<Item = &channel::Model>,
 858        tx: &DatabaseTransaction,
 859    ) -> Result<Vec<channel::Model>> {
 860        let mut filter = Condition::any();
 861        for channel in channels.into_iter() {
 862            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 863        }
 864
 865        if filter.is_empty() {
 866            return Ok(vec![]);
 867        }
 868
 869        Ok(channel::Entity::find()
 870            .filter(filter)
 871            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 872            .all(tx)
 873            .await?)
 874    }
 875
 876    /// Returns the channel with the given ID.
 877    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 878        self.transaction(|tx| async move {
 879            let channel = self.get_channel_internal(channel_id, &tx).await?;
 880            self.check_user_is_channel_participant(&channel, user_id, &tx)
 881                .await?;
 882
 883            Ok(Channel::from_model(channel))
 884        })
 885        .await
 886    }
 887
 888    pub(crate) async fn get_channel_internal(
 889        &self,
 890        channel_id: ChannelId,
 891        tx: &DatabaseTransaction,
 892    ) -> Result<channel::Model> {
 893        Ok(channel::Entity::find_by_id(channel_id)
 894            .one(tx)
 895            .await?
 896            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 897    }
 898
 899    pub(crate) async fn get_or_create_channel_room(
 900        &self,
 901        channel_id: ChannelId,
 902        livekit_room: &str,
 903        tx: &DatabaseTransaction,
 904    ) -> Result<RoomId> {
 905        let room = room::Entity::find()
 906            .filter(room::Column::ChannelId.eq(channel_id))
 907            .one(tx)
 908            .await?;
 909
 910        let room_id = if let Some(room) = room {
 911            room.id
 912        } else {
 913            let result = room::Entity::insert(room::ActiveModel {
 914                channel_id: ActiveValue::Set(Some(channel_id)),
 915                live_kit_room: ActiveValue::Set(livekit_room.to_string()),
 916                ..Default::default()
 917            })
 918            .exec(tx)
 919            .await?;
 920
 921            result.last_insert_id
 922        };
 923
 924        Ok(room_id)
 925    }
 926
 927    /// Move a channel from one parent to another
 928    pub async fn move_channel(
 929        &self,
 930        channel_id: ChannelId,
 931        new_parent_id: ChannelId,
 932        admin_id: UserId,
 933    ) -> Result<(ChannelId, Vec<Channel>)> {
 934        self.transaction(|tx| async move {
 935            let channel = self.get_channel_internal(channel_id, &tx).await?;
 936            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 937                .await?;
 938            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 939
 940            if new_parent.root_id() != channel.root_id() {
 941                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 942            }
 943
 944            if new_parent
 945                .ancestors_including_self()
 946                .any(|id| id == channel.id)
 947            {
 948                Err(anyhow!(ErrorCode::CircularNesting))?;
 949            }
 950
 951            if channel.visibility == ChannelVisibility::Public
 952                && new_parent.visibility != ChannelVisibility::Public
 953            {
 954                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 955            }
 956
 957            let root_id = channel.root_id();
 958            let new_parent_path = new_parent.path();
 959            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 960            let new_path = format!("{}{}/", &new_parent_path, channel.id);
 961            let new_order = max_order(&new_parent_path, &tx).await? + 1;
 962
 963            let mut model = channel.into_active_model();
 964            model.parent_path = ActiveValue::Set(new_parent.path());
 965            model.channel_order = ActiveValue::Set(new_order);
 966            let channel = model.update(&*tx).await?;
 967
 968            let descendent_ids =
 969                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 970                    self.pool.get_database_backend(),
 971                    "
 972                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 973                    WHERE parent_path LIKE $3 || '%'
 974                    RETURNING id
 975                ",
 976                    [old_path.clone().into(), new_path.into(), old_path.into()],
 977                ))
 978                .all(&*tx)
 979                .await?;
 980
 981            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
 982
 983            let channels = channel::Entity::find()
 984                .filter(channel::Column::Id.is_in(all_moved_ids))
 985                .all(&*tx)
 986                .await?
 987                .into_iter()
 988                .map(Channel::from_model)
 989                .collect::<Vec<_>>();
 990
 991            Ok((root_id, channels))
 992        })
 993        .await
 994    }
 995
 996    pub async fn reorder_channel(
 997        &self,
 998        channel_id: ChannelId,
 999        direction: proto::reorder_channel::Direction,
1000        user_id: UserId,
1001    ) -> Result<Vec<Channel>> {
1002        self.transaction(|tx| async move {
1003            let mut channel = self.get_channel_internal(channel_id, &tx).await?;
1004
1005            if channel.is_root() {
1006                log::info!("Skipping reorder of root channel {}", channel.id,);
1007                return Ok(vec![]);
1008            }
1009
1010            log::info!(
1011                "Reordering channel {} (parent_path: '{}', order: {})",
1012                channel.id,
1013                channel.parent_path,
1014                channel.channel_order
1015            );
1016
1017            // Check if user is admin of the channel
1018            self.check_user_is_channel_admin(&channel, user_id, &tx)
1019                .await?;
1020
1021            // Find the sibling channel to swap with
1022            let sibling_channel = match direction {
1023                proto::reorder_channel::Direction::Up => {
1024                    log::info!(
1025                        "Looking for sibling with parent_path='{}' and order < {}",
1026                        channel.parent_path,
1027                        channel.channel_order
1028                    );
1029                    // Find channel with highest order less than current
1030                    channel::Entity::find()
1031                        .filter(
1032                            channel::Column::ParentPath
1033                                .eq(&channel.parent_path)
1034                                .and(channel::Column::ChannelOrder.lt(channel.channel_order)),
1035                        )
1036                        .order_by_desc(channel::Column::ChannelOrder)
1037                        .one(&*tx)
1038                        .await?
1039                }
1040                proto::reorder_channel::Direction::Down => {
1041                    log::info!(
1042                        "Looking for sibling with parent_path='{}' and order > {}",
1043                        channel.parent_path,
1044                        channel.channel_order
1045                    );
1046                    // Find channel with lowest order greater than current
1047                    channel::Entity::find()
1048                        .filter(
1049                            channel::Column::ParentPath
1050                                .eq(&channel.parent_path)
1051                                .and(channel::Column::ChannelOrder.gt(channel.channel_order)),
1052                        )
1053                        .order_by_asc(channel::Column::ChannelOrder)
1054                        .one(&*tx)
1055                        .await?
1056                }
1057            };
1058
1059            let mut sibling_channel = match sibling_channel {
1060                Some(sibling) => {
1061                    log::info!(
1062                        "Found sibling {} (parent_path: '{}', order: {})",
1063                        sibling.id,
1064                        sibling.parent_path,
1065                        sibling.channel_order
1066                    );
1067                    sibling
1068                }
1069                None => {
1070                    log::warn!("No sibling found to swap with");
1071                    // No sibling to swap with
1072                    return Ok(vec![]);
1073                }
1074            };
1075
1076            let current_order = channel.channel_order;
1077            let sibling_order = sibling_channel.channel_order;
1078
1079            channel::ActiveModel {
1080                id: ActiveValue::Unchanged(sibling_channel.id),
1081                channel_order: ActiveValue::Set(current_order),
1082                ..Default::default()
1083            }
1084            .update(&*tx)
1085            .await?;
1086            sibling_channel.channel_order = current_order;
1087
1088            channel::ActiveModel {
1089                id: ActiveValue::Unchanged(channel.id),
1090                channel_order: ActiveValue::Set(sibling_order),
1091                ..Default::default()
1092            }
1093            .update(&*tx)
1094            .await?;
1095            channel.channel_order = sibling_order;
1096
1097            log::info!(
1098                "Reorder complete. Swapped channels {} and {}",
1099                channel.id,
1100                sibling_channel.id
1101            );
1102
1103            let swapped_channels = vec![
1104                Channel::from_model(channel),
1105                Channel::from_model(sibling_channel),
1106            ];
1107
1108            Ok(swapped_channels)
1109        })
1110        .await
1111    }
1112}
1113
1114async fn max_order(parent_path: &str, tx: &TransactionHandle) -> Result<i32> {
1115    let max_order = channel::Entity::find()
1116        .filter(channel::Column::ParentPath.eq(parent_path))
1117        .select_only()
1118        .column_as(channel::Column::ChannelOrder.max(), "max_order")
1119        .into_tuple::<Option<i32>>()
1120        .one(&**tx)
1121        .await?
1122        .flatten()
1123        .unwrap_or(0);
1124
1125    Ok(max_order)
1126}
1127
/// Column selector for raw statements that return a single `id` column.
///
/// Used as the column type for `ChannelId::find_by_statement` in
/// `move_channel`, whose `UPDATE ... RETURNING id` statement yields the ids of
/// the rewritten rows.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}