channels.rs

   1use super::*;
   2use anyhow::Context as _;
   3use rpc::{
   4    ErrorCode, ErrorCodeExt,
   5    proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
   6};
   7use sea_orm::{ActiveValue, DbBackend, TryGetableMany};
   8
   9impl Database {
  10    #[cfg(test)]
  11    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  12        self.transaction(move |tx| async move {
  13            let mut channels = Vec::new();
  14            let mut rows = channel::Entity::find().stream(&*tx).await?;
  15            while let Some(row) = rows.next().await {
  16                let row = row?;
  17                channels.push((row.id, row.name));
  18            }
  19            Ok(channels)
  20        })
  21        .await
  22    }
  23
  24    #[cfg(test)]
  25    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  26        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .0
  40            .id)
  41    }
  42
  43    /// Creates a new channel.
  44    pub async fn create_channel(
  45        &self,
  46        name: &str,
  47        parent_channel_id: Option<ChannelId>,
  48        admin_id: UserId,
  49    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let parent_path = parent
  63                .as_ref()
  64                .map_or(String::new(), |parent| parent.path());
  65
  66            // Find the maximum channel_order among siblings to set the new channel at the end
  67            let max_order = if parent_path.is_empty() {
  68                0
  69            } else {
  70                max_order(&parent_path, &tx).await?
  71            };
  72
  73            log::info!(
  74                "Creating channel '{}' with parent_path='{}', max_order={}, new_order={}",
  75                name,
  76                parent_path,
  77                max_order,
  78                max_order + 1
  79            );
  80
  81            let channel = channel::ActiveModel {
  82                id: ActiveValue::NotSet,
  83                name: ActiveValue::Set(name.to_string()),
  84                visibility: ActiveValue::Set(ChannelVisibility::Members),
  85                parent_path: ActiveValue::Set(parent_path),
  86                requires_zed_cla: ActiveValue::NotSet,
  87                channel_order: ActiveValue::Set(max_order + 1),
  88            }
  89            .insert(&*tx)
  90            .await?;
  91
  92            if parent.is_none() {
  93                membership = Some(
  94                    channel_member::ActiveModel {
  95                        id: ActiveValue::NotSet,
  96                        channel_id: ActiveValue::Set(channel.id),
  97                        user_id: ActiveValue::Set(admin_id),
  98                        accepted: ActiveValue::Set(true),
  99                        role: ActiveValue::Set(ChannelRole::Admin),
 100                    }
 101                    .insert(&*tx)
 102                    .await?,
 103                );
 104            }
 105
 106            Ok((channel, membership))
 107        })
 108        .await
 109    }
 110
 111    /// Adds a user to the specified channel.
 112    pub async fn join_channel(
 113        &self,
 114        channel_id: ChannelId,
 115        user_id: UserId,
 116        connection: ConnectionId,
 117    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 118        self.transaction(move |tx| async move {
 119            let channel = self.get_channel_internal(channel_id, &tx).await?;
 120            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 121
 122            let mut accept_invite_result = None;
 123
 124            if role.is_none() {
 125                if let Some(invitation) = self
 126                    .pending_invite_for_channel(&channel, user_id, &tx)
 127                    .await?
 128                {
 129                    // note, this may be a parent channel
 130                    role = Some(invitation.role);
 131                    channel_member::Entity::update(channel_member::ActiveModel {
 132                        accepted: ActiveValue::Set(true),
 133                        ..invitation.into_active_model()
 134                    })
 135                    .exec(&*tx)
 136                    .await?;
 137
 138                    accept_invite_result = Some(
 139                        self.calculate_membership_updated(&channel, user_id, &tx)
 140                            .await?,
 141                    );
 142
 143                    debug_assert!(
 144                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 145                    );
 146                } else if channel.visibility == ChannelVisibility::Public {
 147                    role = Some(ChannelRole::Guest);
 148                    channel_member::Entity::insert(channel_member::ActiveModel {
 149                        id: ActiveValue::NotSet,
 150                        channel_id: ActiveValue::Set(channel.root_id()),
 151                        user_id: ActiveValue::Set(user_id),
 152                        accepted: ActiveValue::Set(true),
 153                        role: ActiveValue::Set(ChannelRole::Guest),
 154                    })
 155                    .exec(&*tx)
 156                    .await?;
 157
 158                    accept_invite_result = Some(
 159                        self.calculate_membership_updated(&channel, user_id, &tx)
 160                            .await?,
 161                    );
 162
 163                    debug_assert!(
 164                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 165                    );
 166                }
 167            }
 168
 169            if role.is_none() || role == Some(ChannelRole::Banned) {
 170                Err(ErrorCode::Forbidden.anyhow())?
 171            }
 172            let role = role.unwrap();
 173
 174            let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
 175            let room_id = self
 176                .get_or_create_channel_room(channel_id, &livekit_room, &tx)
 177                .await?;
 178
 179            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 180                .await
 181                .map(|jr| (jr, accept_invite_result, role))
 182        })
 183        .await
 184    }
 185
 186    /// Sets the visibility of the given channel.
 187    pub async fn set_channel_visibility(
 188        &self,
 189        channel_id: ChannelId,
 190        visibility: ChannelVisibility,
 191        admin_id: UserId,
 192    ) -> Result<channel::Model> {
 193        self.transaction(move |tx| async move {
 194            let channel = self.get_channel_internal(channel_id, &tx).await?;
 195            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 196                .await?;
 197
 198            if visibility == ChannelVisibility::Public {
 199                if let Some(parent_id) = channel.parent_id() {
 200                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 201
 202                    if parent.visibility != ChannelVisibility::Public {
 203                        Err(ErrorCode::BadPublicNesting
 204                            .with_tag("direction", "parent")
 205                            .anyhow())?;
 206                    }
 207                }
 208            } else if visibility == ChannelVisibility::Members
 209                && self
 210                    .get_channel_descendants_excluding_self([&channel], &tx)
 211                    .await?
 212                    .into_iter()
 213                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 214            {
 215                Err(ErrorCode::BadPublicNesting
 216                    .with_tag("direction", "children")
 217                    .anyhow())?;
 218            }
 219
 220            let mut model = channel.into_active_model();
 221            model.visibility = ActiveValue::Set(visibility);
 222            let channel = model.update(&*tx).await?;
 223
 224            Ok(channel)
 225        })
 226        .await
 227    }
 228
 229    #[cfg(test)]
 230    pub async fn set_channel_requires_zed_cla(
 231        &self,
 232        channel_id: ChannelId,
 233        requires_zed_cla: bool,
 234    ) -> Result<()> {
 235        self.transaction(move |tx| async move {
 236            let channel = self.get_channel_internal(channel_id, &tx).await?;
 237            let mut model = channel.into_active_model();
 238            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 239            model.update(&*tx).await?;
 240            Ok(())
 241        })
 242        .await
 243    }
 244
 245    /// Deletes the channel with the specified ID.
 246    pub async fn delete_channel(
 247        &self,
 248        channel_id: ChannelId,
 249        user_id: UserId,
 250    ) -> Result<(ChannelId, Vec<ChannelId>)> {
 251        self.transaction(move |tx| async move {
 252            let channel = self.get_channel_internal(channel_id, &tx).await?;
 253            self.check_user_is_channel_admin(&channel, user_id, &tx)
 254                .await?;
 255
 256            let channels_to_remove = self
 257                .get_channel_descendants_excluding_self([&channel], &tx)
 258                .await?
 259                .into_iter()
 260                .map(|channel| channel.id)
 261                .chain(Some(channel_id))
 262                .collect::<Vec<_>>();
 263
 264            channel::Entity::delete_many()
 265                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 266                .exec(&*tx)
 267                .await?;
 268
 269            Ok((channel.root_id(), channels_to_remove))
 270        })
 271        .await
 272    }
 273
 274    /// Invites a user to a channel as a member.
 275    pub async fn invite_channel_member(
 276        &self,
 277        channel_id: ChannelId,
 278        invitee_id: UserId,
 279        inviter_id: UserId,
 280        role: ChannelRole,
 281    ) -> Result<InviteMemberResult> {
 282        self.transaction(move |tx| async move {
 283            let channel = self.get_channel_internal(channel_id, &tx).await?;
 284            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 285                .await?;
 286            if !channel.is_root() {
 287                Err(ErrorCode::NotARootChannel.anyhow())?
 288            }
 289
 290            channel_member::ActiveModel {
 291                id: ActiveValue::NotSet,
 292                channel_id: ActiveValue::Set(channel_id),
 293                user_id: ActiveValue::Set(invitee_id),
 294                accepted: ActiveValue::Set(false),
 295                role: ActiveValue::Set(role),
 296            }
 297            .insert(&*tx)
 298            .await?;
 299
 300            let channel = Channel::from_model(channel);
 301
 302            let notifications = self
 303                .create_notification(
 304                    invitee_id,
 305                    rpc::Notification::ChannelInvitation {
 306                        channel_id: channel_id.to_proto(),
 307                        channel_name: channel.name.clone(),
 308                        inviter_id: inviter_id.to_proto(),
 309                    },
 310                    true,
 311                    &tx,
 312                )
 313                .await?
 314                .into_iter()
 315                .collect();
 316
 317            Ok(InviteMemberResult {
 318                channel,
 319                notifications,
 320            })
 321        })
 322        .await
 323    }
 324
 325    fn sanitize_channel_name(name: &str) -> Result<&str> {
 326        let new_name = name.trim().trim_start_matches('#');
 327        if new_name.is_empty() {
 328            Err(anyhow!("channel name can't be blank"))?;
 329        }
 330        Ok(new_name)
 331    }
 332
 333    /// Renames the specified channel.
 334    pub async fn rename_channel(
 335        &self,
 336        channel_id: ChannelId,
 337        admin_id: UserId,
 338        new_name: &str,
 339    ) -> Result<channel::Model> {
 340        self.transaction(move |tx| async move {
 341            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 342
 343            let channel = self.get_channel_internal(channel_id, &tx).await?;
 344            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 345                .await?;
 346
 347            let mut model = channel.into_active_model();
 348            model.name = ActiveValue::Set(new_name.clone());
 349            let channel = model.update(&*tx).await?;
 350
 351            Ok(channel)
 352        })
 353        .await
 354    }
 355
 356    /// accept or decline an invite to join a channel
 357    pub async fn respond_to_channel_invite(
 358        &self,
 359        channel_id: ChannelId,
 360        user_id: UserId,
 361        accept: bool,
 362    ) -> Result<RespondToChannelInvite> {
 363        self.transaction(move |tx| async move {
 364            let channel = self.get_channel_internal(channel_id, &tx).await?;
 365
 366            let membership_update = if accept {
 367                let rows_affected = channel_member::Entity::update_many()
 368                    .set(channel_member::ActiveModel {
 369                        accepted: ActiveValue::Set(accept),
 370                        ..Default::default()
 371                    })
 372                    .filter(
 373                        channel_member::Column::ChannelId
 374                            .eq(channel_id)
 375                            .and(channel_member::Column::UserId.eq(user_id))
 376                            .and(channel_member::Column::Accepted.eq(false)),
 377                    )
 378                    .exec(&*tx)
 379                    .await?
 380                    .rows_affected;
 381
 382                if rows_affected == 0 {
 383                    Err(anyhow!("no such invitation"))?;
 384                }
 385
 386                Some(
 387                    self.calculate_membership_updated(&channel, user_id, &tx)
 388                        .await?,
 389                )
 390            } else {
 391                let rows_affected = channel_member::Entity::delete_many()
 392                    .filter(
 393                        channel_member::Column::ChannelId
 394                            .eq(channel_id)
 395                            .and(channel_member::Column::UserId.eq(user_id))
 396                            .and(channel_member::Column::Accepted.eq(false)),
 397                    )
 398                    .exec(&*tx)
 399                    .await?
 400                    .rows_affected;
 401                if rows_affected == 0 {
 402                    Err(anyhow!("no such invitation"))?;
 403                }
 404
 405                None
 406            };
 407
 408            Ok(RespondToChannelInvite {
 409                membership_update,
 410                notifications: self
 411                    .mark_notification_as_read_with_response(
 412                        user_id,
 413                        &rpc::Notification::ChannelInvitation {
 414                            channel_id: channel_id.to_proto(),
 415                            channel_name: Default::default(),
 416                            inviter_id: Default::default(),
 417                        },
 418                        accept,
 419                        &tx,
 420                    )
 421                    .await?
 422                    .into_iter()
 423                    .collect(),
 424            })
 425        })
 426        .await
 427    }
 428
 429    async fn calculate_membership_updated(
 430        &self,
 431        channel: &channel::Model,
 432        user_id: UserId,
 433        tx: &DatabaseTransaction,
 434    ) -> Result<MembershipUpdated> {
 435        let new_channels = self
 436            .get_user_channels(user_id, Some(channel), false, tx)
 437            .await?;
 438        let removed_channels = self
 439            .get_channel_descendants_excluding_self([channel], tx)
 440            .await?
 441            .into_iter()
 442            .map(|channel| channel.id)
 443            .chain([channel.id])
 444            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 445            .collect::<Vec<_>>();
 446
 447        Ok(MembershipUpdated {
 448            channel_id: channel.id,
 449            new_channels,
 450            removed_channels,
 451        })
 452    }
 453
 454    /// Removes a channel member.
 455    pub async fn remove_channel_member(
 456        &self,
 457        channel_id: ChannelId,
 458        member_id: UserId,
 459        admin_id: UserId,
 460    ) -> Result<RemoveChannelMemberResult> {
 461        self.transaction(|tx| async move {
 462            let channel = self.get_channel_internal(channel_id, &tx).await?;
 463
 464            if member_id != admin_id {
 465                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 466                    .await?;
 467            }
 468
 469            let result = channel_member::Entity::delete_many()
 470                .filter(
 471                    channel_member::Column::ChannelId
 472                        .eq(channel_id)
 473                        .and(channel_member::Column::UserId.eq(member_id)),
 474                )
 475                .exec(&*tx)
 476                .await?;
 477
 478            if result.rows_affected == 0 {
 479                Err(anyhow!("no such member"))?;
 480            }
 481
 482            Ok(RemoveChannelMemberResult {
 483                membership_update: self
 484                    .calculate_membership_updated(&channel, member_id, &tx)
 485                    .await?,
 486                notification_id: self
 487                    .remove_notification(
 488                        member_id,
 489                        rpc::Notification::ChannelInvitation {
 490                            channel_id: channel_id.to_proto(),
 491                            channel_name: Default::default(),
 492                            inviter_id: Default::default(),
 493                        },
 494                        &tx,
 495                    )
 496                    .await?,
 497            })
 498        })
 499        .await
 500    }
 501
 502    /// Returns all channels for the user with the given ID.
 503    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 504        self.weak_transaction(
 505            |tx| async move { self.get_user_channels(user_id, None, true, &tx).await },
 506        )
 507        .await
 508    }
 509
    /// Collects the channel state visible to `user_id`.
    ///
    /// Starting from the user's membership rows, this gathers the channels they
    /// belong to plus all descendants of those channels, keeps the ones their
    /// role can see, and then loads participants, latest buffer versions,
    /// latest messages, and the user's observed buffer/message state for them.
    ///
    /// * `ancestor_channel` — when given, only the membership on that channel's
    ///   root channel is considered.
    /// * `include_invites` — when `false`, only accepted memberships are
    ///   loaded, so `invited_channels` in the result stays empty.
    /// * `tx` — the transaction all queries run in.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        include_invites: bool,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Build the membership filter incrementally from the optional arguments.
        let mut filter = channel_member::Column::UserId.eq(user_id);
        if !include_invites {
            filter = filter.and(channel_member::Column::Accepted.eq(true))
        }
        if let Some(ancestor) = ancestor_channel {
            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
        }

        let mut channels = Vec::<channel::Model>::new();
        let mut invited_channels = Vec::<Channel>::new();
        let mut channel_memberships = Vec::<channel_member::Model>::new();
        let mut rows = channel_member::Entity::find()
            .filter(filter)
            .inner_join(channel::Entity)
            .select_also(channel::Entity)
            .stream(tx)
            .await?;
        // Accepted memberships contribute a channel and a membership; pending
        // ones are only reported as invitations.
        while let Some(row) = rows.next().await {
            if let (membership, Some(channel)) = row? {
                if membership.accepted {
                    channel_memberships.push(membership);
                    channels.push(channel);
                } else {
                    invited_channels.push(Channel::from_model(channel));
                }
            }
        }
        // NOTE(review): presumably the stream must be dropped before `tx` can
        // be used for the next query — confirm.
        drop(rows);

        let mut descendants = self
            .get_channel_descendants_excluding_self(channels.iter(), tx)
            .await?;

        // The combined list holds the descendants first, then the channels the
        // user is a direct member of.
        descendants.extend(channels);

        // Roles are keyed by the channel the membership row points at.
        let roles_by_channel_id = channel_memberships
            .iter()
            .map(|membership| (membership.channel_id, membership.role))
            .collect::<HashMap<_, _>>();

        // A channel is kept only if the user has a role on its root channel and
        // that role reports it can see a channel of this visibility.
        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
                if parent_role.can_see_channel(channel.visibility) {
                    Some(Channel::from_model(channel))
                } else {
                    None
                }
            })
            .collect();

        // Column selector for the participant query below.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Map each visible channel to the users currently in its room.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();

        // Load the buffers of the visible channels, remembering which channel
        // each buffer belongs to and the latest operation recorded on it.
        let mut channel_ids_by_buffer_id = HashMap::default();
        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
        let mut rows = buffer::Entity::find()
            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
            .stream(tx)
            .await?;
        while let Some(row) = rows.next().await {
            let row = row?;
            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
            latest_buffer_versions.push(ChannelBufferVersion {
                channel_id: row.channel_id.0 as u64,
                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
                // The version has a single entry when both the lamport timestamp
                // and the replica id are present, and is empty otherwise.
                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
                    .latest_operation_lamport_timestamp
                    .zip(row.latest_operation_replica_id)
                {
                    vec![VectorClockEntry {
                        timestamp: latest_lamport_timestamp as u32,
                        replica_id: latest_replica_id as u32,
                    }]
                } else {
                    vec![]
                },
            });
        }
        drop(rows);

        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;

        let observed_buffer_versions = self
            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
            .await?;

        let observed_channel_messages = self
            .observed_channel_messages(&channel_ids, user_id, tx)
            .await?;

        Ok(ChannelsForUser {
            channel_memberships,
            channels,
            invited_channels,
            channel_participants,
            latest_buffer_versions,
            latest_channel_messages,
            observed_buffer_versions,
            observed_channel_messages,
        })
    }
 644
 645    /// Sets the role for the specified channel member.
 646    pub async fn set_channel_member_role(
 647        &self,
 648        channel_id: ChannelId,
 649        admin_id: UserId,
 650        for_user: UserId,
 651        role: ChannelRole,
 652    ) -> Result<SetMemberRoleResult> {
 653        self.transaction(|tx| async move {
 654            let channel = self.get_channel_internal(channel_id, &tx).await?;
 655            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 656                .await?;
 657
 658            let membership = channel_member::Entity::find()
 659                .filter(
 660                    channel_member::Column::ChannelId
 661                        .eq(channel_id)
 662                        .and(channel_member::Column::UserId.eq(for_user)),
 663                )
 664                .one(&*tx)
 665                .await?
 666                .context("no such member")?;
 667
 668            let mut update = membership.into_active_model();
 669            update.role = ActiveValue::Set(role);
 670            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 671
 672            if updated.accepted {
 673                Ok(SetMemberRoleResult::MembershipUpdated(
 674                    self.calculate_membership_updated(&channel, for_user, &tx)
 675                        .await?,
 676                ))
 677            } else {
 678                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 679                    channel,
 680                )))
 681            }
 682        })
 683        .await
 684    }
 685
 686    /// Returns the details for the specified channel member.
 687    pub async fn get_channel_participant_details(
 688        &self,
 689        channel_id: ChannelId,
 690        filter: &str,
 691        limit: u64,
 692        user_id: UserId,
 693    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
 694        let members = self
 695            .transaction(move |tx| async move {
 696                let channel = self.get_channel_internal(channel_id, &tx).await?;
 697                self.check_user_is_channel_participant(&channel, user_id, &tx)
 698                    .await?;
 699                let mut query = channel_member::Entity::find()
 700                    .find_also_related(user::Entity)
 701                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
 702
 703                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
 704                    query = query.filter(Expr::cust_with_values(
 705                        "UPPER(github_login) LIKE ?",
 706                        [Self::fuzzy_like_string(&filter.to_uppercase())],
 707                    ))
 708                } else {
 709                    query = query.filter(Expr::cust_with_values(
 710                        "github_login ILIKE $1",
 711                        [Self::fuzzy_like_string(filter)],
 712                    ))
 713                }
 714                let members = query.order_by(
 715                        Expr::cust(
 716                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
 717                        ),
 718                        sea_orm::Order::Asc,
 719                    )
 720                    .limit(limit)
 721                    .all(&*tx)
 722                    .await?;
 723
 724                Ok(members)
 725            })
 726            .await?;
 727
 728        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
 729
 730        let members = members
 731            .into_iter()
 732            .map(|(member, user)| {
 733                if let Some(user) = user {
 734                    users.push(proto::User {
 735                        id: user.id.to_proto(),
 736                        avatar_url: format!(
 737                            "https://avatars.githubusercontent.com/u/{}?s=128&v=4",
 738                            user.github_user_id
 739                        ),
 740                        github_login: user.github_login,
 741                        name: user.name,
 742                    })
 743                }
 744                proto::ChannelMember {
 745                    role: member.role.into(),
 746                    user_id: member.user_id.to_proto(),
 747                    kind: if member.accepted {
 748                        Kind::Member
 749                    } else {
 750                        Kind::Invitee
 751                    }
 752                    .into(),
 753                }
 754            })
 755            .collect();
 756
 757        Ok((members, users))
 758    }
 759
 760    /// Returns whether the given user is an admin in the specified channel.
 761    pub async fn check_user_is_channel_admin(
 762        &self,
 763        channel: &channel::Model,
 764        user_id: UserId,
 765        tx: &DatabaseTransaction,
 766    ) -> Result<ChannelRole> {
 767        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 768        match role {
 769            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 770            Some(ChannelRole::Member)
 771            | Some(ChannelRole::Talker)
 772            | Some(ChannelRole::Banned)
 773            | Some(ChannelRole::Guest)
 774            | None => Err(anyhow!(
 775                "user is not a channel admin or channel does not exist"
 776            ))?,
 777        }
 778    }
 779
 780    /// Returns whether the given user is a member of the specified channel.
 781    pub async fn check_user_is_channel_member(
 782        &self,
 783        channel: &channel::Model,
 784        user_id: UserId,
 785        tx: &DatabaseTransaction,
 786    ) -> Result<ChannelRole> {
 787        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 788        match channel_role {
 789            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 790            Some(ChannelRole::Banned)
 791            | Some(ChannelRole::Guest)
 792            | Some(ChannelRole::Talker)
 793            | None => Err(anyhow!(
 794                "user is not a channel member or channel does not exist"
 795            ))?,
 796        }
 797    }
 798
 799    /// Returns whether the given user is a participant in the specified channel.
 800    pub async fn check_user_is_channel_participant(
 801        &self,
 802        channel: &channel::Model,
 803        user_id: UserId,
 804        tx: &DatabaseTransaction,
 805    ) -> Result<ChannelRole> {
 806        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 807        match role {
 808            Some(ChannelRole::Admin)
 809            | Some(ChannelRole::Member)
 810            | Some(ChannelRole::Guest)
 811            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 812            Some(ChannelRole::Banned) | None => Err(anyhow!(
 813                "user is not a channel participant or channel does not exist"
 814            ))?,
 815        }
 816    }
 817
 818    /// Returns a user's pending invite for the given channel, if one exists.
 819    pub async fn pending_invite_for_channel(
 820        &self,
 821        channel: &channel::Model,
 822        user_id: UserId,
 823        tx: &DatabaseTransaction,
 824    ) -> Result<Option<channel_member::Model>> {
 825        let row = channel_member::Entity::find()
 826            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 827            .filter(channel_member::Column::UserId.eq(user_id))
 828            .filter(channel_member::Column::Accepted.eq(false))
 829            .one(tx)
 830            .await?;
 831
 832        Ok(row)
 833    }
 834
 835    /// Returns the role for a user in the given channel.
 836    pub async fn channel_role_for_user(
 837        &self,
 838        channel: &channel::Model,
 839        user_id: UserId,
 840        tx: &DatabaseTransaction,
 841    ) -> Result<Option<ChannelRole>> {
 842        let membership = channel_member::Entity::find()
 843            .filter(
 844                channel_member::Column::ChannelId
 845                    .eq(channel.root_id())
 846                    .and(channel_member::Column::UserId.eq(user_id))
 847                    .and(channel_member::Column::Accepted.eq(true)),
 848            )
 849            .one(tx)
 850            .await?;
 851
 852        let Some(membership) = membership else {
 853            return Ok(None);
 854        };
 855
 856        if !membership.role.can_see_channel(channel.visibility) {
 857            return Ok(None);
 858        }
 859
 860        Ok(Some(membership.role))
 861    }
 862
 863    // Get the descendants of the given set if channels, ordered by their
 864    // path.
 865    pub(crate) async fn get_channel_descendants_excluding_self(
 866        &self,
 867        channels: impl IntoIterator<Item = &channel::Model>,
 868        tx: &DatabaseTransaction,
 869    ) -> Result<Vec<channel::Model>> {
 870        let mut filter = Condition::any();
 871        for channel in channels.into_iter() {
 872            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 873        }
 874
 875        if filter.is_empty() {
 876            return Ok(vec![]);
 877        }
 878
 879        Ok(channel::Entity::find()
 880            .filter(filter)
 881            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 882            .all(tx)
 883            .await?)
 884    }
 885
 886    /// Returns the channel with the given ID.
 887    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 888        self.transaction(|tx| async move {
 889            let channel = self.get_channel_internal(channel_id, &tx).await?;
 890            self.check_user_is_channel_participant(&channel, user_id, &tx)
 891                .await?;
 892
 893            Ok(Channel::from_model(channel))
 894        })
 895        .await
 896    }
 897
 898    pub(crate) async fn get_channel_internal(
 899        &self,
 900        channel_id: ChannelId,
 901        tx: &DatabaseTransaction,
 902    ) -> Result<channel::Model> {
 903        Ok(channel::Entity::find_by_id(channel_id)
 904            .one(tx)
 905            .await?
 906            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 907    }
 908
 909    pub(crate) async fn get_or_create_channel_room(
 910        &self,
 911        channel_id: ChannelId,
 912        livekit_room: &str,
 913        tx: &DatabaseTransaction,
 914    ) -> Result<RoomId> {
 915        let room = room::Entity::find()
 916            .filter(room::Column::ChannelId.eq(channel_id))
 917            .one(tx)
 918            .await?;
 919
 920        let room_id = if let Some(room) = room {
 921            room.id
 922        } else {
 923            let result = room::Entity::insert(room::ActiveModel {
 924                channel_id: ActiveValue::Set(Some(channel_id)),
 925                live_kit_room: ActiveValue::Set(livekit_room.to_string()),
 926                ..Default::default()
 927            })
 928            .exec(tx)
 929            .await?;
 930
 931            result.last_insert_id
 932        };
 933
 934        Ok(room_id)
 935    }
 936
 937    /// Move a channel from one parent to another
 938    pub async fn move_channel(
 939        &self,
 940        channel_id: ChannelId,
 941        new_parent_id: ChannelId,
 942        admin_id: UserId,
 943    ) -> Result<(ChannelId, Vec<Channel>)> {
 944        self.transaction(|tx| async move {
 945            let channel = self.get_channel_internal(channel_id, &tx).await?;
 946            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 947                .await?;
 948            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 949
 950            if new_parent.root_id() != channel.root_id() {
 951                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 952            }
 953
 954            if new_parent
 955                .ancestors_including_self()
 956                .any(|id| id == channel.id)
 957            {
 958                Err(anyhow!(ErrorCode::CircularNesting))?;
 959            }
 960
 961            if channel.visibility == ChannelVisibility::Public
 962                && new_parent.visibility != ChannelVisibility::Public
 963            {
 964                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 965            }
 966
 967            let root_id = channel.root_id();
 968            let new_parent_path = new_parent.path();
 969            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 970            let new_path = format!("{}{}/", &new_parent_path, channel.id);
 971            let new_order = max_order(&new_parent_path, &tx).await? + 1;
 972
 973            let mut model = channel.into_active_model();
 974            model.parent_path = ActiveValue::Set(new_parent.path());
 975            model.channel_order = ActiveValue::Set(new_order);
 976            let channel = model.update(&*tx).await?;
 977
 978            let descendent_ids =
 979                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 980                    self.pool.get_database_backend(),
 981                    "
 982                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 983                    WHERE parent_path LIKE $3 || '%'
 984                    RETURNING id
 985                ",
 986                    [old_path.clone().into(), new_path.into(), old_path.into()],
 987                ))
 988                .all(&*tx)
 989                .await?;
 990
 991            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
 992
 993            let channels = channel::Entity::find()
 994                .filter(channel::Column::Id.is_in(all_moved_ids))
 995                .all(&*tx)
 996                .await?
 997                .into_iter()
 998                .map(Channel::from_model)
 999                .collect::<Vec<_>>();
1000
1001            Ok((root_id, channels))
1002        })
1003        .await
1004    }
1005
1006    pub async fn reorder_channel(
1007        &self,
1008        channel_id: ChannelId,
1009        direction: proto::reorder_channel::Direction,
1010        user_id: UserId,
1011    ) -> Result<Vec<Channel>> {
1012        self.transaction(|tx| async move {
1013            let mut channel = self.get_channel_internal(channel_id, &tx).await?;
1014
1015            if channel.is_root() {
1016                log::info!("Skipping reorder of root channel {}", channel.id,);
1017                return Ok(vec![]);
1018            }
1019
1020            log::info!(
1021                "Reordering channel {} (parent_path: '{}', order: {})",
1022                channel.id,
1023                channel.parent_path,
1024                channel.channel_order
1025            );
1026
1027            // Check if user is admin of the channel
1028            self.check_user_is_channel_admin(&channel, user_id, &tx)
1029                .await?;
1030
1031            // Find the sibling channel to swap with
1032            let sibling_channel = match direction {
1033                proto::reorder_channel::Direction::Up => {
1034                    log::info!(
1035                        "Looking for sibling with parent_path='{}' and order < {}",
1036                        channel.parent_path,
1037                        channel.channel_order
1038                    );
1039                    // Find channel with highest order less than current
1040                    channel::Entity::find()
1041                        .filter(
1042                            channel::Column::ParentPath
1043                                .eq(&channel.parent_path)
1044                                .and(channel::Column::ChannelOrder.lt(channel.channel_order)),
1045                        )
1046                        .order_by_desc(channel::Column::ChannelOrder)
1047                        .one(&*tx)
1048                        .await?
1049                }
1050                proto::reorder_channel::Direction::Down => {
1051                    log::info!(
1052                        "Looking for sibling with parent_path='{}' and order > {}",
1053                        channel.parent_path,
1054                        channel.channel_order
1055                    );
1056                    // Find channel with lowest order greater than current
1057                    channel::Entity::find()
1058                        .filter(
1059                            channel::Column::ParentPath
1060                                .eq(&channel.parent_path)
1061                                .and(channel::Column::ChannelOrder.gt(channel.channel_order)),
1062                        )
1063                        .order_by_asc(channel::Column::ChannelOrder)
1064                        .one(&*tx)
1065                        .await?
1066                }
1067            };
1068
1069            let mut sibling_channel = match sibling_channel {
1070                Some(sibling) => {
1071                    log::info!(
1072                        "Found sibling {} (parent_path: '{}', order: {})",
1073                        sibling.id,
1074                        sibling.parent_path,
1075                        sibling.channel_order
1076                    );
1077                    sibling
1078                }
1079                None => {
1080                    log::warn!("No sibling found to swap with");
1081                    // No sibling to swap with
1082                    return Ok(vec![]);
1083                }
1084            };
1085
1086            let current_order = channel.channel_order;
1087            let sibling_order = sibling_channel.channel_order;
1088
1089            channel::ActiveModel {
1090                id: ActiveValue::Unchanged(sibling_channel.id),
1091                channel_order: ActiveValue::Set(current_order),
1092                ..Default::default()
1093            }
1094            .update(&*tx)
1095            .await?;
1096            sibling_channel.channel_order = current_order;
1097
1098            channel::ActiveModel {
1099                id: ActiveValue::Unchanged(channel.id),
1100                channel_order: ActiveValue::Set(sibling_order),
1101                ..Default::default()
1102            }
1103            .update(&*tx)
1104            .await?;
1105            channel.channel_order = sibling_order;
1106
1107            log::info!(
1108                "Reorder complete. Swapped channels {} and {}",
1109                channel.id,
1110                sibling_channel.id
1111            );
1112
1113            let swapped_channels = vec![
1114                Channel::from_model(channel),
1115                Channel::from_model(sibling_channel),
1116            ];
1117
1118            Ok(swapped_channels)
1119        })
1120        .await
1121    }
1122}
1123
1124async fn max_order(parent_path: &str, tx: &TransactionHandle) -> Result<i32> {
1125    let max_order = channel::Entity::find()
1126        .filter(channel::Column::ParentPath.eq(parent_path))
1127        .select_only()
1128        .column_as(channel::Column::ChannelOrder.max(), "max_order")
1129        .into_tuple::<Option<i32>>()
1130        .one(&**tx)
1131        .await?
1132        .flatten()
1133        .unwrap_or(0);
1134
1135    Ok(max_order)
1136}
1137
1138#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1139enum QueryIds {
1140    Id,
1141}
1142
1143#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1144enum QueryUserIds {
1145    UserId,
1146}