channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .0
  36            .id)
  37    }
  38
  39    /// Creates a new channel.
  40    pub async fn create_channel(
  41        &self,
  42        name: &str,
  43        parent_channel_id: Option<ChannelId>,
  44        admin_id: UserId,
  45    ) -> Result<(
  46        Channel,
  47        Option<channel_member::Model>,
  48        Vec<channel_member::Model>,
  49    )> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let channel = channel::ActiveModel {
  63                id: ActiveValue::NotSet,
  64                name: ActiveValue::Set(name.to_string()),
  65                visibility: ActiveValue::Set(ChannelVisibility::Members),
  66                parent_path: ActiveValue::Set(
  67                    parent
  68                        .as_ref()
  69                        .map_or(String::new(), |parent| parent.path()),
  70                ),
  71                requires_zed_cla: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if parent.is_none() {
  77                membership = Some(
  78                    channel_member::ActiveModel {
  79                        id: ActiveValue::NotSet,
  80                        channel_id: ActiveValue::Set(channel.id),
  81                        user_id: ActiveValue::Set(admin_id),
  82                        accepted: ActiveValue::Set(true),
  83                        role: ActiveValue::Set(ChannelRole::Admin),
  84                    }
  85                    .insert(&*tx)
  86                    .await?,
  87                );
  88            }
  89
  90            let channel_members = channel_member::Entity::find()
  91                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  92                .all(&*tx)
  93                .await?;
  94
  95            Ok((Channel::from_model(channel), membership, channel_members))
  96        })
  97        .await
  98    }
  99
 100    /// Adds a user to the specified channel.
 101    pub async fn join_channel(
 102        &self,
 103        channel_id: ChannelId,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 107        self.transaction(move |tx| async move {
 108            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 109            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 110
 111            let mut accept_invite_result = None;
 112
 113            if role.is_none() {
 114                if let Some(invitation) = self
 115                    .pending_invite_for_channel(&channel, user_id, &*tx)
 116                    .await?
 117                {
 118                    // note, this may be a parent channel
 119                    role = Some(invitation.role);
 120                    channel_member::Entity::update(channel_member::ActiveModel {
 121                        accepted: ActiveValue::Set(true),
 122                        ..invitation.into_active_model()
 123                    })
 124                    .exec(&*tx)
 125                    .await?;
 126
 127                    accept_invite_result = Some(
 128                        self.calculate_membership_updated(&channel, user_id, &*tx)
 129                            .await?,
 130                    );
 131
 132                    debug_assert!(
 133                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 134                    );
 135                } else if channel.visibility == ChannelVisibility::Public {
 136                    role = Some(ChannelRole::Guest);
 137                    channel_member::Entity::insert(channel_member::ActiveModel {
 138                        id: ActiveValue::NotSet,
 139                        channel_id: ActiveValue::Set(channel.root_id()),
 140                        user_id: ActiveValue::Set(user_id),
 141                        accepted: ActiveValue::Set(true),
 142                        role: ActiveValue::Set(ChannelRole::Guest),
 143                    })
 144                    .exec(&*tx)
 145                    .await?;
 146
 147                    accept_invite_result = Some(
 148                        self.calculate_membership_updated(&channel, user_id, &*tx)
 149                            .await?,
 150                    );
 151
 152                    debug_assert!(
 153                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 154                    );
 155                }
 156            }
 157
 158            if role.is_none() || role == Some(ChannelRole::Banned) {
 159                Err(ErrorCode::Forbidden.anyhow())?
 160            }
 161            let role = role.unwrap();
 162
 163            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 164            let room_id = self
 165                .get_or_create_channel_room(channel_id, &live_kit_room, &*tx)
 166                .await?;
 167
 168            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 169                .await
 170                .map(|jr| (jr, accept_invite_result, role))
 171        })
 172        .await
 173    }
 174
 175    /// Sets the visibility of the given channel.
 176    pub async fn set_channel_visibility(
 177        &self,
 178        channel_id: ChannelId,
 179        visibility: ChannelVisibility,
 180        admin_id: UserId,
 181    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 182        self.transaction(move |tx| async move {
 183            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 184            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 185                .await?;
 186
 187            if visibility == ChannelVisibility::Public {
 188                if let Some(parent_id) = channel.parent_id() {
 189                    let parent = self.get_channel_internal(parent_id, &*tx).await?;
 190
 191                    if parent.visibility != ChannelVisibility::Public {
 192                        Err(ErrorCode::BadPublicNesting
 193                            .with_tag("direction", "parent")
 194                            .anyhow())?;
 195                    }
 196                }
 197            } else if visibility == ChannelVisibility::Members {
 198                if self
 199                    .get_channel_descendants_excluding_self([&channel], &*tx)
 200                    .await?
 201                    .into_iter()
 202                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 203                {
 204                    Err(ErrorCode::BadPublicNesting
 205                        .with_tag("direction", "children")
 206                        .anyhow())?;
 207                }
 208            }
 209
 210            let mut model = channel.into_active_model();
 211            model.visibility = ActiveValue::Set(visibility);
 212            let channel = model.update(&*tx).await?;
 213
 214            let channel_members = channel_member::Entity::find()
 215                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 216                .all(&*tx)
 217                .await?;
 218
 219            Ok((Channel::from_model(channel), channel_members))
 220        })
 221        .await
 222    }
 223
 224    #[cfg(test)]
 225    pub async fn set_channel_requires_zed_cla(
 226        &self,
 227        channel_id: ChannelId,
 228        requires_zed_cla: bool,
 229    ) -> Result<()> {
 230        self.transaction(move |tx| async move {
 231            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 232            let mut model = channel.into_active_model();
 233            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 234            model.update(&*tx).await?;
 235            Ok(())
 236        })
 237        .await
 238    }
 239
 240    /// Deletes the channel with the specified ID.
 241    pub async fn delete_channel(
 242        &self,
 243        channel_id: ChannelId,
 244        user_id: UserId,
 245    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 246        self.transaction(move |tx| async move {
 247            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 248            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 249                .await?;
 250
 251            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 252                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 253                .select_only()
 254                .column(channel_member::Column::UserId)
 255                .distinct()
 256                .into_values::<_, QueryUserIds>()
 257                .all(&*tx)
 258                .await?;
 259
 260            let channels_to_remove = self
 261                .get_channel_descendants_excluding_self([&channel], &*tx)
 262                .await?
 263                .into_iter()
 264                .map(|channel| channel.id)
 265                .chain(Some(channel_id))
 266                .collect::<Vec<_>>();
 267
 268            channel::Entity::delete_many()
 269                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 270                .exec(&*tx)
 271                .await?;
 272
 273            Ok((channels_to_remove, members_to_notify))
 274        })
 275        .await
 276    }
 277
 278    /// Invites a user to a channel as a member.
 279    pub async fn invite_channel_member(
 280        &self,
 281        channel_id: ChannelId,
 282        invitee_id: UserId,
 283        inviter_id: UserId,
 284        role: ChannelRole,
 285    ) -> Result<InviteMemberResult> {
 286        self.transaction(move |tx| async move {
 287            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 288            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 289                .await?;
 290            if !channel.is_root() {
 291                Err(ErrorCode::NotARootChannel.anyhow())?
 292            }
 293
 294            channel_member::ActiveModel {
 295                id: ActiveValue::NotSet,
 296                channel_id: ActiveValue::Set(channel_id),
 297                user_id: ActiveValue::Set(invitee_id),
 298                accepted: ActiveValue::Set(false),
 299                role: ActiveValue::Set(role),
 300            }
 301            .insert(&*tx)
 302            .await?;
 303
 304            let channel = Channel::from_model(channel);
 305
 306            let notifications = self
 307                .create_notification(
 308                    invitee_id,
 309                    rpc::Notification::ChannelInvitation {
 310                        channel_id: channel_id.to_proto(),
 311                        channel_name: channel.name.clone(),
 312                        inviter_id: inviter_id.to_proto(),
 313                    },
 314                    true,
 315                    &*tx,
 316                )
 317                .await?
 318                .into_iter()
 319                .collect();
 320
 321            Ok(InviteMemberResult {
 322                channel,
 323                notifications,
 324            })
 325        })
 326        .await
 327    }
 328
 329    fn sanitize_channel_name(name: &str) -> Result<&str> {
 330        let new_name = name.trim().trim_start_matches('#');
 331        if new_name == "" {
 332            Err(anyhow!("channel name can't be blank"))?;
 333        }
 334        Ok(new_name)
 335    }
 336
 337    /// Renames the specified channel.
 338    pub async fn rename_channel(
 339        &self,
 340        channel_id: ChannelId,
 341        admin_id: UserId,
 342        new_name: &str,
 343    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 344        self.transaction(move |tx| async move {
 345            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 346
 347            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 348            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 349                .await?;
 350
 351            let mut model = channel.into_active_model();
 352            model.name = ActiveValue::Set(new_name.clone());
 353            let channel = model.update(&*tx).await?;
 354
 355            let channel_members = channel_member::Entity::find()
 356                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 357                .all(&*tx)
 358                .await?;
 359
 360            Ok((Channel::from_model(channel), channel_members))
 361        })
 362        .await
 363    }
 364
 365    /// accept or decline an invite to join a channel
 366    pub async fn respond_to_channel_invite(
 367        &self,
 368        channel_id: ChannelId,
 369        user_id: UserId,
 370        accept: bool,
 371    ) -> Result<RespondToChannelInvite> {
 372        self.transaction(move |tx| async move {
 373            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 374
 375            let membership_update = if accept {
 376                let rows_affected = channel_member::Entity::update_many()
 377                    .set(channel_member::ActiveModel {
 378                        accepted: ActiveValue::Set(accept),
 379                        ..Default::default()
 380                    })
 381                    .filter(
 382                        channel_member::Column::ChannelId
 383                            .eq(channel_id)
 384                            .and(channel_member::Column::UserId.eq(user_id))
 385                            .and(channel_member::Column::Accepted.eq(false)),
 386                    )
 387                    .exec(&*tx)
 388                    .await?
 389                    .rows_affected;
 390
 391                if rows_affected == 0 {
 392                    Err(anyhow!("no such invitation"))?;
 393                }
 394
 395                Some(
 396                    self.calculate_membership_updated(&channel, user_id, &*tx)
 397                        .await?,
 398                )
 399            } else {
 400                let rows_affected = channel_member::Entity::delete_many()
 401                    .filter(
 402                        channel_member::Column::ChannelId
 403                            .eq(channel_id)
 404                            .and(channel_member::Column::UserId.eq(user_id))
 405                            .and(channel_member::Column::Accepted.eq(false)),
 406                    )
 407                    .exec(&*tx)
 408                    .await?
 409                    .rows_affected;
 410                if rows_affected == 0 {
 411                    Err(anyhow!("no such invitation"))?;
 412                }
 413
 414                None
 415            };
 416
 417            Ok(RespondToChannelInvite {
 418                membership_update,
 419                notifications: self
 420                    .mark_notification_as_read_with_response(
 421                        user_id,
 422                        &rpc::Notification::ChannelInvitation {
 423                            channel_id: channel_id.to_proto(),
 424                            channel_name: Default::default(),
 425                            inviter_id: Default::default(),
 426                        },
 427                        accept,
 428                        &*tx,
 429                    )
 430                    .await?
 431                    .into_iter()
 432                    .collect(),
 433            })
 434        })
 435        .await
 436    }
 437
 438    async fn calculate_membership_updated(
 439        &self,
 440        channel: &channel::Model,
 441        user_id: UserId,
 442        tx: &DatabaseTransaction,
 443    ) -> Result<MembershipUpdated> {
 444        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 445        let removed_channels = self
 446            .get_channel_descendants_excluding_self([channel], &*tx)
 447            .await?
 448            .into_iter()
 449            .map(|channel| channel.id)
 450            .chain([channel.id])
 451            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 452            .collect::<Vec<_>>();
 453
 454        Ok(MembershipUpdated {
 455            channel_id: channel.id,
 456            new_channels,
 457            removed_channels,
 458        })
 459    }
 460
 461    /// Removes a channel member.
 462    pub async fn remove_channel_member(
 463        &self,
 464        channel_id: ChannelId,
 465        member_id: UserId,
 466        admin_id: UserId,
 467    ) -> Result<RemoveChannelMemberResult> {
 468        self.transaction(|tx| async move {
 469            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 470
 471            if member_id != admin_id {
 472                self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 473                    .await?;
 474            }
 475
 476            let result = channel_member::Entity::delete_many()
 477                .filter(
 478                    channel_member::Column::ChannelId
 479                        .eq(channel_id)
 480                        .and(channel_member::Column::UserId.eq(member_id)),
 481                )
 482                .exec(&*tx)
 483                .await?;
 484
 485            if result.rows_affected == 0 {
 486                Err(anyhow!("no such member"))?;
 487            }
 488
 489            Ok(RemoveChannelMemberResult {
 490                membership_update: self
 491                    .calculate_membership_updated(&channel, member_id, &*tx)
 492                    .await?,
 493                notification_id: self
 494                    .remove_notification(
 495                        member_id,
 496                        rpc::Notification::ChannelInvitation {
 497                            channel_id: channel_id.to_proto(),
 498                            channel_name: Default::default(),
 499                            inviter_id: Default::default(),
 500                        },
 501                        &*tx,
 502                    )
 503                    .await?,
 504            })
 505        })
 506        .await
 507    }
 508
 509    /// Returns all channel invites for the user with the given ID.
 510    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 511        self.transaction(|tx| async move {
 512            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 513
 514            let channel_invites = channel_member::Entity::find()
 515                .filter(
 516                    channel_member::Column::UserId
 517                        .eq(user_id)
 518                        .and(channel_member::Column::Accepted.eq(false)),
 519                )
 520                .all(&*tx)
 521                .await?;
 522
 523            for invite in channel_invites {
 524                role_for_channel.insert(invite.channel_id, invite.role);
 525            }
 526
 527            let channels = channel::Entity::find()
 528                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 529                .all(&*tx)
 530                .await?;
 531
 532            let channels = channels
 533                .into_iter()
 534                .filter_map(|channel| Some(Channel::from_model(channel)))
 535                .collect();
 536
 537            Ok(channels)
 538        })
 539        .await
 540    }
 541
 542    /// Returns all channels for the user with the given ID.
 543    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 544        self.transaction(|tx| async move {
 545            let tx = tx;
 546
 547            self.get_user_channels(user_id, None, &tx).await
 548        })
 549        .await
 550    }
 551
 552    /// Returns all channels for the user with the given ID that are descendants
 553    /// of the specified ancestor channel.
 554    pub async fn get_user_channels(
 555        &self,
 556        user_id: UserId,
 557        ancestor_channel: Option<&channel::Model>,
 558        tx: &DatabaseTransaction,
 559    ) -> Result<ChannelsForUser> {
 560        let mut filter = channel_member::Column::UserId
 561            .eq(user_id)
 562            .and(channel_member::Column::Accepted.eq(true));
 563
 564        if let Some(ancestor) = ancestor_channel {
 565            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 566        }
 567
 568        let channel_memberships = channel_member::Entity::find()
 569            .filter(filter)
 570            .all(&*tx)
 571            .await?;
 572
 573        let channels = channel::Entity::find()
 574            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
 575            .all(&*tx)
 576            .await?;
 577
 578        let mut descendants = self
 579            .get_channel_descendants_excluding_self(channels.iter(), &*tx)
 580            .await?;
 581
 582        for channel in channels {
 583            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
 584                descendants.insert(ix, channel);
 585            }
 586        }
 587
 588        let roles_by_channel_id = channel_memberships
 589            .iter()
 590            .map(|membership| (membership.channel_id, membership.role))
 591            .collect::<HashMap<_, _>>();
 592
 593        let channels: Vec<Channel> = descendants
 594            .into_iter()
 595            .filter_map(|channel| {
 596                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 597                if parent_role.can_see_channel(channel.visibility) {
 598                    Some(Channel::from_model(channel))
 599                } else {
 600                    None
 601                }
 602            })
 603            .collect();
 604
 605        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 606        enum QueryUserIdsAndChannelIds {
 607            ChannelId,
 608            UserId,
 609        }
 610
 611        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 612        {
 613            let mut rows = room_participant::Entity::find()
 614                .inner_join(room::Entity)
 615                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 616                .select_only()
 617                .column(room::Column::ChannelId)
 618                .column(room_participant::Column::UserId)
 619                .into_values::<_, QueryUserIdsAndChannelIds>()
 620                .stream(&*tx)
 621                .await?;
 622            while let Some(row) = rows.next().await {
 623                let row: (ChannelId, UserId) = row?;
 624                channel_participants.entry(row.0).or_default().push(row.1)
 625            }
 626        }
 627
 628        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 629
 630        let mut channel_ids_by_buffer_id = HashMap::default();
 631        let mut rows = buffer::Entity::find()
 632            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
 633            .stream(&*tx)
 634            .await?;
 635        while let Some(row) = rows.next().await {
 636            let row = row?;
 637            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
 638        }
 639        drop(rows);
 640
 641        let latest_buffer_versions = self
 642            .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx)
 643            .await?;
 644
 645        let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
 646
 647        let observed_buffer_versions = self
 648            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx)
 649            .await?;
 650
 651        let observed_channel_messages = self
 652            .observed_channel_messages(&channel_ids, user_id, &*tx)
 653            .await?;
 654
 655        Ok(ChannelsForUser {
 656            channel_memberships,
 657            channels,
 658            channel_participants,
 659            latest_buffer_versions,
 660            latest_channel_messages,
 661            observed_buffer_versions,
 662            observed_channel_messages,
 663        })
 664    }
 665
 666    /// Sets the role for the specified channel member.
 667    pub async fn set_channel_member_role(
 668        &self,
 669        channel_id: ChannelId,
 670        admin_id: UserId,
 671        for_user: UserId,
 672        role: ChannelRole,
 673    ) -> Result<SetMemberRoleResult> {
 674        self.transaction(|tx| async move {
 675            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 676            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 677                .await?;
 678
 679            let membership = channel_member::Entity::find()
 680                .filter(
 681                    channel_member::Column::ChannelId
 682                        .eq(channel_id)
 683                        .and(channel_member::Column::UserId.eq(for_user)),
 684                )
 685                .one(&*tx)
 686                .await?;
 687
 688            let Some(membership) = membership else {
 689                Err(anyhow!("no such member"))?
 690            };
 691
 692            let mut update = membership.into_active_model();
 693            update.role = ActiveValue::Set(role);
 694            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 695
 696            if updated.accepted {
 697                Ok(SetMemberRoleResult::MembershipUpdated(
 698                    self.calculate_membership_updated(&channel, for_user, &*tx)
 699                        .await?,
 700                ))
 701            } else {
 702                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 703                    channel,
 704                )))
 705            }
 706        })
 707        .await
 708    }
 709
 710    /// Returns the details for the specified channel member.
 711    pub async fn get_channel_participant_details(
 712        &self,
 713        channel_id: ChannelId,
 714        user_id: UserId,
 715    ) -> Result<Vec<proto::ChannelMember>> {
 716        let (role, members) = self
 717            .transaction(move |tx| async move {
 718                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 719                let role = self
 720                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 721                    .await?;
 722                Ok((
 723                    role,
 724                    self.get_channel_participant_details_internal(&channel, &*tx)
 725                        .await?,
 726                ))
 727            })
 728            .await?;
 729
 730        if role == ChannelRole::Admin {
 731            Ok(members
 732                .into_iter()
 733                .map(|channel_member| proto::ChannelMember {
 734                    role: channel_member.role.into(),
 735                    user_id: channel_member.user_id.to_proto(),
 736                    kind: if channel_member.accepted {
 737                        Kind::Member
 738                    } else {
 739                        Kind::Invitee
 740                    }
 741                    .into(),
 742                })
 743                .collect())
 744        } else {
 745            return Ok(members
 746                .into_iter()
 747                .filter_map(|member| {
 748                    if !member.accepted {
 749                        return None;
 750                    }
 751                    Some(proto::ChannelMember {
 752                        role: member.role.into(),
 753                        user_id: member.user_id.to_proto(),
 754                        kind: Kind::Member.into(),
 755                    })
 756                })
 757                .collect());
 758        }
 759    }
 760
 761    async fn get_channel_participant_details_internal(
 762        &self,
 763        channel: &channel::Model,
 764        tx: &DatabaseTransaction,
 765    ) -> Result<Vec<channel_member::Model>> {
 766        Ok(channel_member::Entity::find()
 767            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 768            .all(tx)
 769            .await?)
 770    }
 771
 772    /// Returns the participants in the given channel.
 773    pub async fn get_channel_participants(
 774        &self,
 775        channel: &channel::Model,
 776        tx: &DatabaseTransaction,
 777    ) -> Result<Vec<UserId>> {
 778        let participants = self
 779            .get_channel_participant_details_internal(channel, &*tx)
 780            .await?;
 781        Ok(participants
 782            .into_iter()
 783            .map(|member| member.user_id)
 784            .collect())
 785    }
 786
 787    /// Returns whether the given user is an admin in the specified channel.
 788    pub async fn check_user_is_channel_admin(
 789        &self,
 790        channel: &channel::Model,
 791        user_id: UserId,
 792        tx: &DatabaseTransaction,
 793    ) -> Result<ChannelRole> {
 794        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 795        match role {
 796            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 797            Some(ChannelRole::Member)
 798            | Some(ChannelRole::Banned)
 799            | Some(ChannelRole::Guest)
 800            | None => Err(anyhow!(
 801                "user is not a channel admin or channel does not exist"
 802            ))?,
 803        }
 804    }
 805
 806    /// Returns whether the given user is a member of the specified channel.
 807    pub async fn check_user_is_channel_member(
 808        &self,
 809        channel: &channel::Model,
 810        user_id: UserId,
 811        tx: &DatabaseTransaction,
 812    ) -> Result<ChannelRole> {
 813        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 814        match channel_role {
 815            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 816            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 817                "user is not a channel member or channel does not exist"
 818            ))?,
 819        }
 820    }
 821
 822    /// Returns whether the given user is a participant in the specified channel.
 823    pub async fn check_user_is_channel_participant(
 824        &self,
 825        channel: &channel::Model,
 826        user_id: UserId,
 827        tx: &DatabaseTransaction,
 828    ) -> Result<ChannelRole> {
 829        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 830        match role {
 831            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 832                Ok(role.unwrap())
 833            }
 834            Some(ChannelRole::Banned) | None => Err(anyhow!(
 835                "user is not a channel participant or channel does not exist"
 836            ))?,
 837        }
 838    }
 839
 840    /// Returns a user's pending invite for the given channel, if one exists.
 841    pub async fn pending_invite_for_channel(
 842        &self,
 843        channel: &channel::Model,
 844        user_id: UserId,
 845        tx: &DatabaseTransaction,
 846    ) -> Result<Option<channel_member::Model>> {
 847        let row = channel_member::Entity::find()
 848            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 849            .filter(channel_member::Column::UserId.eq(user_id))
 850            .filter(channel_member::Column::Accepted.eq(false))
 851            .one(&*tx)
 852            .await?;
 853
 854        Ok(row)
 855    }
 856
 857    /// Returns the role for a user in the given channel.
 858    pub async fn channel_role_for_user(
 859        &self,
 860        channel: &channel::Model,
 861        user_id: UserId,
 862        tx: &DatabaseTransaction,
 863    ) -> Result<Option<ChannelRole>> {
 864        let membership = channel_member::Entity::find()
 865            .filter(
 866                channel_member::Column::ChannelId
 867                    .eq(channel.root_id())
 868                    .and(channel_member::Column::UserId.eq(user_id))
 869                    .and(channel_member::Column::Accepted.eq(true)),
 870            )
 871            .one(&*tx)
 872            .await?;
 873
 874        let Some(membership) = membership else {
 875            return Ok(None);
 876        };
 877
 878        if !membership.role.can_see_channel(channel.visibility) {
 879            return Ok(None);
 880        }
 881
 882        Ok(Some(membership.role))
 883    }
 884
 885    // Get the descendants of the given set if channels, ordered by their
 886    // path.
 887    pub(crate) async fn get_channel_descendants_excluding_self(
 888        &self,
 889        channels: impl IntoIterator<Item = &channel::Model>,
 890        tx: &DatabaseTransaction,
 891    ) -> Result<Vec<channel::Model>> {
 892        let mut filter = Condition::any();
 893        for channel in channels.into_iter() {
 894            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 895        }
 896
 897        if filter.is_empty() {
 898            return Ok(vec![]);
 899        }
 900
 901        Ok(channel::Entity::find()
 902            .filter(filter)
 903            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 904            .all(tx)
 905            .await?)
 906    }
 907
 908    /// Returns the channel with the given ID.
 909    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 910        self.transaction(|tx| async move {
 911            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 912            self.check_user_is_channel_participant(&channel, user_id, &*tx)
 913                .await?;
 914
 915            Ok(Channel::from_model(channel))
 916        })
 917        .await
 918    }
 919
 920    pub(crate) async fn get_channel_internal(
 921        &self,
 922        channel_id: ChannelId,
 923        tx: &DatabaseTransaction,
 924    ) -> Result<channel::Model> {
 925        Ok(channel::Entity::find_by_id(channel_id)
 926            .one(&*tx)
 927            .await?
 928            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 929    }
 930
 931    pub(crate) async fn get_or_create_channel_room(
 932        &self,
 933        channel_id: ChannelId,
 934        live_kit_room: &str,
 935        tx: &DatabaseTransaction,
 936    ) -> Result<RoomId> {
 937        let room = room::Entity::find()
 938            .filter(room::Column::ChannelId.eq(channel_id))
 939            .one(&*tx)
 940            .await?;
 941
 942        let room_id = if let Some(room) = room {
 943            room.id
 944        } else {
 945            let result = room::Entity::insert(room::ActiveModel {
 946                channel_id: ActiveValue::Set(Some(channel_id)),
 947                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 948                ..Default::default()
 949            })
 950            .exec(&*tx)
 951            .await?;
 952
 953            result.last_insert_id
 954        };
 955
 956        Ok(room_id)
 957    }
 958
 959    /// Move a channel from one parent to another
 960    pub async fn move_channel(
 961        &self,
 962        channel_id: ChannelId,
 963        new_parent_id: ChannelId,
 964        admin_id: UserId,
 965    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
 966        self.transaction(|tx| async move {
 967            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 968            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 969                .await?;
 970            let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
 971
 972            if new_parent.root_id() != channel.root_id() {
 973                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 974            }
 975
 976            if new_parent
 977                .ancestors_including_self()
 978                .any(|id| id == channel.id)
 979            {
 980                Err(anyhow!(ErrorCode::CircularNesting))?;
 981            }
 982
 983            if channel.visibility == ChannelVisibility::Public
 984                && new_parent.visibility != ChannelVisibility::Public
 985            {
 986                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 987            }
 988
 989            let root_id = channel.root_id();
 990            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 991            let new_path = format!("{}{}/", new_parent.path(), channel.id);
 992
 993            let mut model = channel.into_active_model();
 994            model.parent_path = ActiveValue::Set(new_parent.path());
 995            let channel = model.update(&*tx).await?;
 996
 997            let descendent_ids =
 998                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 999                    self.pool.get_database_backend(),
1000                    "
1001                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1002                    WHERE parent_path LIKE $3 || '%'
1003                    RETURNING id
1004                ",
1005                    [old_path.clone().into(), new_path.into(), old_path.into()],
1006                ))
1007                .all(&*tx)
1008                .await?;
1009
1010            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1011
1012            let channels = channel::Entity::find()
1013                .filter(channel::Column::Id.is_in(all_moved_ids))
1014                .all(&*tx)
1015                .await?
1016                .into_iter()
1017                .map(|c| Channel::from_model(c))
1018                .collect::<Vec<_>>();
1019
1020            let channel_members = channel_member::Entity::find()
1021                .filter(channel_member::Column::ChannelId.eq(root_id))
1022                .all(&*tx)
1023                .await?;
1024
1025            Ok((channels, channel_members))
1026        })
1027        .await
1028    }
1029}
1030
1031#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1032enum QueryIds {
1033    Id,
1034}
1035
1036#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1037enum QueryUserIds {
1038    UserId,
1039}