channels.rs

   1use super::*;
   2use anyhow::Context as _;
   3use rpc::{
   4    ErrorCode, ErrorCodeExt,
   5    proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
   6};
   7use sea_orm::{ActiveValue, DbBackend, TryGetableMany};
   8
   9impl Database {
  10    #[cfg(test)]
  11    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  12        self.transaction(move |tx| async move {
  13            let mut channels = Vec::new();
  14            let mut rows = channel::Entity::find().stream(&*tx).await?;
  15            while let Some(row) = rows.next().await {
  16                let row = row?;
  17                channels.push((row.id, row.name));
  18            }
  19            Ok(channels)
  20        })
  21        .await
  22    }
  23
  24    #[cfg(test)]
  25    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  26        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .0
  40            .id)
  41    }
  42
  43    /// Creates a new channel.
  44    pub async fn create_channel(
  45        &self,
  46        name: &str,
  47        parent_channel_id: Option<ChannelId>,
  48        admin_id: UserId,
  49    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let parent_path = parent
  63                .as_ref()
  64                .map_or(String::new(), |parent| parent.path());
  65
  66            // Find the maximum channel_order among siblings to set the new channel at the end
  67            let max_order = if parent_path.is_empty() {
  68                0
  69            } else {
  70                max_order(&parent_path, &tx).await?
  71            };
  72
  73            log::info!(
  74                "Creating channel '{}' with parent_path='{}', max_order={}, new_order={}",
  75                name,
  76                parent_path,
  77                max_order,
  78                max_order + 1
  79            );
  80
  81            let channel = channel::ActiveModel {
  82                id: ActiveValue::NotSet,
  83                name: ActiveValue::Set(name.to_string()),
  84                visibility: ActiveValue::Set(ChannelVisibility::Members),
  85                parent_path: ActiveValue::Set(parent_path),
  86                requires_zed_cla: ActiveValue::NotSet,
  87                channel_order: ActiveValue::Set(max_order + 1),
  88            }
  89            .insert(&*tx)
  90            .await?;
  91
  92            if parent.is_none() {
  93                membership = Some(
  94                    channel_member::ActiveModel {
  95                        id: ActiveValue::NotSet,
  96                        channel_id: ActiveValue::Set(channel.id),
  97                        user_id: ActiveValue::Set(admin_id),
  98                        accepted: ActiveValue::Set(true),
  99                        role: ActiveValue::Set(ChannelRole::Admin),
 100                    }
 101                    .insert(&*tx)
 102                    .await?,
 103                );
 104            }
 105
 106            Ok((channel, membership))
 107        })
 108        .await
 109    }
 110
 111    /// Adds a user to the specified channel.
 112    pub async fn join_channel(
 113        &self,
 114        channel_id: ChannelId,
 115        user_id: UserId,
 116        connection: ConnectionId,
 117    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 118        self.transaction(move |tx| async move {
 119            let channel = self.get_channel_internal(channel_id, &tx).await?;
 120            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 121
 122            let mut accept_invite_result = None;
 123
 124            if role.is_none() {
 125                if let Some(invitation) = self
 126                    .pending_invite_for_channel(&channel, user_id, &tx)
 127                    .await?
 128                {
 129                    // note, this may be a parent channel
 130                    role = Some(invitation.role);
 131                    channel_member::Entity::update(channel_member::ActiveModel {
 132                        accepted: ActiveValue::Set(true),
 133                        ..invitation.into_active_model()
 134                    })
 135                    .exec(&*tx)
 136                    .await?;
 137
 138                    accept_invite_result = Some(
 139                        self.calculate_membership_updated(&channel, user_id, &tx)
 140                            .await?,
 141                    );
 142
 143                    debug_assert!(
 144                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 145                    );
 146                } else if channel.visibility == ChannelVisibility::Public {
 147                    role = Some(ChannelRole::Guest);
 148                    channel_member::Entity::insert(channel_member::ActiveModel {
 149                        id: ActiveValue::NotSet,
 150                        channel_id: ActiveValue::Set(channel.root_id()),
 151                        user_id: ActiveValue::Set(user_id),
 152                        accepted: ActiveValue::Set(true),
 153                        role: ActiveValue::Set(ChannelRole::Guest),
 154                    })
 155                    .exec(&*tx)
 156                    .await?;
 157
 158                    accept_invite_result = Some(
 159                        self.calculate_membership_updated(&channel, user_id, &tx)
 160                            .await?,
 161                    );
 162
 163                    debug_assert!(
 164                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 165                    );
 166                }
 167            }
 168
 169            if role.is_none() || role == Some(ChannelRole::Banned) {
 170                Err(ErrorCode::Forbidden.anyhow())?
 171            }
 172            let role = role.unwrap();
 173
 174            let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
 175            let room_id = self
 176                .get_or_create_channel_room(channel_id, &livekit_room, &tx)
 177                .await?;
 178
 179            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 180                .await
 181                .map(|jr| (jr, accept_invite_result, role))
 182        })
 183        .await
 184    }
 185
 186    /// Sets the visibility of the given channel.
 187    pub async fn set_channel_visibility(
 188        &self,
 189        channel_id: ChannelId,
 190        visibility: ChannelVisibility,
 191        admin_id: UserId,
 192    ) -> Result<channel::Model> {
 193        self.transaction(move |tx| async move {
 194            let channel = self.get_channel_internal(channel_id, &tx).await?;
 195            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 196                .await?;
 197
 198            if visibility == ChannelVisibility::Public {
 199                if let Some(parent_id) = channel.parent_id() {
 200                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 201
 202                    if parent.visibility != ChannelVisibility::Public {
 203                        Err(ErrorCode::BadPublicNesting
 204                            .with_tag("direction", "parent")
 205                            .anyhow())?;
 206                    }
 207                }
 208            } else if visibility == ChannelVisibility::Members
 209                && self
 210                    .get_channel_descendants_excluding_self([&channel], &tx)
 211                    .await?
 212                    .into_iter()
 213                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 214            {
 215                Err(ErrorCode::BadPublicNesting
 216                    .with_tag("direction", "children")
 217                    .anyhow())?;
 218            }
 219
 220            let mut model = channel.into_active_model();
 221            model.visibility = ActiveValue::Set(visibility);
 222            let channel = model.update(&*tx).await?;
 223
 224            Ok(channel)
 225        })
 226        .await
 227    }
 228
 229    #[cfg(test)]
 230    pub async fn set_channel_requires_zed_cla(
 231        &self,
 232        channel_id: ChannelId,
 233        requires_zed_cla: bool,
 234    ) -> Result<()> {
 235        self.transaction(move |tx| async move {
 236            let channel = self.get_channel_internal(channel_id, &tx).await?;
 237            let mut model = channel.into_active_model();
 238            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 239            model.update(&*tx).await?;
 240            Ok(())
 241        })
 242        .await
 243    }
 244
 245    /// Deletes the channel with the specified ID.
 246    pub async fn delete_channel(
 247        &self,
 248        channel_id: ChannelId,
 249        user_id: UserId,
 250    ) -> Result<(ChannelId, Vec<ChannelId>)> {
 251        self.transaction(move |tx| async move {
 252            let channel = self.get_channel_internal(channel_id, &tx).await?;
 253            self.check_user_is_channel_admin(&channel, user_id, &tx)
 254                .await?;
 255
 256            let channels_to_remove = self
 257                .get_channel_descendants_excluding_self([&channel], &tx)
 258                .await?
 259                .into_iter()
 260                .map(|channel| channel.id)
 261                .chain(Some(channel_id))
 262                .collect::<Vec<_>>();
 263
 264            channel::Entity::delete_many()
 265                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 266                .exec(&*tx)
 267                .await?;
 268
 269            Ok((channel.root_id(), channels_to_remove))
 270        })
 271        .await
 272    }
 273
 274    /// Invites a user to a channel as a member.
 275    pub async fn invite_channel_member(
 276        &self,
 277        channel_id: ChannelId,
 278        invitee_id: UserId,
 279        inviter_id: UserId,
 280        role: ChannelRole,
 281    ) -> Result<InviteMemberResult> {
 282        self.transaction(move |tx| async move {
 283            let channel = self.get_channel_internal(channel_id, &tx).await?;
 284            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 285                .await?;
 286            if !channel.is_root() {
 287                Err(ErrorCode::NotARootChannel.anyhow())?
 288            }
 289
 290            channel_member::ActiveModel {
 291                id: ActiveValue::NotSet,
 292                channel_id: ActiveValue::Set(channel_id),
 293                user_id: ActiveValue::Set(invitee_id),
 294                accepted: ActiveValue::Set(false),
 295                role: ActiveValue::Set(role),
 296            }
 297            .insert(&*tx)
 298            .await?;
 299
 300            let channel = Channel::from_model(channel);
 301
 302            let notifications = self
 303                .create_notification(
 304                    invitee_id,
 305                    rpc::Notification::ChannelInvitation {
 306                        channel_id: channel_id.to_proto(),
 307                        channel_name: channel.name.clone(),
 308                        inviter_id: inviter_id.to_proto(),
 309                    },
 310                    true,
 311                    &tx,
 312                )
 313                .await?
 314                .into_iter()
 315                .collect();
 316
 317            Ok(InviteMemberResult {
 318                channel,
 319                notifications,
 320            })
 321        })
 322        .await
 323    }
 324
 325    fn sanitize_channel_name(name: &str) -> Result<&str> {
 326        let new_name = name.trim().trim_start_matches('#');
 327        if new_name.is_empty() {
 328            Err(anyhow!("channel name can't be blank"))?;
 329        }
 330        Ok(new_name)
 331    }
 332
 333    /// Renames the specified channel.
 334    pub async fn rename_channel(
 335        &self,
 336        channel_id: ChannelId,
 337        admin_id: UserId,
 338        new_name: &str,
 339    ) -> Result<channel::Model> {
 340        self.transaction(move |tx| async move {
 341            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 342
 343            let channel = self.get_channel_internal(channel_id, &tx).await?;
 344            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 345                .await?;
 346
 347            let mut model = channel.into_active_model();
 348            model.name = ActiveValue::Set(new_name.clone());
 349            let channel = model.update(&*tx).await?;
 350
 351            Ok(channel)
 352        })
 353        .await
 354    }
 355
 356    /// accept or decline an invite to join a channel
 357    pub async fn respond_to_channel_invite(
 358        &self,
 359        channel_id: ChannelId,
 360        user_id: UserId,
 361        accept: bool,
 362    ) -> Result<RespondToChannelInvite> {
 363        self.transaction(move |tx| async move {
 364            let channel = self.get_channel_internal(channel_id, &tx).await?;
 365
 366            let membership_update = if accept {
 367                let rows_affected = channel_member::Entity::update_many()
 368                    .set(channel_member::ActiveModel {
 369                        accepted: ActiveValue::Set(accept),
 370                        ..Default::default()
 371                    })
 372                    .filter(
 373                        channel_member::Column::ChannelId
 374                            .eq(channel_id)
 375                            .and(channel_member::Column::UserId.eq(user_id))
 376                            .and(channel_member::Column::Accepted.eq(false)),
 377                    )
 378                    .exec(&*tx)
 379                    .await?
 380                    .rows_affected;
 381
 382                if rows_affected == 0 {
 383                    Err(anyhow!("no such invitation"))?;
 384                }
 385
 386                Some(
 387                    self.calculate_membership_updated(&channel, user_id, &tx)
 388                        .await?,
 389                )
 390            } else {
 391                let rows_affected = channel_member::Entity::delete_many()
 392                    .filter(
 393                        channel_member::Column::ChannelId
 394                            .eq(channel_id)
 395                            .and(channel_member::Column::UserId.eq(user_id))
 396                            .and(channel_member::Column::Accepted.eq(false)),
 397                    )
 398                    .exec(&*tx)
 399                    .await?
 400                    .rows_affected;
 401                if rows_affected == 0 {
 402                    Err(anyhow!("no such invitation"))?;
 403                }
 404
 405                None
 406            };
 407
 408            Ok(RespondToChannelInvite {
 409                membership_update,
 410                notifications: self
 411                    .mark_notification_as_read_with_response(
 412                        user_id,
 413                        &rpc::Notification::ChannelInvitation {
 414                            channel_id: channel_id.to_proto(),
 415                            channel_name: Default::default(),
 416                            inviter_id: Default::default(),
 417                        },
 418                        accept,
 419                        &tx,
 420                    )
 421                    .await?
 422                    .into_iter()
 423                    .collect(),
 424            })
 425        })
 426        .await
 427    }
 428
 429    async fn calculate_membership_updated(
 430        &self,
 431        channel: &channel::Model,
 432        user_id: UserId,
 433        tx: &DatabaseTransaction,
 434    ) -> Result<MembershipUpdated> {
 435        let new_channels = self
 436            .get_user_channels(user_id, Some(channel), false, tx)
 437            .await?;
 438        let removed_channels = self
 439            .get_channel_descendants_excluding_self([channel], tx)
 440            .await?
 441            .into_iter()
 442            .map(|channel| channel.id)
 443            .chain([channel.id])
 444            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 445            .collect::<Vec<_>>();
 446
 447        Ok(MembershipUpdated {
 448            channel_id: channel.id,
 449            new_channels,
 450            removed_channels,
 451        })
 452    }
 453
 454    /// Removes a channel member.
 455    pub async fn remove_channel_member(
 456        &self,
 457        channel_id: ChannelId,
 458        member_id: UserId,
 459        admin_id: UserId,
 460    ) -> Result<RemoveChannelMemberResult> {
 461        self.transaction(|tx| async move {
 462            let channel = self.get_channel_internal(channel_id, &tx).await?;
 463
 464            if member_id != admin_id {
 465                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 466                    .await?;
 467            }
 468
 469            let result = channel_member::Entity::delete_many()
 470                .filter(
 471                    channel_member::Column::ChannelId
 472                        .eq(channel_id)
 473                        .and(channel_member::Column::UserId.eq(member_id)),
 474                )
 475                .exec(&*tx)
 476                .await?;
 477
 478            if result.rows_affected == 0 {
 479                Err(anyhow!("no such member"))?;
 480            }
 481
 482            Ok(RemoveChannelMemberResult {
 483                membership_update: self
 484                    .calculate_membership_updated(&channel, member_id, &tx)
 485                    .await?,
 486                notification_id: self
 487                    .remove_notification(
 488                        member_id,
 489                        rpc::Notification::ChannelInvitation {
 490                            channel_id: channel_id.to_proto(),
 491                            channel_name: Default::default(),
 492                            inviter_id: Default::default(),
 493                        },
 494                        &tx,
 495                    )
 496                    .await?,
 497            })
 498        })
 499        .await
 500    }
 501
 502    /// Returns all channels for the user with the given ID.
 503    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 504        self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
 505            .await
 506    }
 507
 508    /// Returns all channels for the user with the given ID that are descendants
 509    /// of the specified ancestor channel.
 510    pub async fn get_user_channels(
 511        &self,
 512        user_id: UserId,
 513        ancestor_channel: Option<&channel::Model>,
 514        include_invites: bool,
 515        tx: &DatabaseTransaction,
 516    ) -> Result<ChannelsForUser> {
 517        let mut filter = channel_member::Column::UserId.eq(user_id);
 518        if !include_invites {
 519            filter = filter.and(channel_member::Column::Accepted.eq(true))
 520        }
 521        if let Some(ancestor) = ancestor_channel {
 522            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 523        }
 524
 525        let mut channels = Vec::<channel::Model>::new();
 526        let mut invited_channels = Vec::<Channel>::new();
 527        let mut channel_memberships = Vec::<channel_member::Model>::new();
 528        let mut rows = channel_member::Entity::find()
 529            .filter(filter)
 530            .inner_join(channel::Entity)
 531            .select_also(channel::Entity)
 532            .stream(tx)
 533            .await?;
 534        while let Some(row) = rows.next().await {
 535            if let (membership, Some(channel)) = row? {
 536                if membership.accepted {
 537                    channel_memberships.push(membership);
 538                    channels.push(channel);
 539                } else {
 540                    invited_channels.push(Channel::from_model(channel));
 541                }
 542            }
 543        }
 544        drop(rows);
 545
 546        let mut descendants = self
 547            .get_channel_descendants_excluding_self(channels.iter(), tx)
 548            .await?;
 549
 550        descendants.extend(channels);
 551
 552        let roles_by_channel_id = channel_memberships
 553            .iter()
 554            .map(|membership| (membership.channel_id, membership.role))
 555            .collect::<HashMap<_, _>>();
 556
 557        let channels: Vec<Channel> = descendants
 558            .into_iter()
 559            .filter_map(|channel| {
 560                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 561                if parent_role.can_see_channel(channel.visibility) {
 562                    Some(Channel::from_model(channel))
 563                } else {
 564                    None
 565                }
 566            })
 567            .collect();
 568
 569        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 570        enum QueryUserIdsAndChannelIds {
 571            ChannelId,
 572            UserId,
 573        }
 574
 575        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 576        {
 577            let mut rows = room_participant::Entity::find()
 578                .inner_join(room::Entity)
 579                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 580                .select_only()
 581                .column(room::Column::ChannelId)
 582                .column(room_participant::Column::UserId)
 583                .into_values::<_, QueryUserIdsAndChannelIds>()
 584                .stream(tx)
 585                .await?;
 586            while let Some(row) = rows.next().await {
 587                let row: (ChannelId, UserId) = row?;
 588                channel_participants.entry(row.0).or_default().push(row.1)
 589            }
 590        }
 591
 592        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 593
 594        let mut channel_ids_by_buffer_id = HashMap::default();
 595        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
 596        let mut rows = buffer::Entity::find()
 597            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 598            .stream(tx)
 599            .await?;
 600        while let Some(row) = rows.next().await {
 601            let row = row?;
 602            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 603            latest_buffer_versions.push(ChannelBufferVersion {
 604                channel_id: row.channel_id.0 as u64,
 605                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
 606                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
 607                    .latest_operation_lamport_timestamp
 608                    .zip(row.latest_operation_replica_id)
 609                {
 610                    vec![VectorClockEntry {
 611                        timestamp: latest_lamport_timestamp as u32,
 612                        replica_id: latest_replica_id as u32,
 613                    }]
 614                } else {
 615                    vec![]
 616                },
 617            });
 618        }
 619        drop(rows);
 620
 621        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
 622
 623        let observed_buffer_versions = self
 624            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
 625            .await?;
 626
 627        let observed_channel_messages = self
 628            .observed_channel_messages(&channel_ids, user_id, tx)
 629            .await?;
 630
 631        Ok(ChannelsForUser {
 632            channel_memberships,
 633            channels,
 634            invited_channels,
 635            channel_participants,
 636            latest_buffer_versions,
 637            latest_channel_messages,
 638            observed_buffer_versions,
 639            observed_channel_messages,
 640        })
 641    }
 642
 643    /// Sets the role for the specified channel member.
 644    pub async fn set_channel_member_role(
 645        &self,
 646        channel_id: ChannelId,
 647        admin_id: UserId,
 648        for_user: UserId,
 649        role: ChannelRole,
 650    ) -> Result<SetMemberRoleResult> {
 651        self.transaction(|tx| async move {
 652            let channel = self.get_channel_internal(channel_id, &tx).await?;
 653            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 654                .await?;
 655
 656            let membership = channel_member::Entity::find()
 657                .filter(
 658                    channel_member::Column::ChannelId
 659                        .eq(channel_id)
 660                        .and(channel_member::Column::UserId.eq(for_user)),
 661                )
 662                .one(&*tx)
 663                .await?
 664                .context("no such member")?;
 665
 666            let mut update = membership.into_active_model();
 667            update.role = ActiveValue::Set(role);
 668            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 669
 670            if updated.accepted {
 671                Ok(SetMemberRoleResult::MembershipUpdated(
 672                    self.calculate_membership_updated(&channel, for_user, &tx)
 673                        .await?,
 674                ))
 675            } else {
 676                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 677                    channel,
 678                )))
 679            }
 680        })
 681        .await
 682    }
 683
 684    /// Returns the details for the specified channel member.
 685    pub async fn get_channel_participant_details(
 686        &self,
 687        channel_id: ChannelId,
 688        filter: &str,
 689        limit: u64,
 690        user_id: UserId,
 691    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
 692        let members = self
 693            .transaction(move |tx| async move {
 694                let channel = self.get_channel_internal(channel_id, &tx).await?;
 695                self.check_user_is_channel_participant(&channel, user_id, &tx)
 696                    .await?;
 697                let mut query = channel_member::Entity::find()
 698                    .find_also_related(user::Entity)
 699                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
 700
 701                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
 702                    query = query.filter(Expr::cust_with_values(
 703                        "UPPER(github_login) LIKE ?",
 704                        [Self::fuzzy_like_string(&filter.to_uppercase())],
 705                    ))
 706                } else {
 707                    query = query.filter(Expr::cust_with_values(
 708                        "github_login ILIKE $1",
 709                        [Self::fuzzy_like_string(filter)],
 710                    ))
 711                }
 712                let members = query.order_by(
 713                        Expr::cust(
 714                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
 715                        ),
 716                        sea_orm::Order::Asc,
 717                    )
 718                    .limit(limit)
 719                    .all(&*tx)
 720                    .await?;
 721
 722                Ok(members)
 723            })
 724            .await?;
 725
 726        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
 727
 728        let members = members
 729            .into_iter()
 730            .map(|(member, user)| {
 731                if let Some(user) = user {
 732                    users.push(proto::User {
 733                        id: user.id.to_proto(),
 734                        avatar_url: format!(
 735                            "https://avatars.githubusercontent.com/u/{}?s=128&v=4",
 736                            user.github_user_id
 737                        ),
 738                        github_login: user.github_login,
 739                        name: user.name,
 740                    })
 741                }
 742                proto::ChannelMember {
 743                    role: member.role.into(),
 744                    user_id: member.user_id.to_proto(),
 745                    kind: if member.accepted {
 746                        Kind::Member
 747                    } else {
 748                        Kind::Invitee
 749                    }
 750                    .into(),
 751                }
 752            })
 753            .collect();
 754
 755        Ok((members, users))
 756    }
 757
 758    /// Returns whether the given user is an admin in the specified channel.
 759    pub async fn check_user_is_channel_admin(
 760        &self,
 761        channel: &channel::Model,
 762        user_id: UserId,
 763        tx: &DatabaseTransaction,
 764    ) -> Result<ChannelRole> {
 765        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 766        match role {
 767            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 768            Some(ChannelRole::Member)
 769            | Some(ChannelRole::Talker)
 770            | Some(ChannelRole::Banned)
 771            | Some(ChannelRole::Guest)
 772            | None => Err(anyhow!(
 773                "user is not a channel admin or channel does not exist"
 774            ))?,
 775        }
 776    }
 777
 778    /// Returns whether the given user is a member of the specified channel.
 779    pub async fn check_user_is_channel_member(
 780        &self,
 781        channel: &channel::Model,
 782        user_id: UserId,
 783        tx: &DatabaseTransaction,
 784    ) -> Result<ChannelRole> {
 785        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 786        match channel_role {
 787            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 788            Some(ChannelRole::Banned)
 789            | Some(ChannelRole::Guest)
 790            | Some(ChannelRole::Talker)
 791            | None => Err(anyhow!(
 792                "user is not a channel member or channel does not exist"
 793            ))?,
 794        }
 795    }
 796
 797    /// Returns whether the given user is a participant in the specified channel.
 798    pub async fn check_user_is_channel_participant(
 799        &self,
 800        channel: &channel::Model,
 801        user_id: UserId,
 802        tx: &DatabaseTransaction,
 803    ) -> Result<ChannelRole> {
 804        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 805        match role {
 806            Some(ChannelRole::Admin)
 807            | Some(ChannelRole::Member)
 808            | Some(ChannelRole::Guest)
 809            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 810            Some(ChannelRole::Banned) | None => Err(anyhow!(
 811                "user is not a channel participant or channel does not exist"
 812            ))?,
 813        }
 814    }
 815
 816    /// Returns a user's pending invite for the given channel, if one exists.
 817    pub async fn pending_invite_for_channel(
 818        &self,
 819        channel: &channel::Model,
 820        user_id: UserId,
 821        tx: &DatabaseTransaction,
 822    ) -> Result<Option<channel_member::Model>> {
 823        let row = channel_member::Entity::find()
 824            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 825            .filter(channel_member::Column::UserId.eq(user_id))
 826            .filter(channel_member::Column::Accepted.eq(false))
 827            .one(tx)
 828            .await?;
 829
 830        Ok(row)
 831    }
 832
 833    /// Returns the role for a user in the given channel.
 834    pub async fn channel_role_for_user(
 835        &self,
 836        channel: &channel::Model,
 837        user_id: UserId,
 838        tx: &DatabaseTransaction,
 839    ) -> Result<Option<ChannelRole>> {
 840        let membership = channel_member::Entity::find()
 841            .filter(
 842                channel_member::Column::ChannelId
 843                    .eq(channel.root_id())
 844                    .and(channel_member::Column::UserId.eq(user_id))
 845                    .and(channel_member::Column::Accepted.eq(true)),
 846            )
 847            .one(tx)
 848            .await?;
 849
 850        let Some(membership) = membership else {
 851            return Ok(None);
 852        };
 853
 854        if !membership.role.can_see_channel(channel.visibility) {
 855            return Ok(None);
 856        }
 857
 858        Ok(Some(membership.role))
 859    }
 860
 861    // Get the descendants of the given set if channels, ordered by their
 862    // path.
 863    pub(crate) async fn get_channel_descendants_excluding_self(
 864        &self,
 865        channels: impl IntoIterator<Item = &channel::Model>,
 866        tx: &DatabaseTransaction,
 867    ) -> Result<Vec<channel::Model>> {
 868        let mut filter = Condition::any();
 869        for channel in channels.into_iter() {
 870            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 871        }
 872
 873        if filter.is_empty() {
 874            return Ok(vec![]);
 875        }
 876
 877        Ok(channel::Entity::find()
 878            .filter(filter)
 879            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 880            .all(tx)
 881            .await?)
 882    }
 883
 884    /// Returns the channel with the given ID.
 885    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 886        self.transaction(|tx| async move {
 887            let channel = self.get_channel_internal(channel_id, &tx).await?;
 888            self.check_user_is_channel_participant(&channel, user_id, &tx)
 889                .await?;
 890
 891            Ok(Channel::from_model(channel))
 892        })
 893        .await
 894    }
 895
 896    pub(crate) async fn get_channel_internal(
 897        &self,
 898        channel_id: ChannelId,
 899        tx: &DatabaseTransaction,
 900    ) -> Result<channel::Model> {
 901        Ok(channel::Entity::find_by_id(channel_id)
 902            .one(tx)
 903            .await?
 904            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 905    }
 906
 907    pub(crate) async fn get_or_create_channel_room(
 908        &self,
 909        channel_id: ChannelId,
 910        livekit_room: &str,
 911        tx: &DatabaseTransaction,
 912    ) -> Result<RoomId> {
 913        let room = room::Entity::find()
 914            .filter(room::Column::ChannelId.eq(channel_id))
 915            .one(tx)
 916            .await?;
 917
 918        let room_id = if let Some(room) = room {
 919            room.id
 920        } else {
 921            let result = room::Entity::insert(room::ActiveModel {
 922                channel_id: ActiveValue::Set(Some(channel_id)),
 923                live_kit_room: ActiveValue::Set(livekit_room.to_string()),
 924                ..Default::default()
 925            })
 926            .exec(tx)
 927            .await?;
 928
 929            result.last_insert_id
 930        };
 931
 932        Ok(room_id)
 933    }
 934
 935    /// Move a channel from one parent to another
 936    pub async fn move_channel(
 937        &self,
 938        channel_id: ChannelId,
 939        new_parent_id: ChannelId,
 940        admin_id: UserId,
 941    ) -> Result<(ChannelId, Vec<Channel>)> {
 942        self.transaction(|tx| async move {
 943            let channel = self.get_channel_internal(channel_id, &tx).await?;
 944            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 945                .await?;
 946            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 947
 948            if new_parent.root_id() != channel.root_id() {
 949                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 950            }
 951
 952            if new_parent
 953                .ancestors_including_self()
 954                .any(|id| id == channel.id)
 955            {
 956                Err(anyhow!(ErrorCode::CircularNesting))?;
 957            }
 958
 959            if channel.visibility == ChannelVisibility::Public
 960                && new_parent.visibility != ChannelVisibility::Public
 961            {
 962                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 963            }
 964
 965            let root_id = channel.root_id();
 966            let new_parent_path = new_parent.path();
 967            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 968            let new_path = format!("{}{}/", &new_parent_path, channel.id);
 969            let new_order = max_order(&new_parent_path, &tx).await? + 1;
 970
 971            let mut model = channel.into_active_model();
 972            model.parent_path = ActiveValue::Set(new_parent.path());
 973            model.channel_order = ActiveValue::Set(new_order);
 974            let channel = model.update(&*tx).await?;
 975
 976            let descendent_ids =
 977                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 978                    self.pool.get_database_backend(),
 979                    "
 980                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 981                    WHERE parent_path LIKE $3 || '%'
 982                    RETURNING id
 983                ",
 984                    [old_path.clone().into(), new_path.into(), old_path.into()],
 985                ))
 986                .all(&*tx)
 987                .await?;
 988
 989            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
 990
 991            let channels = channel::Entity::find()
 992                .filter(channel::Column::Id.is_in(all_moved_ids))
 993                .all(&*tx)
 994                .await?
 995                .into_iter()
 996                .map(Channel::from_model)
 997                .collect::<Vec<_>>();
 998
 999            Ok((root_id, channels))
1000        })
1001        .await
1002    }
1003
1004    pub async fn reorder_channel(
1005        &self,
1006        channel_id: ChannelId,
1007        direction: proto::reorder_channel::Direction,
1008        user_id: UserId,
1009    ) -> Result<Vec<Channel>> {
1010        self.transaction(|tx| async move {
1011            let mut channel = self.get_channel_internal(channel_id, &tx).await?;
1012
1013            if channel.is_root() {
1014                log::info!("Skipping reorder of root channel {}", channel.id,);
1015                return Ok(vec![]);
1016            }
1017
1018            log::info!(
1019                "Reordering channel {} (parent_path: '{}', order: {})",
1020                channel.id,
1021                channel.parent_path,
1022                channel.channel_order
1023            );
1024
1025            // Check if user is admin of the channel
1026            self.check_user_is_channel_admin(&channel, user_id, &tx)
1027                .await?;
1028
1029            // Find the sibling channel to swap with
1030            let sibling_channel = match direction {
1031                proto::reorder_channel::Direction::Up => {
1032                    log::info!(
1033                        "Looking for sibling with parent_path='{}' and order < {}",
1034                        channel.parent_path,
1035                        channel.channel_order
1036                    );
1037                    // Find channel with highest order less than current
1038                    channel::Entity::find()
1039                        .filter(
1040                            channel::Column::ParentPath
1041                                .eq(&channel.parent_path)
1042                                .and(channel::Column::ChannelOrder.lt(channel.channel_order)),
1043                        )
1044                        .order_by_desc(channel::Column::ChannelOrder)
1045                        .one(&*tx)
1046                        .await?
1047                }
1048                proto::reorder_channel::Direction::Down => {
1049                    log::info!(
1050                        "Looking for sibling with parent_path='{}' and order > {}",
1051                        channel.parent_path,
1052                        channel.channel_order
1053                    );
1054                    // Find channel with lowest order greater than current
1055                    channel::Entity::find()
1056                        .filter(
1057                            channel::Column::ParentPath
1058                                .eq(&channel.parent_path)
1059                                .and(channel::Column::ChannelOrder.gt(channel.channel_order)),
1060                        )
1061                        .order_by_asc(channel::Column::ChannelOrder)
1062                        .one(&*tx)
1063                        .await?
1064                }
1065            };
1066
1067            let mut sibling_channel = match sibling_channel {
1068                Some(sibling) => {
1069                    log::info!(
1070                        "Found sibling {} (parent_path: '{}', order: {})",
1071                        sibling.id,
1072                        sibling.parent_path,
1073                        sibling.channel_order
1074                    );
1075                    sibling
1076                }
1077                None => {
1078                    log::warn!("No sibling found to swap with");
1079                    // No sibling to swap with
1080                    return Ok(vec![]);
1081                }
1082            };
1083
1084            let current_order = channel.channel_order;
1085            let sibling_order = sibling_channel.channel_order;
1086
1087            channel::ActiveModel {
1088                id: ActiveValue::Unchanged(sibling_channel.id),
1089                channel_order: ActiveValue::Set(current_order),
1090                ..Default::default()
1091            }
1092            .update(&*tx)
1093            .await?;
1094            sibling_channel.channel_order = current_order;
1095
1096            channel::ActiveModel {
1097                id: ActiveValue::Unchanged(channel.id),
1098                channel_order: ActiveValue::Set(sibling_order),
1099                ..Default::default()
1100            }
1101            .update(&*tx)
1102            .await?;
1103            channel.channel_order = sibling_order;
1104
1105            log::info!(
1106                "Reorder complete. Swapped channels {} and {}",
1107                channel.id,
1108                sibling_channel.id
1109            );
1110
1111            let swapped_channels = vec![
1112                Channel::from_model(channel),
1113                Channel::from_model(sibling_channel),
1114            ];
1115
1116            Ok(swapped_channels)
1117        })
1118        .await
1119    }
1120}
1121
1122async fn max_order(parent_path: &str, tx: &TransactionHandle) -> Result<i32> {
1123    let max_order = channel::Entity::find()
1124        .filter(channel::Column::ParentPath.eq(parent_path))
1125        .select_only()
1126        .column_as(channel::Column::ChannelOrder.max(), "max_order")
1127        .into_tuple::<Option<i32>>()
1128        .one(&**tx)
1129        .await?
1130        .flatten()
1131        .unwrap_or(0);
1132
1133    Ok(max_order)
1134}
1135
1136#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1137enum QueryIds {
1138    Id,
1139}
1140
1141#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1142enum QueryUserIds {
1143    UserId,
1144}