channels.rs

   1use super::*;
   2use anyhow::Context as _;
   3use rpc::{
   4    ErrorCode, ErrorCodeExt,
   5    proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
   6};
   7use sea_orm::{ActiveValue, DbBackend, TryGetableMany};
   8
   9impl Database {
  10    #[cfg(test)]
  11    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  12        self.transaction(move |tx| async move {
  13            let mut channels = Vec::new();
  14            let mut rows = channel::Entity::find().stream(&*tx).await?;
  15            while let Some(row) = rows.next().await {
  16                let row = row?;
  17                channels.push((row.id, row.name));
  18            }
  19            Ok(channels)
  20        })
  21        .await
  22    }
  23
  24    #[cfg(test)]
  25    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  26        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .0
  40            .id)
  41    }
  42
  43    /// Creates a new channel.
  44    pub async fn create_channel(
  45        &self,
  46        name: &str,
  47        parent_channel_id: Option<ChannelId>,
  48        admin_id: UserId,
  49    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let parent_path = parent
  63                .as_ref()
  64                .map_or(String::new(), |parent| parent.path());
  65
  66            // Find the maximum channel_order among siblings to set the new channel at the end
  67            let max_order = if parent_path.is_empty() {
  68                0
  69            } else {
  70                max_order(&parent_path, &tx).await?
  71            };
  72
  73            log::info!(
  74                "Creating channel '{}' with parent_path='{}', max_order={}, new_order={}",
  75                name,
  76                parent_path,
  77                max_order,
  78                max_order + 1
  79            );
  80
  81            let channel = channel::ActiveModel {
  82                id: ActiveValue::NotSet,
  83                name: ActiveValue::Set(name.to_string()),
  84                visibility: ActiveValue::Set(ChannelVisibility::Members),
  85                parent_path: ActiveValue::Set(parent_path),
  86                requires_zed_cla: ActiveValue::NotSet,
  87                channel_order: ActiveValue::Set(max_order + 1),
  88            }
  89            .insert(&*tx)
  90            .await?;
  91
  92            if parent.is_none() {
  93                membership = Some(
  94                    channel_member::ActiveModel {
  95                        id: ActiveValue::NotSet,
  96                        channel_id: ActiveValue::Set(channel.id),
  97                        user_id: ActiveValue::Set(admin_id),
  98                        accepted: ActiveValue::Set(true),
  99                        role: ActiveValue::Set(ChannelRole::Admin),
 100                    }
 101                    .insert(&*tx)
 102                    .await?,
 103                );
 104            }
 105
 106            Ok((channel, membership))
 107        })
 108        .await
 109    }
 110
 111    /// Adds a user to the specified channel.
 112    pub async fn join_channel(
 113        &self,
 114        channel_id: ChannelId,
 115        user_id: UserId,
 116        connection: ConnectionId,
 117    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 118        self.transaction(move |tx| async move {
 119            let channel = self.get_channel_internal(channel_id, &tx).await?;
 120            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 121
 122            let mut accept_invite_result = None;
 123
 124            if role.is_none() {
 125                if let Some(invitation) = self
 126                    .pending_invite_for_channel(&channel, user_id, &tx)
 127                    .await?
 128                {
 129                    // note, this may be a parent channel
 130                    role = Some(invitation.role);
 131                    channel_member::Entity::update(channel_member::ActiveModel {
 132                        accepted: ActiveValue::Set(true),
 133                        ..invitation.into_active_model()
 134                    })
 135                    .exec(&*tx)
 136                    .await?;
 137
 138                    accept_invite_result = Some(
 139                        self.calculate_membership_updated(&channel, user_id, &tx)
 140                            .await?,
 141                    );
 142
 143                    debug_assert!(
 144                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 145                    );
 146                } else if channel.visibility == ChannelVisibility::Public {
 147                    role = Some(ChannelRole::Guest);
 148                    channel_member::Entity::insert(channel_member::ActiveModel {
 149                        id: ActiveValue::NotSet,
 150                        channel_id: ActiveValue::Set(channel.root_id()),
 151                        user_id: ActiveValue::Set(user_id),
 152                        accepted: ActiveValue::Set(true),
 153                        role: ActiveValue::Set(ChannelRole::Guest),
 154                    })
 155                    .exec(&*tx)
 156                    .await?;
 157
 158                    accept_invite_result = Some(
 159                        self.calculate_membership_updated(&channel, user_id, &tx)
 160                            .await?,
 161                    );
 162
 163                    debug_assert!(
 164                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 165                    );
 166                }
 167            }
 168
 169            if role.is_none() || role == Some(ChannelRole::Banned) {
 170                Err(ErrorCode::Forbidden.anyhow())?
 171            }
 172            let role = role.unwrap();
 173
 174            let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
 175            let room_id = self
 176                .get_or_create_channel_room(channel_id, &livekit_room, &tx)
 177                .await?;
 178
 179            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 180                .await
 181                .map(|jr| (jr, accept_invite_result, role))
 182        })
 183        .await
 184    }
 185
 186    /// Sets the visibility of the given channel.
 187    pub async fn set_channel_visibility(
 188        &self,
 189        channel_id: ChannelId,
 190        visibility: ChannelVisibility,
 191        admin_id: UserId,
 192    ) -> Result<channel::Model> {
 193        self.transaction(move |tx| async move {
 194            let channel = self.get_channel_internal(channel_id, &tx).await?;
 195            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 196                .await?;
 197
 198            if visibility == ChannelVisibility::Public {
 199                if let Some(parent_id) = channel.parent_id() {
 200                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 201
 202                    if parent.visibility != ChannelVisibility::Public {
 203                        Err(ErrorCode::BadPublicNesting
 204                            .with_tag("direction", "parent")
 205                            .anyhow())?;
 206                    }
 207                }
 208            } else if visibility == ChannelVisibility::Members
 209                && self
 210                    .get_channel_descendants_excluding_self([&channel], &tx)
 211                    .await?
 212                    .into_iter()
 213                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 214            {
 215                Err(ErrorCode::BadPublicNesting
 216                    .with_tag("direction", "children")
 217                    .anyhow())?;
 218            }
 219
 220            let mut model = channel.into_active_model();
 221            model.visibility = ActiveValue::Set(visibility);
 222            let channel = model.update(&*tx).await?;
 223
 224            Ok(channel)
 225        })
 226        .await
 227    }
 228
 229    #[cfg(test)]
 230    pub async fn set_channel_requires_zed_cla(
 231        &self,
 232        channel_id: ChannelId,
 233        requires_zed_cla: bool,
 234    ) -> Result<()> {
 235        self.transaction(move |tx| async move {
 236            let channel = self.get_channel_internal(channel_id, &tx).await?;
 237            let mut model = channel.into_active_model();
 238            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 239            model.update(&*tx).await?;
 240            Ok(())
 241        })
 242        .await
 243    }
 244
 245    /// Deletes the channel with the specified ID.
 246    pub async fn delete_channel(
 247        &self,
 248        channel_id: ChannelId,
 249        user_id: UserId,
 250    ) -> Result<(ChannelId, Vec<ChannelId>)> {
 251        self.transaction(move |tx| async move {
 252            let channel = self.get_channel_internal(channel_id, &tx).await?;
 253            self.check_user_is_channel_admin(&channel, user_id, &tx)
 254                .await?;
 255
 256            let channels_to_remove = self
 257                .get_channel_descendants_excluding_self([&channel], &tx)
 258                .await?
 259                .into_iter()
 260                .map(|channel| channel.id)
 261                .chain(Some(channel_id))
 262                .collect::<Vec<_>>();
 263
 264            channel::Entity::delete_many()
 265                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 266                .exec(&*tx)
 267                .await?;
 268
 269            Ok((channel.root_id(), channels_to_remove))
 270        })
 271        .await
 272    }
 273
 274    /// Invites a user to a channel as a member.
 275    pub async fn invite_channel_member(
 276        &self,
 277        channel_id: ChannelId,
 278        invitee_id: UserId,
 279        inviter_id: UserId,
 280        role: ChannelRole,
 281    ) -> Result<InviteMemberResult> {
 282        self.transaction(move |tx| async move {
 283            let channel = self.get_channel_internal(channel_id, &tx).await?;
 284            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 285                .await?;
 286            if !channel.is_root() {
 287                Err(ErrorCode::NotARootChannel.anyhow())?
 288            }
 289
 290            channel_member::ActiveModel {
 291                id: ActiveValue::NotSet,
 292                channel_id: ActiveValue::Set(channel_id),
 293                user_id: ActiveValue::Set(invitee_id),
 294                accepted: ActiveValue::Set(false),
 295                role: ActiveValue::Set(role),
 296            }
 297            .insert(&*tx)
 298            .await?;
 299
 300            let channel = Channel::from_model(channel);
 301
 302            let notifications = self
 303                .create_notification(
 304                    invitee_id,
 305                    rpc::Notification::ChannelInvitation {
 306                        channel_id: channel_id.to_proto(),
 307                        channel_name: channel.name.clone(),
 308                        inviter_id: inviter_id.to_proto(),
 309                    },
 310                    true,
 311                    &tx,
 312                )
 313                .await?
 314                .into_iter()
 315                .collect();
 316
 317            Ok(InviteMemberResult {
 318                channel,
 319                notifications,
 320            })
 321        })
 322        .await
 323    }
 324
 325    fn sanitize_channel_name(name: &str) -> Result<&str> {
 326        let new_name = name.trim().trim_start_matches('#');
 327        if new_name.is_empty() {
 328            Err(anyhow!("channel name can't be blank"))?;
 329        }
 330        Ok(new_name)
 331    }
 332
 333    /// Renames the specified channel.
 334    pub async fn rename_channel(
 335        &self,
 336        channel_id: ChannelId,
 337        admin_id: UserId,
 338        new_name: &str,
 339    ) -> Result<channel::Model> {
 340        self.transaction(move |tx| async move {
 341            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 342
 343            let channel = self.get_channel_internal(channel_id, &tx).await?;
 344            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 345                .await?;
 346
 347            let mut model = channel.into_active_model();
 348            model.name = ActiveValue::Set(new_name.clone());
 349            let channel = model.update(&*tx).await?;
 350
 351            Ok(channel)
 352        })
 353        .await
 354    }
 355
 356    /// accept or decline an invite to join a channel
 357    pub async fn respond_to_channel_invite(
 358        &self,
 359        channel_id: ChannelId,
 360        user_id: UserId,
 361        accept: bool,
 362    ) -> Result<RespondToChannelInvite> {
 363        self.transaction(move |tx| async move {
 364            let channel = self.get_channel_internal(channel_id, &tx).await?;
 365
 366            let membership_update = if accept {
 367                let rows_affected = channel_member::Entity::update_many()
 368                    .set(channel_member::ActiveModel {
 369                        accepted: ActiveValue::Set(accept),
 370                        ..Default::default()
 371                    })
 372                    .filter(
 373                        channel_member::Column::ChannelId
 374                            .eq(channel_id)
 375                            .and(channel_member::Column::UserId.eq(user_id))
 376                            .and(channel_member::Column::Accepted.eq(false)),
 377                    )
 378                    .exec(&*tx)
 379                    .await?
 380                    .rows_affected;
 381
 382                if rows_affected == 0 {
 383                    Err(anyhow!("no such invitation"))?;
 384                }
 385
 386                Some(
 387                    self.calculate_membership_updated(&channel, user_id, &tx)
 388                        .await?,
 389                )
 390            } else {
 391                let rows_affected = channel_member::Entity::delete_many()
 392                    .filter(
 393                        channel_member::Column::ChannelId
 394                            .eq(channel_id)
 395                            .and(channel_member::Column::UserId.eq(user_id))
 396                            .and(channel_member::Column::Accepted.eq(false)),
 397                    )
 398                    .exec(&*tx)
 399                    .await?
 400                    .rows_affected;
 401                if rows_affected == 0 {
 402                    Err(anyhow!("no such invitation"))?;
 403                }
 404
 405                None
 406            };
 407
 408            Ok(RespondToChannelInvite {
 409                membership_update,
 410                notifications: self
 411                    .mark_notification_as_read_with_response(
 412                        user_id,
 413                        &rpc::Notification::ChannelInvitation {
 414                            channel_id: channel_id.to_proto(),
 415                            channel_name: Default::default(),
 416                            inviter_id: Default::default(),
 417                        },
 418                        accept,
 419                        &tx,
 420                    )
 421                    .await?
 422                    .into_iter()
 423                    .collect(),
 424            })
 425        })
 426        .await
 427    }
 428
 429    async fn calculate_membership_updated(
 430        &self,
 431        channel: &channel::Model,
 432        user_id: UserId,
 433        tx: &DatabaseTransaction,
 434    ) -> Result<MembershipUpdated> {
 435        let new_channels = self
 436            .get_user_channels(user_id, Some(channel), false, tx)
 437            .await?;
 438        let removed_channels = self
 439            .get_channel_descendants_excluding_self([channel], tx)
 440            .await?
 441            .into_iter()
 442            .map(|channel| channel.id)
 443            .chain([channel.id])
 444            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 445            .collect::<Vec<_>>();
 446
 447        Ok(MembershipUpdated {
 448            channel_id: channel.id,
 449            new_channels,
 450            removed_channels,
 451        })
 452    }
 453
 454    /// Removes a channel member.
 455    pub async fn remove_channel_member(
 456        &self,
 457        channel_id: ChannelId,
 458        member_id: UserId,
 459        admin_id: UserId,
 460    ) -> Result<RemoveChannelMemberResult> {
 461        self.transaction(|tx| async move {
 462            let channel = self.get_channel_internal(channel_id, &tx).await?;
 463
 464            if member_id != admin_id {
 465                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 466                    .await?;
 467            }
 468
 469            let result = channel_member::Entity::delete_many()
 470                .filter(
 471                    channel_member::Column::ChannelId
 472                        .eq(channel_id)
 473                        .and(channel_member::Column::UserId.eq(member_id)),
 474                )
 475                .exec(&*tx)
 476                .await?;
 477
 478            if result.rows_affected == 0 {
 479                Err(anyhow!("no such member"))?;
 480            }
 481
 482            Ok(RemoveChannelMemberResult {
 483                membership_update: self
 484                    .calculate_membership_updated(&channel, member_id, &tx)
 485                    .await?,
 486                notification_id: self
 487                    .remove_notification(
 488                        member_id,
 489                        rpc::Notification::ChannelInvitation {
 490                            channel_id: channel_id.to_proto(),
 491                            channel_name: Default::default(),
 492                            inviter_id: Default::default(),
 493                        },
 494                        &tx,
 495                    )
 496                    .await?,
 497            })
 498        })
 499        .await
 500    }
 501
 502    /// Returns all channels for the user with the given ID.
 503    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 504        self.weak_transaction(
 505            |tx| async move { self.get_user_channels(user_id, None, true, &tx).await },
 506        )
 507        .await
 508    }
 509
 510    /// Returns all channels for the user with the given ID that are descendants
 511    /// of the specified ancestor channel.
 512    pub async fn get_user_channels(
 513        &self,
 514        user_id: UserId,
 515        ancestor_channel: Option<&channel::Model>,
 516        include_invites: bool,
 517        tx: &DatabaseTransaction,
 518    ) -> Result<ChannelsForUser> {
 519        let mut filter = channel_member::Column::UserId.eq(user_id);
 520        if !include_invites {
 521            filter = filter.and(channel_member::Column::Accepted.eq(true))
 522        }
 523        if let Some(ancestor) = ancestor_channel {
 524            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 525        }
 526
 527        let mut channels = Vec::<channel::Model>::new();
 528        let mut invited_channels = Vec::<Channel>::new();
 529        let mut channel_memberships = Vec::<channel_member::Model>::new();
 530        let mut rows = channel_member::Entity::find()
 531            .filter(filter)
 532            .inner_join(channel::Entity)
 533            .select_also(channel::Entity)
 534            .stream(tx)
 535            .await?;
 536        while let Some(row) = rows.next().await {
 537            if let (membership, Some(channel)) = row? {
 538                if membership.accepted {
 539                    channel_memberships.push(membership);
 540                    channels.push(channel);
 541                } else {
 542                    invited_channels.push(Channel::from_model(channel));
 543                }
 544            }
 545        }
 546        drop(rows);
 547
 548        let mut descendants = self
 549            .get_channel_descendants_excluding_self(channels.iter(), tx)
 550            .await?;
 551
 552        descendants.extend(channels);
 553
 554        let roles_by_channel_id = channel_memberships
 555            .iter()
 556            .map(|membership| (membership.channel_id, membership.role))
 557            .collect::<HashMap<_, _>>();
 558
 559        let channels: Vec<Channel> = descendants
 560            .into_iter()
 561            .filter_map(|channel| {
 562                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 563                if parent_role.can_see_channel(channel.visibility) {
 564                    Some(Channel::from_model(channel))
 565                } else {
 566                    None
 567                }
 568            })
 569            .collect();
 570
 571        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 572        enum QueryUserIdsAndChannelIds {
 573            ChannelId,
 574            UserId,
 575        }
 576
 577        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 578        {
 579            let mut rows = room_participant::Entity::find()
 580                .inner_join(room::Entity)
 581                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 582                .select_only()
 583                .column(room::Column::ChannelId)
 584                .column(room_participant::Column::UserId)
 585                .into_values::<_, QueryUserIdsAndChannelIds>()
 586                .stream(tx)
 587                .await?;
 588            while let Some(row) = rows.next().await {
 589                let row: (ChannelId, UserId) = row?;
 590                channel_participants.entry(row.0).or_default().push(row.1)
 591            }
 592        }
 593
 594        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 595
 596        let mut channel_ids_by_buffer_id = HashMap::default();
 597        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
 598        let mut rows = buffer::Entity::find()
 599            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 600            .stream(tx)
 601            .await?;
 602        while let Some(row) = rows.next().await {
 603            let row = row?;
 604            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 605            latest_buffer_versions.push(ChannelBufferVersion {
 606                channel_id: row.channel_id.0 as u64,
 607                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
 608                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
 609                    .latest_operation_lamport_timestamp
 610                    .zip(row.latest_operation_replica_id)
 611                {
 612                    vec![VectorClockEntry {
 613                        timestamp: latest_lamport_timestamp as u32,
 614                        replica_id: latest_replica_id as u32,
 615                    }]
 616                } else {
 617                    vec![]
 618                },
 619            });
 620        }
 621        drop(rows);
 622
 623        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
 624
 625        let observed_buffer_versions = self
 626            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
 627            .await?;
 628
 629        let observed_channel_messages = self
 630            .observed_channel_messages(&channel_ids, user_id, tx)
 631            .await?;
 632
 633        Ok(ChannelsForUser {
 634            channel_memberships,
 635            channels,
 636            invited_channels,
 637            channel_participants,
 638            latest_buffer_versions,
 639            latest_channel_messages,
 640            observed_buffer_versions,
 641            observed_channel_messages,
 642        })
 643    }
 644
 645    /// Sets the role for the specified channel member.
 646    pub async fn set_channel_member_role(
 647        &self,
 648        channel_id: ChannelId,
 649        admin_id: UserId,
 650        for_user: UserId,
 651        role: ChannelRole,
 652    ) -> Result<SetMemberRoleResult> {
 653        self.transaction(|tx| async move {
 654            let channel = self.get_channel_internal(channel_id, &tx).await?;
 655            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 656                .await?;
 657
 658            let membership = channel_member::Entity::find()
 659                .filter(
 660                    channel_member::Column::ChannelId
 661                        .eq(channel_id)
 662                        .and(channel_member::Column::UserId.eq(for_user)),
 663                )
 664                .one(&*tx)
 665                .await?
 666                .context("no such member")?;
 667
 668            let mut update = membership.into_active_model();
 669            update.role = ActiveValue::Set(role);
 670            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 671
 672            if updated.accepted {
 673                Ok(SetMemberRoleResult::MembershipUpdated(
 674                    self.calculate_membership_updated(&channel, for_user, &tx)
 675                        .await?,
 676                ))
 677            } else {
 678                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 679                    channel,
 680                )))
 681            }
 682        })
 683        .await
 684    }
 685
 686    /// Returns the details for the specified channel member.
 687    pub async fn get_channel_participant_details(
 688        &self,
 689        channel_id: ChannelId,
 690        filter: &str,
 691        limit: u64,
 692        user_id: UserId,
 693    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
 694        let members = self
 695            .transaction(move |tx| async move {
 696                let channel = self.get_channel_internal(channel_id, &tx).await?;
 697                self.check_user_is_channel_participant(&channel, user_id, &tx)
 698                    .await?;
 699                let mut query = channel_member::Entity::find()
 700                    .find_also_related(user::Entity)
 701                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
 702
 703                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
 704                    query = query.filter(Expr::cust_with_values(
 705                        "UPPER(github_login) LIKE ?",
 706                        [Self::fuzzy_like_string(&filter.to_uppercase())],
 707                    ))
 708                } else {
 709                    query = query.filter(Expr::cust_with_values(
 710                        "github_login ILIKE $1",
 711                        [Self::fuzzy_like_string(filter)],
 712                    ))
 713                }
 714                let members = query.order_by(
 715                        Expr::cust(
 716                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
 717                        ),
 718                        sea_orm::Order::Asc,
 719                    )
 720                    .limit(limit)
 721                    .all(&*tx)
 722                    .await?;
 723
 724                Ok(members)
 725            })
 726            .await?;
 727
 728        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
 729
 730        let members = members
 731            .into_iter()
 732            .map(|(member, user)| {
 733                if let Some(user) = user {
 734                    users.push(proto::User {
 735                        id: user.id.to_proto(),
 736                        avatar_url: format!(
 737                            "https://github.com/{}.png?size=128",
 738                            user.github_login
 739                        ),
 740                        github_login: user.github_login,
 741                        name: user.name,
 742                        email: user.email_address,
 743                    })
 744                }
 745                proto::ChannelMember {
 746                    role: member.role.into(),
 747                    user_id: member.user_id.to_proto(),
 748                    kind: if member.accepted {
 749                        Kind::Member
 750                    } else {
 751                        Kind::Invitee
 752                    }
 753                    .into(),
 754                }
 755            })
 756            .collect();
 757
 758        Ok((members, users))
 759    }
 760
 761    /// Returns whether the given user is an admin in the specified channel.
 762    pub async fn check_user_is_channel_admin(
 763        &self,
 764        channel: &channel::Model,
 765        user_id: UserId,
 766        tx: &DatabaseTransaction,
 767    ) -> Result<ChannelRole> {
 768        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 769        match role {
 770            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 771            Some(ChannelRole::Member)
 772            | Some(ChannelRole::Talker)
 773            | Some(ChannelRole::Banned)
 774            | Some(ChannelRole::Guest)
 775            | None => Err(anyhow!(
 776                "user is not a channel admin or channel does not exist"
 777            ))?,
 778        }
 779    }
 780
 781    /// Returns whether the given user is a member of the specified channel.
 782    pub async fn check_user_is_channel_member(
 783        &self,
 784        channel: &channel::Model,
 785        user_id: UserId,
 786        tx: &DatabaseTransaction,
 787    ) -> Result<ChannelRole> {
 788        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 789        match channel_role {
 790            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 791            Some(ChannelRole::Banned)
 792            | Some(ChannelRole::Guest)
 793            | Some(ChannelRole::Talker)
 794            | None => Err(anyhow!(
 795                "user is not a channel member or channel does not exist"
 796            ))?,
 797        }
 798    }
 799
 800    /// Returns whether the given user is a participant in the specified channel.
 801    pub async fn check_user_is_channel_participant(
 802        &self,
 803        channel: &channel::Model,
 804        user_id: UserId,
 805        tx: &DatabaseTransaction,
 806    ) -> Result<ChannelRole> {
 807        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 808        match role {
 809            Some(ChannelRole::Admin)
 810            | Some(ChannelRole::Member)
 811            | Some(ChannelRole::Guest)
 812            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 813            Some(ChannelRole::Banned) | None => Err(anyhow!(
 814                "user is not a channel participant or channel does not exist"
 815            ))?,
 816        }
 817    }
 818
 819    /// Returns a user's pending invite for the given channel, if one exists.
 820    pub async fn pending_invite_for_channel(
 821        &self,
 822        channel: &channel::Model,
 823        user_id: UserId,
 824        tx: &DatabaseTransaction,
 825    ) -> Result<Option<channel_member::Model>> {
 826        let row = channel_member::Entity::find()
 827            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 828            .filter(channel_member::Column::UserId.eq(user_id))
 829            .filter(channel_member::Column::Accepted.eq(false))
 830            .one(tx)
 831            .await?;
 832
 833        Ok(row)
 834    }
 835
 836    /// Returns the role for a user in the given channel.
 837    pub async fn channel_role_for_user(
 838        &self,
 839        channel: &channel::Model,
 840        user_id: UserId,
 841        tx: &DatabaseTransaction,
 842    ) -> Result<Option<ChannelRole>> {
 843        let membership = channel_member::Entity::find()
 844            .filter(
 845                channel_member::Column::ChannelId
 846                    .eq(channel.root_id())
 847                    .and(channel_member::Column::UserId.eq(user_id))
 848                    .and(channel_member::Column::Accepted.eq(true)),
 849            )
 850            .one(tx)
 851            .await?;
 852
 853        let Some(membership) = membership else {
 854            return Ok(None);
 855        };
 856
 857        if !membership.role.can_see_channel(channel.visibility) {
 858            return Ok(None);
 859        }
 860
 861        Ok(Some(membership.role))
 862    }
 863
 864    // Get the descendants of the given set if channels, ordered by their
 865    // path.
 866    pub(crate) async fn get_channel_descendants_excluding_self(
 867        &self,
 868        channels: impl IntoIterator<Item = &channel::Model>,
 869        tx: &DatabaseTransaction,
 870    ) -> Result<Vec<channel::Model>> {
 871        let mut filter = Condition::any();
 872        for channel in channels.into_iter() {
 873            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 874        }
 875
 876        if filter.is_empty() {
 877            return Ok(vec![]);
 878        }
 879
 880        Ok(channel::Entity::find()
 881            .filter(filter)
 882            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 883            .all(tx)
 884            .await?)
 885    }
 886
 887    /// Returns the channel with the given ID.
 888    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 889        self.transaction(|tx| async move {
 890            let channel = self.get_channel_internal(channel_id, &tx).await?;
 891            self.check_user_is_channel_participant(&channel, user_id, &tx)
 892                .await?;
 893
 894            Ok(Channel::from_model(channel))
 895        })
 896        .await
 897    }
 898
 899    pub(crate) async fn get_channel_internal(
 900        &self,
 901        channel_id: ChannelId,
 902        tx: &DatabaseTransaction,
 903    ) -> Result<channel::Model> {
 904        Ok(channel::Entity::find_by_id(channel_id)
 905            .one(tx)
 906            .await?
 907            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 908    }
 909
 910    pub(crate) async fn get_or_create_channel_room(
 911        &self,
 912        channel_id: ChannelId,
 913        livekit_room: &str,
 914        tx: &DatabaseTransaction,
 915    ) -> Result<RoomId> {
 916        let room = room::Entity::find()
 917            .filter(room::Column::ChannelId.eq(channel_id))
 918            .one(tx)
 919            .await?;
 920
 921        let room_id = if let Some(room) = room {
 922            room.id
 923        } else {
 924            let result = room::Entity::insert(room::ActiveModel {
 925                channel_id: ActiveValue::Set(Some(channel_id)),
 926                live_kit_room: ActiveValue::Set(livekit_room.to_string()),
 927                ..Default::default()
 928            })
 929            .exec(tx)
 930            .await?;
 931
 932            result.last_insert_id
 933        };
 934
 935        Ok(room_id)
 936    }
 937
 938    /// Move a channel from one parent to another
 939    pub async fn move_channel(
 940        &self,
 941        channel_id: ChannelId,
 942        new_parent_id: ChannelId,
 943        admin_id: UserId,
 944    ) -> Result<(ChannelId, Vec<Channel>)> {
 945        self.transaction(|tx| async move {
 946            let channel = self.get_channel_internal(channel_id, &tx).await?;
 947            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 948                .await?;
 949            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 950
 951            if new_parent.root_id() != channel.root_id() {
 952                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 953            }
 954
 955            if new_parent
 956                .ancestors_including_self()
 957                .any(|id| id == channel.id)
 958            {
 959                Err(anyhow!(ErrorCode::CircularNesting))?;
 960            }
 961
 962            if channel.visibility == ChannelVisibility::Public
 963                && new_parent.visibility != ChannelVisibility::Public
 964            {
 965                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 966            }
 967
 968            let root_id = channel.root_id();
 969            let new_parent_path = new_parent.path();
 970            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 971            let new_path = format!("{}{}/", &new_parent_path, channel.id);
 972            let new_order = max_order(&new_parent_path, &tx).await? + 1;
 973
 974            let mut model = channel.into_active_model();
 975            model.parent_path = ActiveValue::Set(new_parent.path());
 976            model.channel_order = ActiveValue::Set(new_order);
 977            let channel = model.update(&*tx).await?;
 978
 979            let descendent_ids =
 980                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 981                    self.pool.get_database_backend(),
 982                    "
 983                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 984                    WHERE parent_path LIKE $3 || '%'
 985                    RETURNING id
 986                ",
 987                    [old_path.clone().into(), new_path.into(), old_path.into()],
 988                ))
 989                .all(&*tx)
 990                .await?;
 991
 992            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
 993
 994            let channels = channel::Entity::find()
 995                .filter(channel::Column::Id.is_in(all_moved_ids))
 996                .all(&*tx)
 997                .await?
 998                .into_iter()
 999                .map(Channel::from_model)
1000                .collect::<Vec<_>>();
1001
1002            Ok((root_id, channels))
1003        })
1004        .await
1005    }
1006
1007    pub async fn reorder_channel(
1008        &self,
1009        channel_id: ChannelId,
1010        direction: proto::reorder_channel::Direction,
1011        user_id: UserId,
1012    ) -> Result<Vec<Channel>> {
1013        self.transaction(|tx| async move {
1014            let mut channel = self.get_channel_internal(channel_id, &tx).await?;
1015
1016            if channel.is_root() {
1017                log::info!("Skipping reorder of root channel {}", channel.id,);
1018                return Ok(vec![]);
1019            }
1020
1021            log::info!(
1022                "Reordering channel {} (parent_path: '{}', order: {})",
1023                channel.id,
1024                channel.parent_path,
1025                channel.channel_order
1026            );
1027
1028            // Check if user is admin of the channel
1029            self.check_user_is_channel_admin(&channel, user_id, &tx)
1030                .await?;
1031
1032            // Find the sibling channel to swap with
1033            let sibling_channel = match direction {
1034                proto::reorder_channel::Direction::Up => {
1035                    log::info!(
1036                        "Looking for sibling with parent_path='{}' and order < {}",
1037                        channel.parent_path,
1038                        channel.channel_order
1039                    );
1040                    // Find channel with highest order less than current
1041                    channel::Entity::find()
1042                        .filter(
1043                            channel::Column::ParentPath
1044                                .eq(&channel.parent_path)
1045                                .and(channel::Column::ChannelOrder.lt(channel.channel_order)),
1046                        )
1047                        .order_by_desc(channel::Column::ChannelOrder)
1048                        .one(&*tx)
1049                        .await?
1050                }
1051                proto::reorder_channel::Direction::Down => {
1052                    log::info!(
1053                        "Looking for sibling with parent_path='{}' and order > {}",
1054                        channel.parent_path,
1055                        channel.channel_order
1056                    );
1057                    // Find channel with lowest order greater than current
1058                    channel::Entity::find()
1059                        .filter(
1060                            channel::Column::ParentPath
1061                                .eq(&channel.parent_path)
1062                                .and(channel::Column::ChannelOrder.gt(channel.channel_order)),
1063                        )
1064                        .order_by_asc(channel::Column::ChannelOrder)
1065                        .one(&*tx)
1066                        .await?
1067                }
1068            };
1069
1070            let mut sibling_channel = match sibling_channel {
1071                Some(sibling) => {
1072                    log::info!(
1073                        "Found sibling {} (parent_path: '{}', order: {})",
1074                        sibling.id,
1075                        sibling.parent_path,
1076                        sibling.channel_order
1077                    );
1078                    sibling
1079                }
1080                None => {
1081                    log::warn!("No sibling found to swap with");
1082                    // No sibling to swap with
1083                    return Ok(vec![]);
1084                }
1085            };
1086
1087            let current_order = channel.channel_order;
1088            let sibling_order = sibling_channel.channel_order;
1089
1090            channel::ActiveModel {
1091                id: ActiveValue::Unchanged(sibling_channel.id),
1092                channel_order: ActiveValue::Set(current_order),
1093                ..Default::default()
1094            }
1095            .update(&*tx)
1096            .await?;
1097            sibling_channel.channel_order = current_order;
1098
1099            channel::ActiveModel {
1100                id: ActiveValue::Unchanged(channel.id),
1101                channel_order: ActiveValue::Set(sibling_order),
1102                ..Default::default()
1103            }
1104            .update(&*tx)
1105            .await?;
1106            channel.channel_order = sibling_order;
1107
1108            log::info!(
1109                "Reorder complete. Swapped channels {} and {}",
1110                channel.id,
1111                sibling_channel.id
1112            );
1113
1114            let swapped_channels = vec![
1115                Channel::from_model(channel),
1116                Channel::from_model(sibling_channel),
1117            ];
1118
1119            Ok(swapped_channels)
1120        })
1121        .await
1122    }
1123}
1124
1125async fn max_order(parent_path: &str, tx: &TransactionHandle) -> Result<i32> {
1126    let max_order = channel::Entity::find()
1127        .filter(channel::Column::ParentPath.eq(parent_path))
1128        .select_only()
1129        .column_as(channel::Column::ChannelOrder.max(), "max_order")
1130        .into_tuple::<Option<i32>>()
1131        .one(&**tx)
1132        .await?
1133        .flatten()
1134        .unwrap_or(0);
1135
1136    Ok(max_order)
1137}
1138
/// Column selector with a single `Id` variant.
///
/// Passed to `ChannelId::find_by_statement` in `move_channel` to read the
/// values produced by that statement's `RETURNING id` clause.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1143
/// Column selector with a single `UserId` variant.
///
/// NOTE(review): not referenced in this part of the file; presumably used the
/// same way as `QueryIds` to read a `user_id` column elsewhere — confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}