channels.rs

   1use super::*;
   2use rpc::{
   3    proto::{channel_member::Kind, ChannelBufferVersion, VectorClockEntry},
   4    ErrorCode, ErrorCodeExt,
   5};
   6use sea_orm::{DbBackend, TryGetableMany};
   7
   8impl Database {
   9    #[cfg(test)]
  10    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  11        self.transaction(move |tx| async move {
  12            let mut channels = Vec::new();
  13            let mut rows = channel::Entity::find().stream(&*tx).await?;
  14            while let Some(row) = rows.next().await {
  15                let row = row?;
  16                channels.push((row.id, row.name));
  17            }
  18            Ok(channels)
  19        })
  20        .await
  21    }
  22
  23    #[cfg(test)]
  24    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  25        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  26    }
  27
  28    #[cfg(test)]
  29    pub async fn create_sub_channel(
  30        &self,
  31        name: &str,
  32        parent: ChannelId,
  33        creator_id: UserId,
  34    ) -> Result<ChannelId> {
  35        Ok(self
  36            .create_channel(name, Some(parent), creator_id)
  37            .await?
  38            .0
  39            .id)
  40    }
  41
  42    /// Creates a new channel.
  43    pub async fn create_channel(
  44        &self,
  45        name: &str,
  46        parent_channel_id: Option<ChannelId>,
  47        admin_id: UserId,
  48    ) -> Result<(channel::Model, Option<channel_member::Model>)> {
  49        let name = Self::sanitize_channel_name(name)?;
  50        self.transaction(move |tx| async move {
  51            let mut parent = None;
  52            let mut membership = None;
  53
  54            if let Some(parent_channel_id) = parent_channel_id {
  55                let parent_channel = self.get_channel_internal(parent_channel_id, &tx).await?;
  56                self.check_user_is_channel_admin(&parent_channel, admin_id, &tx)
  57                    .await?;
  58                parent = Some(parent_channel);
  59            }
  60
  61            let channel = channel::ActiveModel {
  62                id: ActiveValue::NotSet,
  63                name: ActiveValue::Set(name.to_string()),
  64                visibility: ActiveValue::Set(ChannelVisibility::Members),
  65                parent_path: ActiveValue::Set(
  66                    parent
  67                        .as_ref()
  68                        .map_or(String::new(), |parent| parent.path()),
  69                ),
  70                requires_zed_cla: ActiveValue::NotSet,
  71            }
  72            .insert(&*tx)
  73            .await?;
  74
  75            if parent.is_none() {
  76                membership = Some(
  77                    channel_member::ActiveModel {
  78                        id: ActiveValue::NotSet,
  79                        channel_id: ActiveValue::Set(channel.id),
  80                        user_id: ActiveValue::Set(admin_id),
  81                        accepted: ActiveValue::Set(true),
  82                        role: ActiveValue::Set(ChannelRole::Admin),
  83                    }
  84                    .insert(&*tx)
  85                    .await?,
  86                );
  87            }
  88
  89            Ok((channel, membership))
  90        })
  91        .await
  92    }
  93
  94    /// Adds a user to the specified channel.
  95    pub async fn join_channel(
  96        &self,
  97        channel_id: ChannelId,
  98        user_id: UserId,
  99        connection: ConnectionId,
 100    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 101        self.transaction(move |tx| async move {
 102            let channel = self.get_channel_internal(channel_id, &tx).await?;
 103            let mut role = self.channel_role_for_user(&channel, user_id, &tx).await?;
 104
 105            let mut accept_invite_result = None;
 106
 107            if role.is_none() {
 108                if let Some(invitation) = self
 109                    .pending_invite_for_channel(&channel, user_id, &tx)
 110                    .await?
 111                {
 112                    // note, this may be a parent channel
 113                    role = Some(invitation.role);
 114                    channel_member::Entity::update(channel_member::ActiveModel {
 115                        accepted: ActiveValue::Set(true),
 116                        ..invitation.into_active_model()
 117                    })
 118                    .exec(&*tx)
 119                    .await?;
 120
 121                    accept_invite_result = Some(
 122                        self.calculate_membership_updated(&channel, user_id, &tx)
 123                            .await?,
 124                    );
 125
 126                    debug_assert!(
 127                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 128                    );
 129                } else if channel.visibility == ChannelVisibility::Public {
 130                    role = Some(ChannelRole::Guest);
 131                    channel_member::Entity::insert(channel_member::ActiveModel {
 132                        id: ActiveValue::NotSet,
 133                        channel_id: ActiveValue::Set(channel.root_id()),
 134                        user_id: ActiveValue::Set(user_id),
 135                        accepted: ActiveValue::Set(true),
 136                        role: ActiveValue::Set(ChannelRole::Guest),
 137                    })
 138                    .exec(&*tx)
 139                    .await?;
 140
 141                    accept_invite_result = Some(
 142                        self.calculate_membership_updated(&channel, user_id, &tx)
 143                            .await?,
 144                    );
 145
 146                    debug_assert!(
 147                        self.channel_role_for_user(&channel, user_id, &tx).await? == role
 148                    );
 149                }
 150            }
 151
 152            if role.is_none() || role == Some(ChannelRole::Banned) {
 153                Err(ErrorCode::Forbidden.anyhow())?
 154            }
 155            let role = role.unwrap();
 156
 157            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 158            let room_id = self
 159                .get_or_create_channel_room(channel_id, &live_kit_room, &tx)
 160                .await?;
 161
 162            self.join_channel_room_internal(room_id, user_id, connection, role, &tx)
 163                .await
 164                .map(|jr| (jr, accept_invite_result, role))
 165        })
 166        .await
 167    }
 168
 169    /// Sets the visibility of the given channel.
 170    pub async fn set_channel_visibility(
 171        &self,
 172        channel_id: ChannelId,
 173        visibility: ChannelVisibility,
 174        admin_id: UserId,
 175    ) -> Result<channel::Model> {
 176        self.transaction(move |tx| async move {
 177            let channel = self.get_channel_internal(channel_id, &tx).await?;
 178            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 179                .await?;
 180
 181            if visibility == ChannelVisibility::Public {
 182                if let Some(parent_id) = channel.parent_id() {
 183                    let parent = self.get_channel_internal(parent_id, &tx).await?;
 184
 185                    if parent.visibility != ChannelVisibility::Public {
 186                        Err(ErrorCode::BadPublicNesting
 187                            .with_tag("direction", "parent")
 188                            .anyhow())?;
 189                    }
 190                }
 191            } else if visibility == ChannelVisibility::Members
 192                && self
 193                    .get_channel_descendants_excluding_self([&channel], &tx)
 194                    .await?
 195                    .into_iter()
 196                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 197            {
 198                Err(ErrorCode::BadPublicNesting
 199                    .with_tag("direction", "children")
 200                    .anyhow())?;
 201            }
 202
 203            let mut model = channel.into_active_model();
 204            model.visibility = ActiveValue::Set(visibility);
 205            let channel = model.update(&*tx).await?;
 206
 207            Ok(channel)
 208        })
 209        .await
 210    }
 211
 212    #[cfg(test)]
 213    pub async fn set_channel_requires_zed_cla(
 214        &self,
 215        channel_id: ChannelId,
 216        requires_zed_cla: bool,
 217    ) -> Result<()> {
 218        self.transaction(move |tx| async move {
 219            let channel = self.get_channel_internal(channel_id, &tx).await?;
 220            let mut model = channel.into_active_model();
 221            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 222            model.update(&*tx).await?;
 223            Ok(())
 224        })
 225        .await
 226    }
 227
 228    /// Deletes the channel with the specified ID.
 229    pub async fn delete_channel(
 230        &self,
 231        channel_id: ChannelId,
 232        user_id: UserId,
 233    ) -> Result<(ChannelId, Vec<ChannelId>)> {
 234        self.transaction(move |tx| async move {
 235            let channel = self.get_channel_internal(channel_id, &tx).await?;
 236            self.check_user_is_channel_admin(&channel, user_id, &tx)
 237                .await?;
 238
 239            let channels_to_remove = self
 240                .get_channel_descendants_excluding_self([&channel], &tx)
 241                .await?
 242                .into_iter()
 243                .map(|channel| channel.id)
 244                .chain(Some(channel_id))
 245                .collect::<Vec<_>>();
 246
 247            channel::Entity::delete_many()
 248                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 249                .exec(&*tx)
 250                .await?;
 251
 252            Ok((channel.root_id(), channels_to_remove))
 253        })
 254        .await
 255    }
 256
 257    /// Invites a user to a channel as a member.
 258    pub async fn invite_channel_member(
 259        &self,
 260        channel_id: ChannelId,
 261        invitee_id: UserId,
 262        inviter_id: UserId,
 263        role: ChannelRole,
 264    ) -> Result<InviteMemberResult> {
 265        self.transaction(move |tx| async move {
 266            let channel = self.get_channel_internal(channel_id, &tx).await?;
 267            self.check_user_is_channel_admin(&channel, inviter_id, &tx)
 268                .await?;
 269            if !channel.is_root() {
 270                Err(ErrorCode::NotARootChannel.anyhow())?
 271            }
 272
 273            channel_member::ActiveModel {
 274                id: ActiveValue::NotSet,
 275                channel_id: ActiveValue::Set(channel_id),
 276                user_id: ActiveValue::Set(invitee_id),
 277                accepted: ActiveValue::Set(false),
 278                role: ActiveValue::Set(role),
 279            }
 280            .insert(&*tx)
 281            .await?;
 282
 283            let channel = Channel::from_model(channel);
 284
 285            let notifications = self
 286                .create_notification(
 287                    invitee_id,
 288                    rpc::Notification::ChannelInvitation {
 289                        channel_id: channel_id.to_proto(),
 290                        channel_name: channel.name.clone(),
 291                        inviter_id: inviter_id.to_proto(),
 292                    },
 293                    true,
 294                    &tx,
 295                )
 296                .await?
 297                .into_iter()
 298                .collect();
 299
 300            Ok(InviteMemberResult {
 301                channel,
 302                notifications,
 303            })
 304        })
 305        .await
 306    }
 307
 308    fn sanitize_channel_name(name: &str) -> Result<&str> {
 309        let new_name = name.trim().trim_start_matches('#');
 310        if new_name.is_empty() {
 311            Err(anyhow!("channel name can't be blank"))?;
 312        }
 313        Ok(new_name)
 314    }
 315
 316    /// Renames the specified channel.
 317    pub async fn rename_channel(
 318        &self,
 319        channel_id: ChannelId,
 320        admin_id: UserId,
 321        new_name: &str,
 322    ) -> Result<channel::Model> {
 323        self.transaction(move |tx| async move {
 324            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 325
 326            let channel = self.get_channel_internal(channel_id, &tx).await?;
 327            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 328                .await?;
 329
 330            let mut model = channel.into_active_model();
 331            model.name = ActiveValue::Set(new_name.clone());
 332            let channel = model.update(&*tx).await?;
 333
 334            Ok(channel)
 335        })
 336        .await
 337    }
 338
 339    /// accept or decline an invite to join a channel
 340    pub async fn respond_to_channel_invite(
 341        &self,
 342        channel_id: ChannelId,
 343        user_id: UserId,
 344        accept: bool,
 345    ) -> Result<RespondToChannelInvite> {
 346        self.transaction(move |tx| async move {
 347            let channel = self.get_channel_internal(channel_id, &tx).await?;
 348
 349            let membership_update = if accept {
 350                let rows_affected = channel_member::Entity::update_many()
 351                    .set(channel_member::ActiveModel {
 352                        accepted: ActiveValue::Set(accept),
 353                        ..Default::default()
 354                    })
 355                    .filter(
 356                        channel_member::Column::ChannelId
 357                            .eq(channel_id)
 358                            .and(channel_member::Column::UserId.eq(user_id))
 359                            .and(channel_member::Column::Accepted.eq(false)),
 360                    )
 361                    .exec(&*tx)
 362                    .await?
 363                    .rows_affected;
 364
 365                if rows_affected == 0 {
 366                    Err(anyhow!("no such invitation"))?;
 367                }
 368
 369                Some(
 370                    self.calculate_membership_updated(&channel, user_id, &tx)
 371                        .await?,
 372                )
 373            } else {
 374                let rows_affected = channel_member::Entity::delete_many()
 375                    .filter(
 376                        channel_member::Column::ChannelId
 377                            .eq(channel_id)
 378                            .and(channel_member::Column::UserId.eq(user_id))
 379                            .and(channel_member::Column::Accepted.eq(false)),
 380                    )
 381                    .exec(&*tx)
 382                    .await?
 383                    .rows_affected;
 384                if rows_affected == 0 {
 385                    Err(anyhow!("no such invitation"))?;
 386                }
 387
 388                None
 389            };
 390
 391            Ok(RespondToChannelInvite {
 392                membership_update,
 393                notifications: self
 394                    .mark_notification_as_read_with_response(
 395                        user_id,
 396                        &rpc::Notification::ChannelInvitation {
 397                            channel_id: channel_id.to_proto(),
 398                            channel_name: Default::default(),
 399                            inviter_id: Default::default(),
 400                        },
 401                        accept,
 402                        &tx,
 403                    )
 404                    .await?
 405                    .into_iter()
 406                    .collect(),
 407            })
 408        })
 409        .await
 410    }
 411
 412    async fn calculate_membership_updated(
 413        &self,
 414        channel: &channel::Model,
 415        user_id: UserId,
 416        tx: &DatabaseTransaction,
 417    ) -> Result<MembershipUpdated> {
 418        let new_channels = self
 419            .get_user_channels(user_id, Some(channel), false, tx)
 420            .await?;
 421        let removed_channels = self
 422            .get_channel_descendants_excluding_self([channel], tx)
 423            .await?
 424            .into_iter()
 425            .map(|channel| channel.id)
 426            .chain([channel.id])
 427            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 428            .collect::<Vec<_>>();
 429
 430        Ok(MembershipUpdated {
 431            channel_id: channel.id,
 432            new_channels,
 433            removed_channels,
 434        })
 435    }
 436
 437    /// Removes a channel member.
 438    pub async fn remove_channel_member(
 439        &self,
 440        channel_id: ChannelId,
 441        member_id: UserId,
 442        admin_id: UserId,
 443    ) -> Result<RemoveChannelMemberResult> {
 444        self.transaction(|tx| async move {
 445            let channel = self.get_channel_internal(channel_id, &tx).await?;
 446
 447            if member_id != admin_id {
 448                self.check_user_is_channel_admin(&channel, admin_id, &tx)
 449                    .await?;
 450            }
 451
 452            let result = channel_member::Entity::delete_many()
 453                .filter(
 454                    channel_member::Column::ChannelId
 455                        .eq(channel_id)
 456                        .and(channel_member::Column::UserId.eq(member_id)),
 457                )
 458                .exec(&*tx)
 459                .await?;
 460
 461            if result.rows_affected == 0 {
 462                Err(anyhow!("no such member"))?;
 463            }
 464
 465            Ok(RemoveChannelMemberResult {
 466                membership_update: self
 467                    .calculate_membership_updated(&channel, member_id, &tx)
 468                    .await?,
 469                notification_id: self
 470                    .remove_notification(
 471                        member_id,
 472                        rpc::Notification::ChannelInvitation {
 473                            channel_id: channel_id.to_proto(),
 474                            channel_name: Default::default(),
 475                            inviter_id: Default::default(),
 476                        },
 477                        &tx,
 478                    )
 479                    .await?,
 480            })
 481        })
 482        .await
 483    }
 484
 485    /// Returns all channels for the user with the given ID.
 486    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 487        self.transaction(|tx| async move { self.get_user_channels(user_id, None, true, &tx).await })
 488            .await
 489    }
 490
 491    /// Returns all channels for the user with the given ID that are descendants
 492    /// of the specified ancestor channel.
 493    pub async fn get_user_channels(
 494        &self,
 495        user_id: UserId,
 496        ancestor_channel: Option<&channel::Model>,
 497        include_invites: bool,
 498        tx: &DatabaseTransaction,
 499    ) -> Result<ChannelsForUser> {
 500        let mut filter = channel_member::Column::UserId.eq(user_id);
 501        if !include_invites {
 502            filter = filter.and(channel_member::Column::Accepted.eq(true))
 503        }
 504        if let Some(ancestor) = ancestor_channel {
 505            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 506        }
 507
 508        let mut channels = Vec::<channel::Model>::new();
 509        let mut invited_channels = Vec::<Channel>::new();
 510        let mut channel_memberships = Vec::<channel_member::Model>::new();
 511        let mut rows = channel_member::Entity::find()
 512            .filter(filter)
 513            .inner_join(channel::Entity)
 514            .select_also(channel::Entity)
 515            .stream(tx)
 516            .await?;
 517        while let Some(row) = rows.next().await {
 518            if let (membership, Some(channel)) = row? {
 519                if membership.accepted {
 520                    channel_memberships.push(membership);
 521                    channels.push(channel);
 522                } else {
 523                    invited_channels.push(Channel::from_model(channel));
 524                }
 525            }
 526        }
 527        drop(rows);
 528
 529        let mut descendants = self
 530            .get_channel_descendants_excluding_self(channels.iter(), tx)
 531            .await?;
 532
 533        for channel in channels {
 534            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
 535                descendants.insert(ix, channel);
 536            }
 537        }
 538
 539        let roles_by_channel_id = channel_memberships
 540            .iter()
 541            .map(|membership| (membership.channel_id, membership.role))
 542            .collect::<HashMap<_, _>>();
 543
 544        let channels: Vec<Channel> = descendants
 545            .into_iter()
 546            .filter_map(|channel| {
 547                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 548                if parent_role.can_see_channel(channel.visibility) {
 549                    Some(Channel::from_model(channel))
 550                } else {
 551                    None
 552                }
 553            })
 554            .collect();
 555
 556        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 557        enum QueryUserIdsAndChannelIds {
 558            ChannelId,
 559            UserId,
 560        }
 561
 562        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 563        {
 564            let mut rows = room_participant::Entity::find()
 565                .inner_join(room::Entity)
 566                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 567                .select_only()
 568                .column(room::Column::ChannelId)
 569                .column(room_participant::Column::UserId)
 570                .into_values::<_, QueryUserIdsAndChannelIds>()
 571                .stream(tx)
 572                .await?;
 573            while let Some(row) = rows.next().await {
 574                let row: (ChannelId, UserId) = row?;
 575                channel_participants.entry(row.0).or_default().push(row.1)
 576            }
 577        }
 578
 579        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 580
 581        let mut channel_ids_by_buffer_id = HashMap::default();
 582        let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
 583        let mut rows = buffer::Entity::find()
 584            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 585            .stream(tx)
 586            .await?;
 587        while let Some(row) = rows.next().await {
 588            let row = row?;
 589            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 590            latest_buffer_versions.push(ChannelBufferVersion {
 591                channel_id: row.channel_id.0 as u64,
 592                epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
 593                version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
 594                    .latest_operation_lamport_timestamp
 595                    .zip(row.latest_operation_replica_id)
 596                {
 597                    vec![VectorClockEntry {
 598                        timestamp: latest_lamport_timestamp as u32,
 599                        replica_id: latest_replica_id as u32,
 600                    }]
 601                } else {
 602                    vec![]
 603                },
 604            });
 605        }
 606        drop(rows);
 607
 608        let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
 609
 610        let observed_buffer_versions = self
 611            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
 612            .await?;
 613
 614        let observed_channel_messages = self
 615            .observed_channel_messages(&channel_ids, user_id, tx)
 616            .await?;
 617
 618        let hosted_projects = self
 619            .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
 620            .await?;
 621
 622        Ok(ChannelsForUser {
 623            channel_memberships,
 624            channels,
 625            invited_channels,
 626            hosted_projects,
 627            channel_participants,
 628            latest_buffer_versions,
 629            latest_channel_messages,
 630            observed_buffer_versions,
 631            observed_channel_messages,
 632        })
 633    }
 634
 635    /// Sets the role for the specified channel member.
 636    pub async fn set_channel_member_role(
 637        &self,
 638        channel_id: ChannelId,
 639        admin_id: UserId,
 640        for_user: UserId,
 641        role: ChannelRole,
 642    ) -> Result<SetMemberRoleResult> {
 643        self.transaction(|tx| async move {
 644            let channel = self.get_channel_internal(channel_id, &tx).await?;
 645            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 646                .await?;
 647
 648            let membership = channel_member::Entity::find()
 649                .filter(
 650                    channel_member::Column::ChannelId
 651                        .eq(channel_id)
 652                        .and(channel_member::Column::UserId.eq(for_user)),
 653                )
 654                .one(&*tx)
 655                .await?;
 656
 657            let Some(membership) = membership else {
 658                Err(anyhow!("no such member"))?
 659            };
 660
 661            let mut update = membership.into_active_model();
 662            update.role = ActiveValue::Set(role);
 663            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 664
 665            if updated.accepted {
 666                Ok(SetMemberRoleResult::MembershipUpdated(
 667                    self.calculate_membership_updated(&channel, for_user, &tx)
 668                        .await?,
 669                ))
 670            } else {
 671                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 672                    channel,
 673                )))
 674            }
 675        })
 676        .await
 677    }
 678
 679    /// Returns the details for the specified channel member.
 680    pub async fn get_channel_participant_details(
 681        &self,
 682        channel_id: ChannelId,
 683        filter: &str,
 684        limit: u64,
 685        user_id: UserId,
 686    ) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
 687        let members = self
 688            .transaction(move |tx| async move {
 689                let channel = self.get_channel_internal(channel_id, &tx).await?;
 690                self.check_user_is_channel_participant(&channel, user_id, &tx)
 691                    .await?;
 692                let mut query = channel_member::Entity::find()
 693                    .find_also_related(user::Entity)
 694                    .filter(channel_member::Column::ChannelId.eq(channel.root_id()));
 695
 696                if cfg!(any(test, feature = "sqlite")) && self.pool.get_database_backend() == DbBackend::Sqlite {
 697                    query = query.filter(Expr::cust_with_values(
 698                        "UPPER(github_login) LIKE ?",
 699                        [Self::fuzzy_like_string(&filter.to_uppercase())],
 700                    ))
 701                } else {
 702                    query = query.filter(Expr::cust_with_values(
 703                        "github_login ILIKE $1",
 704                        [Self::fuzzy_like_string(filter)],
 705                    ))
 706                }
 707                let members = query.order_by(
 708                        Expr::cust(
 709                            "not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
 710                        ),
 711                        sea_orm::Order::Asc,
 712                    )
 713                    .limit(limit)
 714                    .all(&*tx)
 715                    .await?;
 716
 717                Ok(members)
 718            })
 719            .await?;
 720
 721        let mut users: Vec<proto::User> = Vec::with_capacity(members.len());
 722
 723        let members = members
 724            .into_iter()
 725            .map(|(member, user)| {
 726                if let Some(user) = user {
 727                    users.push(proto::User {
 728                        id: user.id.to_proto(),
 729                        avatar_url: format!(
 730                            "https://github.com/{}.png?size=128",
 731                            user.github_login
 732                        ),
 733                        github_login: user.github_login,
 734                    })
 735                }
 736                proto::ChannelMember {
 737                    role: member.role.into(),
 738                    user_id: member.user_id.to_proto(),
 739                    kind: if member.accepted {
 740                        Kind::Member
 741                    } else {
 742                        Kind::Invitee
 743                    }
 744                    .into(),
 745                }
 746            })
 747            .collect();
 748
 749        Ok((members, users))
 750    }
 751
 752    /// Returns whether the given user is an admin in the specified channel.
 753    pub async fn check_user_is_channel_admin(
 754        &self,
 755        channel: &channel::Model,
 756        user_id: UserId,
 757        tx: &DatabaseTransaction,
 758    ) -> Result<ChannelRole> {
 759        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 760        match role {
 761            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 762            Some(ChannelRole::Member)
 763            | Some(ChannelRole::Talker)
 764            | Some(ChannelRole::Banned)
 765            | Some(ChannelRole::Guest)
 766            | None => Err(anyhow!(
 767                "user is not a channel admin or channel does not exist"
 768            ))?,
 769        }
 770    }
 771
 772    /// Returns whether the given user is a member of the specified channel.
 773    pub async fn check_user_is_channel_member(
 774        &self,
 775        channel: &channel::Model,
 776        user_id: UserId,
 777        tx: &DatabaseTransaction,
 778    ) -> Result<ChannelRole> {
 779        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 780        match channel_role {
 781            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 782            Some(ChannelRole::Banned)
 783            | Some(ChannelRole::Guest)
 784            | Some(ChannelRole::Talker)
 785            | None => Err(anyhow!(
 786                "user is not a channel member or channel does not exist"
 787            ))?,
 788        }
 789    }
 790
 791    /// Returns whether the given user is a participant in the specified channel.
 792    pub async fn check_user_is_channel_participant(
 793        &self,
 794        channel: &channel::Model,
 795        user_id: UserId,
 796        tx: &DatabaseTransaction,
 797    ) -> Result<ChannelRole> {
 798        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 799        match role {
 800            Some(ChannelRole::Admin)
 801            | Some(ChannelRole::Member)
 802            | Some(ChannelRole::Guest)
 803            | Some(ChannelRole::Talker) => Ok(role.unwrap()),
 804            Some(ChannelRole::Banned) | None => Err(anyhow!(
 805                "user is not a channel participant or channel does not exist"
 806            ))?,
 807        }
 808    }
 809
 810    /// Returns a user's pending invite for the given channel, if one exists.
 811    pub async fn pending_invite_for_channel(
 812        &self,
 813        channel: &channel::Model,
 814        user_id: UserId,
 815        tx: &DatabaseTransaction,
 816    ) -> Result<Option<channel_member::Model>> {
 817        let row = channel_member::Entity::find()
 818            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 819            .filter(channel_member::Column::UserId.eq(user_id))
 820            .filter(channel_member::Column::Accepted.eq(false))
 821            .one(tx)
 822            .await?;
 823
 824        Ok(row)
 825    }
 826
 827    /// Returns the role for a user in the given channel.
 828    pub async fn channel_role_for_user(
 829        &self,
 830        channel: &channel::Model,
 831        user_id: UserId,
 832        tx: &DatabaseTransaction,
 833    ) -> Result<Option<ChannelRole>> {
 834        let membership = channel_member::Entity::find()
 835            .filter(
 836                channel_member::Column::ChannelId
 837                    .eq(channel.root_id())
 838                    .and(channel_member::Column::UserId.eq(user_id))
 839                    .and(channel_member::Column::Accepted.eq(true)),
 840            )
 841            .one(tx)
 842            .await?;
 843
 844        let Some(membership) = membership else {
 845            return Ok(None);
 846        };
 847
 848        if !membership.role.can_see_channel(channel.visibility) {
 849            return Ok(None);
 850        }
 851
 852        Ok(Some(membership.role))
 853    }
 854
 855    // Get the descendants of the given set if channels, ordered by their
 856    // path.
 857    pub(crate) async fn get_channel_descendants_excluding_self(
 858        &self,
 859        channels: impl IntoIterator<Item = &channel::Model>,
 860        tx: &DatabaseTransaction,
 861    ) -> Result<Vec<channel::Model>> {
 862        let mut filter = Condition::any();
 863        for channel in channels.into_iter() {
 864            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 865        }
 866
 867        if filter.is_empty() {
 868            return Ok(vec![]);
 869        }
 870
 871        Ok(channel::Entity::find()
 872            .filter(filter)
 873            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 874            .all(tx)
 875            .await?)
 876    }
 877
 878    /// Returns the channel with the given ID.
 879    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 880        self.transaction(|tx| async move {
 881            let channel = self.get_channel_internal(channel_id, &tx).await?;
 882            self.check_user_is_channel_participant(&channel, user_id, &tx)
 883                .await?;
 884
 885            Ok(Channel::from_model(channel))
 886        })
 887        .await
 888    }
 889
 890    pub(crate) async fn get_channel_internal(
 891        &self,
 892        channel_id: ChannelId,
 893        tx: &DatabaseTransaction,
 894    ) -> Result<channel::Model> {
 895        Ok(channel::Entity::find_by_id(channel_id)
 896            .one(tx)
 897            .await?
 898            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 899    }
 900
 901    pub(crate) async fn get_or_create_channel_room(
 902        &self,
 903        channel_id: ChannelId,
 904        live_kit_room: &str,
 905        tx: &DatabaseTransaction,
 906    ) -> Result<RoomId> {
 907        let room = room::Entity::find()
 908            .filter(room::Column::ChannelId.eq(channel_id))
 909            .one(tx)
 910            .await?;
 911
 912        let room_id = if let Some(room) = room {
 913            room.id
 914        } else {
 915            let result = room::Entity::insert(room::ActiveModel {
 916                channel_id: ActiveValue::Set(Some(channel_id)),
 917                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 918                ..Default::default()
 919            })
 920            .exec(tx)
 921            .await?;
 922
 923            result.last_insert_id
 924        };
 925
 926        Ok(room_id)
 927    }
 928
 929    /// Move a channel from one parent to another
 930    pub async fn move_channel(
 931        &self,
 932        channel_id: ChannelId,
 933        new_parent_id: ChannelId,
 934        admin_id: UserId,
 935    ) -> Result<(ChannelId, Vec<Channel>)> {
 936        self.transaction(|tx| async move {
 937            let channel = self.get_channel_internal(channel_id, &tx).await?;
 938            self.check_user_is_channel_admin(&channel, admin_id, &tx)
 939                .await?;
 940            let new_parent = self.get_channel_internal(new_parent_id, &tx).await?;
 941
 942            if new_parent.root_id() != channel.root_id() {
 943                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 944            }
 945
 946            if new_parent
 947                .ancestors_including_self()
 948                .any(|id| id == channel.id)
 949            {
 950                Err(anyhow!(ErrorCode::CircularNesting))?;
 951            }
 952
 953            if channel.visibility == ChannelVisibility::Public
 954                && new_parent.visibility != ChannelVisibility::Public
 955            {
 956                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 957            }
 958
 959            let root_id = channel.root_id();
 960            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 961            let new_path = format!("{}{}/", new_parent.path(), channel.id);
 962
 963            let mut model = channel.into_active_model();
 964            model.parent_path = ActiveValue::Set(new_parent.path());
 965            let channel = model.update(&*tx).await?;
 966
 967            let descendent_ids =
 968                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 969                    self.pool.get_database_backend(),
 970                    "
 971                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 972                    WHERE parent_path LIKE $3 || '%'
 973                    RETURNING id
 974                ",
 975                    [old_path.clone().into(), new_path.into(), old_path.into()],
 976                ))
 977                .all(&*tx)
 978                .await?;
 979
 980            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
 981
 982            let channels = channel::Entity::find()
 983                .filter(channel::Column::Id.is_in(all_moved_ids))
 984                .all(&*tx)
 985                .await?
 986                .into_iter()
 987                .map(Channel::from_model)
 988                .collect::<Vec<_>>();
 989
 990            Ok((root_id, channels))
 991        })
 992        .await
 993    }
 994}
 995
/// Column selector for raw statements that yield a single `id` column.
///
/// Used by `move_channel` to read the IDs produced by its
/// `UPDATE ... RETURNING id` statement via `find_by_statement`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1000
/// Column selector with a single `UserId` variant.
///
/// NOTE(review): presumably used like `QueryIds`, for queries that select
/// only a user ID column; its call sites are not visible in this part of the
/// file, so confirm before relying on this.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}