channels.rs

   1use super::*;
   2use anyhow::Context as _;
   3use rpc::{
   4    ErrorCode, ErrorCodeExt,
   5    proto::{ChannelBufferVersion, VectorClockEntry, channel_member::Kind},
   6};
   7use sea_orm::{ActiveValue, DbBackend, TryGetableMany};
   8
   9impl Database {
  10    #[cfg(feature = "test-support")]
  11    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  12        self.transaction(move |tx| async move {
  13            let mut channels = Vec::new();
  14            let mut rows = channel::Entity::find().stream(&*tx).await?;
  15            while let Some(row) = rows.next().await {
  16                let row = row?;
  17                channels.push((row.id, row.name));
  18            }
  19            Ok(channels)
  20        })
  21        .await
  22    }
  23
  24    #[cfg(feature = "test-support")]
  25    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  26        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  27    }
  28
  29    #[cfg(feature = "test-support")]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .0
  40            .id)
  41    }
  42
  43    /// Creates a new channel.
  44    pub async fn create_channel(
  45        &self,
  46        name: &str,
  47        parent_channel_id: Option<ChannelId>,
  48        admin_id: UserId,
  49    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let parent_path = parent
  63                .as_ref()
  64                .map_or(String::new(), |parent| parent.path());
  65
  66            // Find the maximum channel_order among siblings to set the new channel at the end
  67            let max_order = if parent_path.is_empty() {
  68                0
  69            } else {
  70                max_order(&parent_path, &tx).await?
  71            };
  72
  73            log::info!(
  74                "Creating channel '{}' with parent_path='{}', max_order={}, new_order={}",
  75                name,
  76                parent_path,
  77                max_order,
  78                max_order + 1
  79            );
  80
  81            let channel = channel::ActiveModel {
  82                id: ActiveValue::NotSet,
  83                name: ActiveValue::Set(name.to_string()),
  84                visibility: ActiveValue::Set(ChannelVisibility::Members),
  85                parent_path: ActiveValue::Set(parent_path),
  86                requires_zed_cla: ActiveValue::NotSet,
  87                channel_order: ActiveValue::Set(max_order + 1),
  88            }
  89            .insert(&*tx)
  90            .await?;
  91
  92            if parent.is_none() {
  93                membership = Some(
  94                    channel_member::ActiveModel {
  95                        id: ActiveValue::NotSet,
  96                        channel_id: ActiveValue::Set(channel.id),
  97                        user_id: ActiveValue::Set(admin_id),
  98                        accepted: ActiveValue::Set(true),
  99                        role: ActiveValue::Set(ChannelRole::Admin),
 100                    }
 101                    .insert(&*tx)
 102                    .await?,
 103                );
 104            }
 105
 106            Ok((channel, membership))
 107        })
 108        .await
 109    }
 110
 111    /// Adds a user to the specified channel.
 112    pub async fn join_channel(
 113        &self,
 114        channel_id: ChannelId,
 115        user_id: UserId,
 116        connection: ConnectionId,
 117    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 118        self.transaction(move |tx| async move {
 119            let channel = self.get_channel_internal(channel_id, &tx).await?;
 120            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 121
 122            let mut accept_invite_result = None;
 123
 124            if role.is_none() {
 125                if let Some(invitation) = self
 126                    .pending_invite_for_channel(&channel, user_id, &tx)
 127                    .await?
 128                {
 129                    // note, this may be a parent channel
 130                    role = Some(invitation.role);
 131                    channel_member::Entity::update(channel_member::ActiveModel {
 132                        accepted: ActiveValue::Set(true),
 133                        ..invitation.into_active_model()
 134                    })
 135                    .exec(&*tx)
 136                    .await?;
 137
 138                    accept_invite_result = Some(
 139                        self.calculate_membership_updated(&channel, user_id, &tx)
 140                            .await?,
 141                    );
 142
 143                    debug_assert!(
 144                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 145                    );
 146                } else if channel.visibility == ChannelVisibility::Public {
 147                    role = Some(ChannelRole::Guest);
 148                    channel_member::Entity::insert(channel_member::ActiveModel {
 149                        id: ActiveValue::NotSet,
 150                        channel_id: ActiveValue::Set(channel.root_id()),
 151                        user_id: ActiveValue::Set(user_id),
 152                        accepted: ActiveValue::Set(true),
 153                        role: ActiveValue::Set(ChannelRole::Guest),
 154                    })
 155                    .exec(&*tx)
 156                    .await?;
 157
 158                    accept_invite_result = Some(
 159                        self.calculate_membership_updated(&channel, user_id, &tx)
 160                            .await?,
 161                    );
 162
 163                    debug_assert!(
 164                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 165                    );
 166                }
 167            }
 168
 169            if role.is_none() || role == Some(ChannelRole::Banned) {
 170                Err(ErrorCode::Forbidden.anyhow())?
 171            }
 172            let role = role.unwrap();
 173
 174            let livekit_room = format!("channel-{}", nanoid::nanoid!(30));
 175            let room_id = self
 176                .get_or_create_channel_room(channel_id, &livekit_room, &tx)
 177                .await?;
 178
 179            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 180                .await
 181                .map(|jr| (jr, accept_invite_result, role))
 182        })
 183        .await
 184    }
 185
 186    /// Sets the visibility of the given channel.
 187    pub async fn set_channel_visibility(
 188        &self,
 189        channel_id: ChannelId,
 190        visibility: ChannelVisibility,
 191        admin_id: UserId,
 192    ) -> Result<channel::Model> {
 193        self.transaction(move |tx| async move {
 194            let channel = self.get_channel_internal(channel_id, &tx).await?;
 195            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 196                .await?;
 197
 198            if visibility == ChannelVisibility::Public {
 199                if let Some(parent_id) = channel.parent_id() {
 200                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 201
 202                    if parent.visibility != ChannelVisibility::Public {
 203                        Err(ErrorCode::BadPublicNesting
 204                            .with_tag("direction", "parent")
 205                            .anyhow())?;
 206                    }
 207                }
 208            } else if visibility == ChannelVisibility::Members
 209                && self
 210                    .get_channel_descendants_excluding_self([&channel], &tx)
 211                    .await?
 212                    .into_iter()
 213                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 214            {
 215                Err(ErrorCode::BadPublicNesting
 216                    .with_tag("direction", "children")
 217                    .anyhow())?;
 218            }
 219
 220            let mut model = channel.into_active_model();
 221            model.visibility = ActiveValue::Set(visibility);
 222            let channel = model.update(&*tx).await?;
 223
 224            Ok(channel)
 225        })
 226        .await
 227    }
 228
 229    #[cfg(feature = "test-support")]
 230    pub async fn set_channel_requires_zed_cla(
 231        &self,
 232        channel_id: ChannelId,
 233        requires_zed_cla: bool,
 234    ) -> Result<()> {
 235        self.transaction(move |tx| async move {
 236            let channel = self.get_channel_internal(channel_id, &tx).await?;
 237            let mut model = channel.into_active_model();
 238            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 239            model.update(&*tx).await?;
 240            Ok(())
 241        })
 242        .await
 243    }
 244
 245    /// Deletes the channel with the specified ID.
 246    pub async fn delete_channel(
 247        &self,
 248        channel_id: ChannelId,
 249        user_id: UserId,
 250    ) -> Result<(ChannelId, Vec<ChannelId>)> {
 251        self.transaction(move |tx| async move {
 252            let channel = self.get_channel_internal(channel_id, &tx).await?;
 253            self.check_user_is_channel_admin(&channel, user_id, &tx)
 254                .await?;
 255
 256            let channels_to_remove = self
 257                .get_channel_descendants_excluding_self([&channel], &tx)
 258                .await?
 259                .into_iter()
 260                .map(|channel| channel.id)
 261                .chain(Some(channel_id))
 262                .collect::<Vec<_>>();
 263
 264            let channel_has_active_participants = room_participant::Entity::find()
 265                .inner_join(room::Entity)
 266                .filter(room::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
 267                .count(&*tx)
 268                .await?
 269                > 0;
 270
 271            if channel_has_active_participants {
 272                Err(anyhow!("can't delete channel while a call is in progress"))?;
 273            }
 274
 275            channel::Entity::delete_many()
 276                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 277                .exec(&*tx)
 278                .await?;
 279
 280            Ok((channel.root_id(), channels_to_remove))
 281        })
 282        .await
 283    }
 284
 285    /// Invites a user to a channel as a member.
 286    pub async fn invite_channel_member(
 287        &self,
 288        channel_id: ChannelId,
 289        invitee_id: UserId,
 290        inviter_id: UserId,
 291        role: ChannelRole,
 292    ) -> Result<InviteMemberResult> {
 293        self.transaction(move |tx| async move {
 294            let channel = self.get_channel_internal(channel_id, &tx).await?;
 295            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 296                .await?;
 297            if !channel.is_root() {
 298                Err(ErrorCode::NotARootChannel.anyhow())?
 299            }
 300
 301            channel_member::ActiveModel {
 302                id: ActiveValue::NotSet,
 303                channel_id: ActiveValue::Set(channel_id),
 304                user_id: ActiveValue::Set(invitee_id),
 305                accepted: ActiveValue::Set(false),
 306                role: ActiveValue::Set(role),
 307            }
 308            .insert(&*tx)
 309            .await?;
 310
 311            let channel = Channel::from_model(channel);
 312
 313            let notifications = self
 314                .create_notification(
 315                    invitee_id,
 316                    rpc::Notification::ChannelInvitation {
 317                        channel_id: channel_id.to_proto(),
 318                        channel_name: channel.name.clone(),
 319                        inviter_id: inviter_id.to_proto(),
 320                    },
 321                    true,
 322                    &tx,
 323                )
 324                .await?
 325                .into_iter()
 326                .collect();
 327
 328            Ok(InviteMemberResult {
 329                channel,
 330                notifications,
 331            })
 332        })
 333        .await
 334    }
 335
 336    fn sanitize_channel_name(name: &str) -> Result<&str> {
 337        let new_name = name.trim().trim_start_matches('#');
 338        if new_name.is_empty() {
 339            Err(anyhow!("channel name can't be blank"))?;
 340        }
 341        Ok(new_name)
 342    }
 343
 344    /// Renames the specified channel.
 345    pub async fn rename_channel(
 346        &self,
 347        channel_id: ChannelId,
 348        admin_id: UserId,
 349        new_name: &str,
 350    ) -> Result<channel::Model> {
 351        self.transaction(move |tx| async move {
 352            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 353
 354            let channel = self.get_channel_internal(channel_id, &tx).await?;
 355            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 356                .await?;
 357
 358            let mut model = channel.into_active_model();
 359            model.name = ActiveValue::Set(new_name.clone());
 360            let channel = model.update(&*tx).await?;
 361
 362            Ok(channel)
 363        })
 364        .await
 365    }
 366
 367    /// accept or decline an invite to join a channel
 368    pub async fn respond_to_channel_invite(
 369        &self,
 370        channel_id: ChannelId,
 371        user_id: UserId,
 372        accept: bool,
 373    ) -> Result<RespondToChannelInvite> {
 374        self.transaction(move |tx| async move {
 375            let channel = self.get_channel_internal(channel_id, &tx).await?;
 376
 377            let membership_update = if accept {
 378                let rows_affected = channel_member::Entity::update_many()
 379                    .set(channel_member::ActiveModel {
 380                        accepted: ActiveValue::Set(accept),
 381                        ..Default::default()
 382                    })
 383                    .filter(
 384                        channel_member::Column::ChannelId
 385                            .eq(channel_id)
 386                            .and(channel_member::Column::UserId.eq(user_id))
 387                            .and(channel_member::Column::Accepted.eq(false)),
 388                    )
 389                    .exec(&*tx)
 390                    .await?
 391                    .rows_affected;
 392
 393                if rows_affected == 0 {
 394                    Err(anyhow!("no such invitation"))?;
 395                }
 396
 397                Some(
 398                    self.calculate_membership_updated(&channel, user_id, &tx)
 399                        .await?,
 400                )
 401            } else {
 402                let rows_affected = channel_member::Entity::delete_many()
 403                    .filter(
 404                        channel_member::Column::ChannelId
 405                            .eq(channel_id)
 406                            .and(channel_member::Column::UserId.eq(user_id))
 407                            .and(channel_member::Column::Accepted.eq(false)),
 408                    )
 409                    .exec(&*tx)
 410                    .await?
 411                    .rows_affected;
 412                if rows_affected == 0 {
 413                    Err(anyhow!("no such invitation"))?;
 414                }
 415
 416                None
 417            };
 418
 419            Ok(RespondToChannelInvite {
 420                membership_update,
 421                notifications: self
 422                    .mark_notification_as_read_with_response(
 423                        user_id,
 424                        &rpc::Notification::ChannelInvitation {
 425                            channel_id: channel_id.to_proto(),
 426                            channel_name: Default::default(),
 427                            inviter_id: Default::default(),
 428                        },
 429                        accept,
 430                        &tx,
 431                    )
 432                    .await?
 433                    .into_iter()
 434                    .collect(),
 435            })
 436        })
 437        .await
 438    }
 439
 440    async fn calculate_membership_updated(
 441        &self,
 442        channel: &channel::Model,
 443        user_id: UserId,
 444        tx: &DatabaseTransaction,
 445    ) -> Result<MembershipUpdated> {
 446        let new_channels = self
 447            .get_user_channels(user_id, Some(channel), false, tx)
 448            .await?;
 449        let removed_channels = self
 450            .get_channel_descendants_excluding_self([channel], tx)
 451            .await?
 452            .into_iter()
 453            .map(|channel| channel.id)
 454            .chain([channel.id])
 455            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 456            .collect::<Vec<_>>();
 457
 458        Ok(MembershipUpdated {
 459            channel_id: channel.id,
 460            new_channels,
 461            removed_channels,
 462        })
 463    }
 464
 465    /// Removes a channel member.
 466    pub async fn remove_channel_member(
 467        &self,
 468        channel_id: ChannelId,
 469        member_id: UserId,
 470        admin_id: UserId,
 471    ) -> Result<RemoveChannelMemberResult> {
 472        self.transaction(|tx| async move {
 473            let channel = self.get_channel_internal(channel_id, &tx).await?;
 474
 475            if member_id != admin_id {
 476                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 477                    .await?;
 478            }
 479
 480            let result = channel_member::Entity::delete_many()
 481                .filter(
 482                    channel_member::Column::ChannelId
 483                        .eq(channel_id)
 484                        .and(channel_member::Column::UserId.eq(member_id)),
 485                )
 486                .exec(&*tx)
 487                .await?;
 488
 489            if result.rows_affected == 0 {
 490                Err(anyhow!("no such member"))?;
 491            }
 492
 493            Ok(RemoveChannelMemberResult {
 494                membership_update: self
 495                    .calculate_membership_updated(&channel, member_id, &tx)
 496                    .await?,
 497                notification_id: self
 498                    .remove_notification(
 499                        member_id,
 500                        rpc::Notification::ChannelInvitation {
 501                            channel_id: channel_id.to_proto(),
 502                            channel_name: Default::default(),
 503                            inviter_id: Default::default(),
 504                        },
 505                        &tx,
 506                    )
 507                    .await?,
 508            })
 509        })
 510        .await
 511    }
 512
 513    /// Returns all channels for the user with the given ID.
 514    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 515        self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
 516            .await
 517    }
 518
 519    /// Returns all channels for the user with the given ID that are descendants
 520    /// of the specified ancestor channel.
 521    pub async fn get_user_channels(
 522        &self,
 523        user_id: UserId,
 524        ancestor_channel: Option<&channel::Model>,
 525        include_invites: bool,
 526        tx: &DatabaseTransaction,
 527    ) -> Result<ChannelsForUser> {
 528        let mut filter = channel_member::Column::UserId.eq(user_id);
 529        if !include_invites {
 530            filter = filter.and(channel_member::Column::Accepted.eq(true))
 531        }
 532        if let Some(ancestor) = ancestor_channel {
 533            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 534        }
 535
 536        let mut channels = Vec::<channel::Model>::new();
 537        let mut invited_channels = Vec::<Channel>::new();
 538        let mut channel_memberships = Vec::<channel_member::Model>::new();
 539        let mut rows = channel_member::Entity::find()
 540            .filter(filter)
 541            .inner_join(channel::Entity)
 542            .select_also(channel::Entity)
 543            .stream(tx)
 544            .await?;
 545        while let Some(row) = rows.next().await {
 546            if let (membership, Some(channel)) = row? {
 547                if membership.accepted {
 548                    channel_memberships.push(membership);
 549                    channels.push(channel);
 550                } else {
 551                    invited_channels.push(Channel::from_model(channel));
 552                }
 553            }
 554        }
 555        drop(rows);
 556
 557        let mut descendants = self
 558            .get_channel_descendants_excluding_self(channels.iter(), tx)
 559            .await?;
 560
 561        descendants.extend(channels);
 562
 563        let roles_by_channel_id = channel_memberships
 564            .iter()
 565            .map(|membership| (membership.channel_id, membership.role))
 566            .collect::<HashMap<_, _>>();
 567
 568        let channels: Vec<Channel> = descendants
 569            .into_iter()
 570            .filter_map(|channel| {
 571                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 572                if parent_role.can_see_channel(channel.visibility) {
 573                    Some(Channel::from_model(channel))
 574                } else {
 575                    None
 576                }
 577            })
 578            .collect();
 579
 580        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 581        enum QueryUserIdsAndChannelIds {
 582            ChannelId,
 583            UserId,
 584        }
 585
 586        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 587        {
 588            let mut rows = room_participant::Entity::find()
 589                .inner_join(room::Entity)
 590                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 591                .select_only()
 592                .column(room::Column::ChannelId)
 593                .column(room_participant::Column::UserId)
 594                .into_values::<_, QueryUserIdsAndChannelIds>()
 595                .stream(tx)
 596                .await?;
 597            while let Some(row) = rows.next().await {
 598                let row: (ChannelId, UserId) = row?;
 599                channel_participants.entry(row.0).or_default().push(row.1)
 600            }
 601        }
 602
 603        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 604
 605        let mut channel_ids_by_buffer_id = HashMap::default();
 606        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
 607        let mut rows = buffer::Entity::find()
 608            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 609            .stream(tx)
 610            .await?;
 611        while let Some(row) = rows.next().await {
 612            let row = row?;
 613            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 614            latest_buffer_versions.push(ChannelBufferVersion {
 615                channel_id: row.channel_id.0 as u64,
 616                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
 617                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
 618                    .latest_operation_lamport_timestamp
 619                    .zip(row.latest_operation_replica_id)
 620                {
 621                    vec![VectorClockEntry {
 622                        timestamp: latest_lamport_timestamp as u32,
 623                        replica_id: latest_replica_id as u32,
 624                    }]
 625                } else {
 626                    vec![]
 627                },
 628            });
 629        }
 630        drop(rows);
 631
 632        let observed_buffer_versions = self
 633            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
 634            .await?;
 635
 636        Ok(ChannelsForUser {
 637            channel_memberships,
 638            channels,
 639            invited_channels,
 640            channel_participants,
 641            latest_buffer_versions,
 642            observed_buffer_versions,
 643        })
 644    }
 645
 646    /// Sets the role for the specified channel member.
 647    pub async fn set_channel_member_role(
 648        &self,
 649        channel_id: ChannelId,
 650        admin_id: UserId,
 651        for_user: UserId,
 652        role: ChannelRole,
 653    ) -> Result<SetMemberRoleResult> {
 654        self.transaction(|tx| async move {
 655            let channel = self.get_channel_internal(channel_id, &tx).await?;
 656            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 657                .await?;
 658
 659            let membership = channel_member::Entity::find()
 660                .filter(
 661                    channel_member::Column::ChannelId
 662                        .eq(channel_id)
 663                        .and(channel_member::Column::UserId.eq(for_user)),
 664                )
 665                .one(&*tx)
 666                .await?
 667                .context("no such member")?;
 668
 669            let mut update = membership.into_active_model();
 670            update.role = ActiveValue::Set(role);
 671            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 672
 673            if updated.accepted {
 674                Ok(SetMemberRoleResult::MembershipUpdated(
 675                    self.calculate_membership_updated(&channel, for_user, &tx)
 676                        .await?,
 677                ))
 678            } else {
 679                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 680                    channel,
 681                )))
 682            }
 683        })
 684        .await
 685    }
 686
 687    /// Returns the details for the specified channel member.
 688    pub async fn get_channel_participant_details(
 689        &self,
 690        channel_id: ChannelId,
 691        filter: &str,
 692        limit: u64,
 693        user_id: UserId,
 694    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
 695        let members = self
 696            .transaction(move |tx| async move {
 697                let channel = self.get_channel_internal(channel_id, &tx).await?;
 698                self.check_user_is_channel_participant(&channel, user_id, &tx)
 699                    .await?;
 700                let mut query = channel_member::Entity::find()
 701                    .find_also_related(user::Entity)
 702                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
 703
 704                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
 705                    query = query.filter(Expr::cust_with_values(
 706                        "UPPER(github_login) LIKE ?",
 707                        [Self::fuzzy_like_string(&filter.to_uppercase())],
 708                    ))
 709                } else {
 710                    query = query.filter(Expr::cust_with_values(
 711                        "github_login ILIKE $1",
 712                        [Self::fuzzy_like_string(filter)],
 713                    ))
 714                }
 715                let members = query.order_by(
 716                        Expr::cust(
 717                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
 718                        ),
 719                        sea_orm::Order::Asc,
 720                    )
 721                    .limit(limit)
 722                    .all(&*tx)
 723                    .await?;
 724
 725                Ok(members)
 726            })
 727            .await?;
 728
 729        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
 730
 731        let members = members
 732            .into_iter()
 733            .map(|(member, user)| {
 734                if let Some(user) = user {
 735                    users.push(proto::User {
 736                        id: user.id.to_proto(),
 737                        avatar_url: format!(
 738                            "https://avatars.githubusercontent.com/u/{}?s=128&v=4",
 739                            user.github_user_id
 740                        ),
 741                        github_login: user.github_login,
 742                        name: user.name,
 743                    })
 744                }
 745                proto::ChannelMember {
 746                    role: member.role.into(),
 747                    user_id: member.user_id.to_proto(),
 748                    kind: if member.accepted {
 749                        Kind::Member
 750                    } else {
 751                        Kind::Invitee
 752                    }
 753                    .into(),
 754                }
 755            })
 756            .collect();
 757
 758        Ok((members, users))
 759    }
 760
 761    /// Returns whether the given user is an admin in the specified channel.
 762    pub async fn check_user_is_channel_admin(
 763        &self,
 764        channel: &channel::Model,
 765        user_id: UserId,
 766        tx: &DatabaseTransaction,
 767    ) -> Result<ChannelRole> {
 768        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 769        match role {
 770            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 771            Some(ChannelRole::Member)
 772            | Some(ChannelRole::Talker)
 773            | Some(ChannelRole::Banned)
 774            | Some(ChannelRole::Guest)
 775            | None => Err(anyhow!(
 776                "user is not a channel admin or channel does not exist"
 777            ))?,
 778        }
 779    }
 780
 781    /// Returns whether the given user is a member of the specified channel.
 782    pub async fn check_user_is_channel_member(
 783        &self,
 784        channel: &channel::Model,
 785        user_id: UserId,
 786        tx: &DatabaseTransaction,
 787    ) -> Result<ChannelRole> {
 788        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 789        match channel_role {
 790            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 791            Some(ChannelRole::Banned)
 792            | Some(ChannelRole::Guest)
 793            | Some(ChannelRole::Talker)
 794            | None => Err(anyhow!(
 795                "user is not a channel member or channel does not exist"
 796            ))?,
 797        }
 798    }
 799
 800    /// Returns whether the given user is a participant in the specified channel.
 801    pub async fn check_user_is_channel_participant(
 802        &self,
 803        channel: &channel::Model,
 804        user_id: UserId,
 805        tx: &DatabaseTransaction,
 806    ) -> Result<ChannelRole> {
 807        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 808        match role {
 809            Some(ChannelRole::Admin)
 810            | Some(ChannelRole::Member)
 811            | Some(ChannelRole::Guest)
 812            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 813            Some(ChannelRole::Banned) | None => Err(anyhow!(
 814                "user is not a channel participant or channel does not exist"
 815            ))?,
 816        }
 817    }
 818
 819    /// Returns a user's pending invite for the given channel, if one exists.
 820    pub async fn pending_invite_for_channel(
 821        &self,
 822        channel: &channel::Model,
 823        user_id: UserId,
 824        tx: &DatabaseTransaction,
 825    ) -> Result<Option<channel_member::Model>> {
 826        let row = channel_member::Entity::find()
 827            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 828            .filter(channel_member::Column::UserId.eq(user_id))
 829            .filter(channel_member::Column::Accepted.eq(false))
 830            .one(tx)
 831            .await?;
 832
 833        Ok(row)
 834    }
 835
 836    /// Returns the role for a user in the given channel.
 837    pub async fn channel_role_for_user(
 838        &self,
 839        channel: &channel::Model,
 840        user_id: UserId,
 841        tx: &DatabaseTransaction,
 842    ) -> Result<Option<ChannelRole>> {
 843        let membership = channel_member::Entity::find()
 844            .filter(
 845                channel_member::Column::ChannelId
 846                    .eq(channel.root_id())
 847                    .and(channel_member::Column::UserId.eq(user_id))
 848                    .and(channel_member::Column::Accepted.eq(true)),
 849            )
 850            .one(tx)
 851            .await?;
 852
 853        let Some(membership) = membership else {
 854            return Ok(None);
 855        };
 856
 857        if !membership.role.can_see_channel(channel.visibility) {
 858            return Ok(None);
 859        }
 860
 861        Ok(Some(membership.role))
 862    }
 863
 864    // Get the descendants of the given set if channels, ordered by their
 865    // path.
 866    pub(crate) async fn get_channel_descendants_excluding_self(
 867        &self,
 868        channels: impl IntoIterator<Item = &channel::Model>,
 869        tx: &DatabaseTransaction,
 870    ) -> Result<Vec<channel::Model>> {
 871        let mut filter = Condition::any();
 872        for channel in channels.into_iter() {
 873            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 874        }
 875
 876        if filter.is_empty() {
 877            return Ok(vec![]);
 878        }
 879
 880        Ok(channel::Entity::find()
 881            .filter(filter)
 882            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 883            .all(tx)
 884            .await?)
 885    }
 886
 887    /// Returns the channel with the given ID.
 888    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 889        self.transaction(|tx| async move {
 890            let channel = self.get_channel_internal(channel_id, &tx).await?;
 891            self.check_user_is_channel_participant(&channel, user_id, &tx)
 892                .await?;
 893
 894            Ok(Channel::from_model(channel))
 895        })
 896        .await
 897    }
 898
 899    pub async fn get_channel_internal(
 900        &self,
 901        channel_id: ChannelId,
 902        tx: &DatabaseTransaction,
 903    ) -> Result<channel::Model> {
 904        Ok(channel::Entity::find_by_id(channel_id)
 905            .one(tx)
 906            .await?
 907            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 908    }
 909
 910    pub(crate) async fn get_or_create_channel_room(
 911        &self,
 912        channel_id: ChannelId,
 913        livekit_room: &str,
 914        tx: &DatabaseTransaction,
 915    ) -> Result<RoomId> {
 916        let room = room::Entity::find()
 917            .filter(room::Column::ChannelId.eq(channel_id))
 918            .one(tx)
 919            .await?;
 920
 921        let room_id = if let Some(room) = room {
 922            room.id
 923        } else {
 924            let result = room::Entity::insert(room::ActiveModel {
 925                channel_id: ActiveValue::Set(Some(channel_id)),
 926                live_kit_room: ActiveValue::Set(livekit_room.to_string()),
 927                ..Default::default()
 928            })
 929            .exec(tx)
 930            .await?;
 931
 932            result.last_insert_id
 933        };
 934
 935        Ok(room_id)
 936    }
 937
 938    /// Move a channel from one parent to another
 939    pub async fn move_channel(
 940        &self,
 941        channel_id: ChannelId,
 942        new_parent_id: ChannelId,
 943        admin_id: UserId,
 944    ) -> Result<(ChannelId, Vec<Channel>)> {
 945        self.transaction(|tx| async move {
 946            let channel = self.get_channel_internal(channel_id, &tx).await?;
 947            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 948                .await?;
 949            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 950
 951            if new_parent.root_id() != channel.root_id() {
 952                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 953            }
 954
 955            if new_parent
 956                .ancestors_including_self()
 957                .any(|id| id == channel.id)
 958            {
 959                Err(anyhow!(ErrorCode::CircularNesting))?;
 960            }
 961
 962            if channel.visibility == ChannelVisibility::Public
 963                && new_parent.visibility != ChannelVisibility::Public
 964            {
 965                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 966            }
 967
 968            let root_id = channel.root_id();
 969            let new_parent_path = new_parent.path();
 970            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 971            let new_path = format!("{}{}/", &new_parent_path, channel.id);
 972            let new_order = max_order(&new_parent_path, &tx).await? + 1;
 973
 974            let mut model = channel.into_active_model();
 975            model.parent_path = ActiveValue::Set(new_parent.path());
 976            model.channel_order = ActiveValue::Set(new_order);
 977            let channel = model.update(&*tx).await?;
 978
 979            let descendent_ids =
 980                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 981                    self.pool.get_database_backend(),
 982                    "
 983                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 984                    WHERE parent_path LIKE $3 || '%'
 985                    RETURNING id
 986                ",
 987                    [old_path.clone().into(), new_path.into(), old_path.into()],
 988                ))
 989                .all(&*tx)
 990                .await?;
 991
 992            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
 993
 994            let channels = channel::Entity::find()
 995                .filter(channel::Column::Id.is_in(all_moved_ids))
 996                .all(&*tx)
 997                .await?
 998                .into_iter()
 999                .map(Channel::from_model)
1000                .collect::<Vec<_>>();
1001
1002            Ok((root_id, channels))
1003        })
1004        .await
1005    }
1006
1007    pub async fn reorder_channel(
1008        &self,
1009        channel_id: ChannelId,
1010        direction: proto::reorder_channel::Direction,
1011        user_id: UserId,
1012    ) -> Result<Vec<Channel>> {
1013        self.transaction(|tx| async move {
1014            let mut channel = self.get_channel_internal(channel_id, &tx).await?;
1015
1016            if channel.is_root() {
1017                log::info!("Skipping reorder of root channel {}", channel.id,);
1018                return Ok(vec![]);
1019            }
1020
1021            log::info!(
1022                "Reordering channel {} (parent_path: '{}', order: {})",
1023                channel.id,
1024                channel.parent_path,
1025                channel.channel_order
1026            );
1027
1028            // Check if user is admin of the channel
1029            self.check_user_is_channel_admin(&channel, user_id, &tx)
1030                .await?;
1031
1032            // Find the sibling channel to swap with
1033            let sibling_channel = match direction {
1034                proto::reorder_channel::Direction::Up => {
1035                    log::info!(
1036                        "Looking for sibling with parent_path='{}' and order < {}",
1037                        channel.parent_path,
1038                        channel.channel_order
1039                    );
1040                    // Find channel with highest order less than current
1041                    channel::Entity::find()
1042                        .filter(
1043                            channel::Column::ParentPath
1044                                .eq(&channel.parent_path)
1045                                .and(channel::Column::ChannelOrder.lt(channel.channel_order)),
1046                        )
1047                        .order_by_desc(channel::Column::ChannelOrder)
1048                        .one(&*tx)
1049                        .await?
1050                }
1051                proto::reorder_channel::Direction::Down => {
1052                    log::info!(
1053                        "Looking for sibling with parent_path='{}' and order > {}",
1054                        channel.parent_path,
1055                        channel.channel_order
1056                    );
1057                    // Find channel with lowest order greater than current
1058                    channel::Entity::find()
1059                        .filter(
1060                            channel::Column::ParentPath
1061                                .eq(&channel.parent_path)
1062                                .and(channel::Column::ChannelOrder.gt(channel.channel_order)),
1063                        )
1064                        .order_by_asc(channel::Column::ChannelOrder)
1065                        .one(&*tx)
1066                        .await?
1067                }
1068            };
1069
1070            let mut sibling_channel = match sibling_channel {
1071                Some(sibling) => {
1072                    log::info!(
1073                        "Found sibling {} (parent_path: '{}', order: {})",
1074                        sibling.id,
1075                        sibling.parent_path,
1076                        sibling.channel_order
1077                    );
1078                    sibling
1079                }
1080                None => {
1081                    log::warn!("No sibling found to swap with");
1082                    // No sibling to swap with
1083                    return Ok(vec![]);
1084                }
1085            };
1086
1087            let current_order = channel.channel_order;
1088            let sibling_order = sibling_channel.channel_order;
1089
1090            channel::ActiveModel {
1091                id: ActiveValue::Unchanged(sibling_channel.id),
1092                channel_order: ActiveValue::Set(current_order),
1093                ..Default::default()
1094            }
1095            .update(&*tx)
1096            .await?;
1097            sibling_channel.channel_order = current_order;
1098
1099            channel::ActiveModel {
1100                id: ActiveValue::Unchanged(channel.id),
1101                channel_order: ActiveValue::Set(sibling_order),
1102                ..Default::default()
1103            }
1104            .update(&*tx)
1105            .await?;
1106            channel.channel_order = sibling_order;
1107
1108            log::info!(
1109                "Reorder complete. Swapped channels {} and {}",
1110                channel.id,
1111                sibling_channel.id
1112            );
1113
1114            let swapped_channels = vec![
1115                Channel::from_model(channel),
1116                Channel::from_model(sibling_channel),
1117            ];
1118
1119            Ok(swapped_channels)
1120        })
1121        .await
1122    }
1123}
1124
1125async fn max_order(parent_path: &str, tx: &TransactionHandle) -> Result<i32> {
1126    let max_order = channel::Entity::find()
1127        .filter(channel::Column::ParentPath.eq(parent_path))
1128        .select_only()
1129        .column_as(channel::Column::ChannelOrder.max(), "max_order")
1130        .into_tuple::<Option<i32>>()
1131        .one(&**tx)
1132        .await?
1133        .flatten()
1134        .unwrap_or(0);
1135
1136    Ok(max_order)
1137}
1138
/// Single-column selector passed to `find_by_statement` in `move_channel` to
/// read back the rows produced by that statement's `RETURNING id` clause.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    // Presumably mapped to the `id` result column by `DeriveColumn` — confirm.
    Id,
}