channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .0
  36            .id)
  37    }
  38
  39    /// Creates a new channel.
  40    pub async fn create_channel(
  41        &self,
  42        name: &str,
  43        parent_channel_id: Option<ChannelId>,
  44        admin_id: UserId,
  45    ) -> Result<(
  46        Channel,
  47        Option<channel_member::Model>,
  48        Vec<channel_member::Model>,
  49    )> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let channel = channel::ActiveModel {
  63                id: ActiveValue::NotSet,
  64                name: ActiveValue::Set(name.to_string()),
  65                visibility: ActiveValue::Set(ChannelVisibility::Members),
  66                parent_path: ActiveValue::Set(
  67                    parent
  68                        .as_ref()
  69                        .map_or(String::new(), |parent| parent.path()),
  70                ),
  71                requires_zed_cla: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if parent.is_none() {
  77                membership = Some(
  78                    channel_member::ActiveModel {
  79                        id: ActiveValue::NotSet,
  80                        channel_id: ActiveValue::Set(channel.id),
  81                        user_id: ActiveValue::Set(admin_id),
  82                        accepted: ActiveValue::Set(true),
  83                        role: ActiveValue::Set(ChannelRole::Admin),
  84                    }
  85                    .insert(&*tx)
  86                    .await?,
  87                );
  88            }
  89
  90            let channel_members = channel_member::Entity::find()
  91                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  92                .all(&*tx)
  93                .await?;
  94
  95            Ok((Channel::from_model(channel), membership, channel_members))
  96        })
  97        .await
  98    }
  99
 100    /// Adds a user to the specified channel.
 101    pub async fn join_channel(
 102        &self,
 103        channel_id: ChannelId,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106        environment: &str,
 107    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 108        self.transaction(move |tx| async move {
 109            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 110            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 111
 112            let mut accept_invite_result = None;
 113
 114            if role.is_none() {
 115                if let Some(invitation) = self
 116                    .pending_invite_for_channel(&channel, user_id, &*tx)
 117                    .await?
 118                {
 119                    // note, this may be a parent channel
 120                    role = Some(invitation.role);
 121                    channel_member::Entity::update(channel_member::ActiveModel {
 122                        accepted: ActiveValue::Set(true),
 123                        ..invitation.into_active_model()
 124                    })
 125                    .exec(&*tx)
 126                    .await?;
 127
 128                    accept_invite_result = Some(
 129                        self.calculate_membership_updated(&channel, user_id, &*tx)
 130                            .await?,
 131                    );
 132
 133                    debug_assert!(
 134                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 135                    );
 136                } else if channel.visibility == ChannelVisibility::Public {
 137                    role = Some(ChannelRole::Guest);
 138                    channel_member::Entity::insert(channel_member::ActiveModel {
 139                        id: ActiveValue::NotSet,
 140                        channel_id: ActiveValue::Set(channel.root_id()),
 141                        user_id: ActiveValue::Set(user_id),
 142                        accepted: ActiveValue::Set(true),
 143                        role: ActiveValue::Set(ChannelRole::Guest),
 144                    })
 145                    .exec(&*tx)
 146                    .await?;
 147
 148                    accept_invite_result = Some(
 149                        self.calculate_membership_updated(&channel, user_id, &*tx)
 150                            .await?,
 151                    );
 152
 153                    debug_assert!(
 154                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 155                    );
 156                }
 157            }
 158
 159            if role.is_none() || role == Some(ChannelRole::Banned) {
 160                Err(ErrorCode::Forbidden.anyhow())?
 161            }
 162            let role = role.unwrap();
 163
 164            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 165            let room_id = self
 166                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 167                .await?;
 168
 169            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 170                .await
 171                .map(|jr| (jr, accept_invite_result, role))
 172        })
 173        .await
 174    }
 175
 176    /// Sets the visibiltity of the given channel.
 177    pub async fn set_channel_visibility(
 178        &self,
 179        channel_id: ChannelId,
 180        visibility: ChannelVisibility,
 181        admin_id: UserId,
 182    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 183        self.transaction(move |tx| async move {
 184            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 185            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 186                .await?;
 187
 188            if visibility == ChannelVisibility::Public {
 189                if let Some(parent_id) = channel.parent_id() {
 190                    let parent = self.get_channel_internal(parent_id, &*tx).await?;
 191
 192                    if parent.visibility != ChannelVisibility::Public {
 193                        Err(ErrorCode::BadPublicNesting
 194                            .with_tag("direction", "parent")
 195                            .anyhow())?;
 196                    }
 197                }
 198            } else if visibility == ChannelVisibility::Members {
 199                if self
 200                    .get_channel_descendants_including_self(vec![channel_id], &*tx)
 201                    .await?
 202                    .into_iter()
 203                    .any(|channel| {
 204                        channel.id != channel_id && channel.visibility == ChannelVisibility::Public
 205                    })
 206                {
 207                    Err(ErrorCode::BadPublicNesting
 208                        .with_tag("direction", "children")
 209                        .anyhow())?;
 210                }
 211            }
 212
 213            let mut model = channel.into_active_model();
 214            model.visibility = ActiveValue::Set(visibility);
 215            let channel = model.update(&*tx).await?;
 216
 217            let channel_members = channel_member::Entity::find()
 218                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 219                .all(&*tx)
 220                .await?;
 221
 222            Ok((Channel::from_model(channel), channel_members))
 223        })
 224        .await
 225    }
 226
 227    #[cfg(test)]
 228    pub async fn set_channel_requires_zed_cla(
 229        &self,
 230        channel_id: ChannelId,
 231        requires_zed_cla: bool,
 232    ) -> Result<()> {
 233        self.transaction(move |tx| async move {
 234            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 235            let mut model = channel.into_active_model();
 236            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 237            model.update(&*tx).await?;
 238            Ok(())
 239        })
 240        .await
 241    }
 242
 243    /// Deletes the channel with the specified ID.
 244    pub async fn delete_channel(
 245        &self,
 246        channel_id: ChannelId,
 247        user_id: UserId,
 248    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 249        self.transaction(move |tx| async move {
 250            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 251            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 252                .await?;
 253
 254            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 255                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 256                .select_only()
 257                .column(channel_member::Column::UserId)
 258                .distinct()
 259                .into_values::<_, QueryUserIds>()
 260                .all(&*tx)
 261                .await?;
 262
 263            let channels_to_remove = self
 264                .get_channel_descendants_including_self(vec![channel.id], &*tx)
 265                .await?
 266                .into_iter()
 267                .map(|channel| channel.id)
 268                .collect::<Vec<_>>();
 269
 270            channel::Entity::delete_many()
 271                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 272                .exec(&*tx)
 273                .await?;
 274
 275            Ok((channels_to_remove, members_to_notify))
 276        })
 277        .await
 278    }
 279
 280    /// Invites a user to a channel as a member.
 281    pub async fn invite_channel_member(
 282        &self,
 283        channel_id: ChannelId,
 284        invitee_id: UserId,
 285        inviter_id: UserId,
 286        role: ChannelRole,
 287    ) -> Result<InviteMemberResult> {
 288        self.transaction(move |tx| async move {
 289            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 290            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 291                .await?;
 292            if !channel.is_root() {
 293                Err(ErrorCode::NotARootChannel.anyhow())?
 294            }
 295
 296            channel_member::ActiveModel {
 297                id: ActiveValue::NotSet,
 298                channel_id: ActiveValue::Set(channel_id),
 299                user_id: ActiveValue::Set(invitee_id),
 300                accepted: ActiveValue::Set(false),
 301                role: ActiveValue::Set(role),
 302            }
 303            .insert(&*tx)
 304            .await?;
 305
 306            let channel = Channel::from_model(channel);
 307
 308            let notifications = self
 309                .create_notification(
 310                    invitee_id,
 311                    rpc::Notification::ChannelInvitation {
 312                        channel_id: channel_id.to_proto(),
 313                        channel_name: channel.name.clone(),
 314                        inviter_id: inviter_id.to_proto(),
 315                    },
 316                    true,
 317                    &*tx,
 318                )
 319                .await?
 320                .into_iter()
 321                .collect();
 322
 323            Ok(InviteMemberResult {
 324                channel,
 325                notifications,
 326            })
 327        })
 328        .await
 329    }
 330
 331    fn sanitize_channel_name(name: &str) -> Result<&str> {
 332        let new_name = name.trim().trim_start_matches('#');
 333        if new_name == "" {
 334            Err(anyhow!("channel name can't be blank"))?;
 335        }
 336        Ok(new_name)
 337    }
 338
 339    /// Renames the specified channel.
 340    pub async fn rename_channel(
 341        &self,
 342        channel_id: ChannelId,
 343        admin_id: UserId,
 344        new_name: &str,
 345    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 346        self.transaction(move |tx| async move {
 347            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 348
 349            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 350            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 351                .await?;
 352
 353            let mut model = channel.into_active_model();
 354            model.name = ActiveValue::Set(new_name.clone());
 355            let channel = model.update(&*tx).await?;
 356
 357            let channel_members = channel_member::Entity::find()
 358                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 359                .all(&*tx)
 360                .await?;
 361
 362            Ok((Channel::from_model(channel), channel_members))
 363        })
 364        .await
 365    }
 366
 367    /// accept or decline an invite to join a channel
 368    pub async fn respond_to_channel_invite(
 369        &self,
 370        channel_id: ChannelId,
 371        user_id: UserId,
 372        accept: bool,
 373    ) -> Result<RespondToChannelInvite> {
 374        self.transaction(move |tx| async move {
 375            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 376
 377            let membership_update = if accept {
 378                let rows_affected = channel_member::Entity::update_many()
 379                    .set(channel_member::ActiveModel {
 380                        accepted: ActiveValue::Set(accept),
 381                        ..Default::default()
 382                    })
 383                    .filter(
 384                        channel_member::Column::ChannelId
 385                            .eq(channel_id)
 386                            .and(channel_member::Column::UserId.eq(user_id))
 387                            .and(channel_member::Column::Accepted.eq(false)),
 388                    )
 389                    .exec(&*tx)
 390                    .await?
 391                    .rows_affected;
 392
 393                if rows_affected == 0 {
 394                    Err(anyhow!("no such invitation"))?;
 395                }
 396
 397                Some(
 398                    self.calculate_membership_updated(&channel, user_id, &*tx)
 399                        .await?,
 400                )
 401            } else {
 402                let rows_affected = channel_member::Entity::delete_many()
 403                    .filter(
 404                        channel_member::Column::ChannelId
 405                            .eq(channel_id)
 406                            .and(channel_member::Column::UserId.eq(user_id))
 407                            .and(channel_member::Column::Accepted.eq(false)),
 408                    )
 409                    .exec(&*tx)
 410                    .await?
 411                    .rows_affected;
 412                if rows_affected == 0 {
 413                    Err(anyhow!("no such invitation"))?;
 414                }
 415
 416                None
 417            };
 418
 419            Ok(RespondToChannelInvite {
 420                membership_update,
 421                notifications: self
 422                    .mark_notification_as_read_with_response(
 423                        user_id,
 424                        &rpc::Notification::ChannelInvitation {
 425                            channel_id: channel_id.to_proto(),
 426                            channel_name: Default::default(),
 427                            inviter_id: Default::default(),
 428                        },
 429                        accept,
 430                        &*tx,
 431                    )
 432                    .await?
 433                    .into_iter()
 434                    .collect(),
 435            })
 436        })
 437        .await
 438    }
 439
 440    async fn calculate_membership_updated(
 441        &self,
 442        channel: &channel::Model,
 443        user_id: UserId,
 444        tx: &DatabaseTransaction,
 445    ) -> Result<MembershipUpdated> {
 446        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 447        let removed_channels = self
 448            .get_channel_descendants_including_self(vec![channel.id], &*tx)
 449            .await?
 450            .into_iter()
 451            .filter_map(|channel| {
 452                if !new_channels.channels.iter().any(|c| c.id == channel.id) {
 453                    Some(channel.id)
 454                } else {
 455                    None
 456                }
 457            })
 458            .collect::<Vec<_>>();
 459
 460        Ok(MembershipUpdated {
 461            channel_id: channel.id,
 462            new_channels,
 463            removed_channels,
 464        })
 465    }
 466
 467    /// Removes a channel member.
 468    pub async fn remove_channel_member(
 469        &self,
 470        channel_id: ChannelId,
 471        member_id: UserId,
 472        admin_id: UserId,
 473    ) -> Result<RemoveChannelMemberResult> {
 474        self.transaction(|tx| async move {
 475            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 476            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 477                .await?;
 478
 479            let result = channel_member::Entity::delete_many()
 480                .filter(
 481                    channel_member::Column::ChannelId
 482                        .eq(channel_id)
 483                        .and(channel_member::Column::UserId.eq(member_id)),
 484                )
 485                .exec(&*tx)
 486                .await?;
 487
 488            if result.rows_affected == 0 {
 489                Err(anyhow!("no such member"))?;
 490            }
 491
 492            Ok(RemoveChannelMemberResult {
 493                membership_update: self
 494                    .calculate_membership_updated(&channel, member_id, &*tx)
 495                    .await?,
 496                notification_id: self
 497                    .remove_notification(
 498                        member_id,
 499                        rpc::Notification::ChannelInvitation {
 500                            channel_id: channel_id.to_proto(),
 501                            channel_name: Default::default(),
 502                            inviter_id: Default::default(),
 503                        },
 504                        &*tx,
 505                    )
 506                    .await?,
 507            })
 508        })
 509        .await
 510    }
 511
 512    /// Returns all channel invites for the user with the given ID.
 513    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 514        self.transaction(|tx| async move {
 515            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 516
 517            let channel_invites = channel_member::Entity::find()
 518                .filter(
 519                    channel_member::Column::UserId
 520                        .eq(user_id)
 521                        .and(channel_member::Column::Accepted.eq(false)),
 522                )
 523                .all(&*tx)
 524                .await?;
 525
 526            for invite in channel_invites {
 527                role_for_channel.insert(invite.channel_id, invite.role);
 528            }
 529
 530            let channels = channel::Entity::find()
 531                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 532                .all(&*tx)
 533                .await?;
 534
 535            let channels = channels
 536                .into_iter()
 537                .filter_map(|channel| Some(Channel::from_model(channel)))
 538                .collect();
 539
 540            Ok(channels)
 541        })
 542        .await
 543    }
 544
 545    pub async fn get_channel_memberships(
 546        &self,
 547        user_id: UserId,
 548    ) -> Result<(Vec<channel_member::Model>, Vec<channel::Model>)> {
 549        self.transaction(|tx| async move {
 550            let memberships = channel_member::Entity::find()
 551                .filter(channel_member::Column::UserId.eq(user_id))
 552                .all(&*tx)
 553                .await?;
 554            let channels = self
 555                .get_channel_descendants_including_self(
 556                    memberships.iter().map(|m| m.channel_id),
 557                    &*tx,
 558                )
 559                .await?;
 560            Ok((memberships, channels))
 561        })
 562        .await
 563    }
 564
 565    /// Returns all channels for the user with the given ID.
 566    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 567        self.transaction(|tx| async move {
 568            let tx = tx;
 569
 570            self.get_user_channels(user_id, None, &tx).await
 571        })
 572        .await
 573    }
 574
 575    /// Returns all channels for the user with the given ID that are descendants
 576    /// of the specified ancestor channel.
 577    pub async fn get_user_channels(
 578        &self,
 579        user_id: UserId,
 580        ancestor_channel: Option<&channel::Model>,
 581        tx: &DatabaseTransaction,
 582    ) -> Result<ChannelsForUser> {
 583        let mut filter = channel_member::Column::UserId
 584            .eq(user_id)
 585            .and(channel_member::Column::Accepted.eq(true));
 586
 587        if let Some(ancestor) = ancestor_channel {
 588            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 589        }
 590
 591        let channel_memberships = channel_member::Entity::find()
 592            .filter(filter)
 593            .all(&*tx)
 594            .await?;
 595
 596        let descendants = self
 597            .get_channel_descendants_including_self(
 598                channel_memberships.iter().map(|m| m.channel_id),
 599                &*tx,
 600            )
 601            .await?;
 602
 603        let roles_by_channel_id = channel_memberships
 604            .iter()
 605            .map(|membership| (membership.channel_id, membership.role))
 606            .collect::<HashMap<_, _>>();
 607
 608        let channels: Vec<Channel> = descendants
 609            .into_iter()
 610            .filter_map(|channel| {
 611                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 612                if parent_role.can_see_channel(channel.visibility) {
 613                    Some(Channel::from_model(channel))
 614                } else {
 615                    None
 616                }
 617            })
 618            .collect();
 619
 620        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 621        enum QueryUserIdsAndChannelIds {
 622            ChannelId,
 623            UserId,
 624        }
 625
 626        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 627        {
 628            let mut rows = room_participant::Entity::find()
 629                .inner_join(room::Entity)
 630                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 631                .select_only()
 632                .column(room::Column::ChannelId)
 633                .column(room_participant::Column::UserId)
 634                .into_values::<_, QueryUserIdsAndChannelIds>()
 635                .stream(&*tx)
 636                .await?;
 637            while let Some(row) = rows.next().await {
 638                let row: (ChannelId, UserId) = row?;
 639                channel_participants.entry(row.0).or_default().push(row.1)
 640            }
 641        }
 642
 643        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 644        let latest_buffer_versions = self
 645            .latest_channel_buffer_changes(&channel_ids, &*tx)
 646            .await?;
 647
 648        let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
 649
 650        Ok(ChannelsForUser {
 651            channel_memberships,
 652            channels,
 653            channel_participants,
 654            latest_buffer_versions,
 655            latest_channel_messages: latest_messages,
 656        })
 657    }
 658
 659    /// Sets the role for the specified channel member.
 660    pub async fn set_channel_member_role(
 661        &self,
 662        channel_id: ChannelId,
 663        admin_id: UserId,
 664        for_user: UserId,
 665        role: ChannelRole,
 666    ) -> Result<SetMemberRoleResult> {
 667        self.transaction(|tx| async move {
 668            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 669            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 670                .await?;
 671
 672            let membership = channel_member::Entity::find()
 673                .filter(
 674                    channel_member::Column::ChannelId
 675                        .eq(channel_id)
 676                        .and(channel_member::Column::UserId.eq(for_user)),
 677                )
 678                .one(&*tx)
 679                .await?;
 680
 681            let Some(membership) = membership else {
 682                Err(anyhow!("no such member"))?
 683            };
 684
 685            let mut update = membership.into_active_model();
 686            update.role = ActiveValue::Set(role);
 687            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 688
 689            if updated.accepted {
 690                Ok(SetMemberRoleResult::MembershipUpdated(
 691                    self.calculate_membership_updated(&channel, for_user, &*tx)
 692                        .await?,
 693                ))
 694            } else {
 695                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 696                    channel,
 697                )))
 698            }
 699        })
 700        .await
 701    }
 702
 703    /// Returns the details for the specified channel member.
 704    pub async fn get_channel_participant_details(
 705        &self,
 706        channel_id: ChannelId,
 707        user_id: UserId,
 708    ) -> Result<Vec<proto::ChannelMember>> {
 709        let (role, members) = self
 710            .transaction(move |tx| async move {
 711                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 712                let role = self
 713                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 714                    .await?;
 715                Ok((
 716                    role,
 717                    self.get_channel_participant_details_internal(&channel, &*tx)
 718                        .await?,
 719                ))
 720            })
 721            .await?;
 722
 723        if role == ChannelRole::Admin {
 724            Ok(members
 725                .into_iter()
 726                .map(|channel_member| proto::ChannelMember {
 727                    role: channel_member.role.into(),
 728                    user_id: channel_member.user_id.to_proto(),
 729                    kind: if channel_member.accepted {
 730                        Kind::Member
 731                    } else {
 732                        Kind::Invitee
 733                    }
 734                    .into(),
 735                })
 736                .collect())
 737        } else {
 738            return Ok(members
 739                .into_iter()
 740                .filter_map(|member| {
 741                    if !member.accepted {
 742                        return None;
 743                    }
 744                    Some(proto::ChannelMember {
 745                        role: member.role.into(),
 746                        user_id: member.user_id.to_proto(),
 747                        kind: Kind::Member.into(),
 748                    })
 749                })
 750                .collect());
 751        }
 752    }
 753
 754    async fn get_channel_participant_details_internal(
 755        &self,
 756        channel: &channel::Model,
 757        tx: &DatabaseTransaction,
 758    ) -> Result<Vec<channel_member::Model>> {
 759        Ok(channel_member::Entity::find()
 760            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 761            .all(tx)
 762            .await?)
 763    }
 764
 765    /// Returns the participants in the given channel.
 766    pub async fn get_channel_participants(
 767        &self,
 768        channel: &channel::Model,
 769        tx: &DatabaseTransaction,
 770    ) -> Result<Vec<UserId>> {
 771        let participants = self
 772            .get_channel_participant_details_internal(channel, &*tx)
 773            .await?;
 774        Ok(participants
 775            .into_iter()
 776            .map(|member| member.user_id)
 777            .collect())
 778    }
 779
 780    /// Returns whether the given user is an admin in the specified channel.
 781    pub async fn check_user_is_channel_admin(
 782        &self,
 783        channel: &channel::Model,
 784        user_id: UserId,
 785        tx: &DatabaseTransaction,
 786    ) -> Result<ChannelRole> {
 787        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 788        match role {
 789            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 790            Some(ChannelRole::Member)
 791            | Some(ChannelRole::Banned)
 792            | Some(ChannelRole::Guest)
 793            | None => Err(anyhow!(
 794                "user is not a channel admin or channel does not exist"
 795            ))?,
 796        }
 797    }
 798
 799    /// Returns whether the given user is a member of the specified channel.
 800    pub async fn check_user_is_channel_member(
 801        &self,
 802        channel: &channel::Model,
 803        user_id: UserId,
 804        tx: &DatabaseTransaction,
 805    ) -> Result<ChannelRole> {
 806        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 807        match channel_role {
 808            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 809            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 810                "user is not a channel member or channel does not exist"
 811            ))?,
 812        }
 813    }
 814
 815    /// Returns whether the given user is a participant in the specified channel.
 816    pub async fn check_user_is_channel_participant(
 817        &self,
 818        channel: &channel::Model,
 819        user_id: UserId,
 820        tx: &DatabaseTransaction,
 821    ) -> Result<ChannelRole> {
 822        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 823        match role {
 824            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 825                Ok(role.unwrap())
 826            }
 827            Some(ChannelRole::Banned) | None => Err(anyhow!(
 828                "user is not a channel participant or channel does not exist"
 829            ))?,
 830        }
 831    }
 832
 833    /// Returns a user's pending invite for the given channel, if one exists.
 834    pub async fn pending_invite_for_channel(
 835        &self,
 836        channel: &channel::Model,
 837        user_id: UserId,
 838        tx: &DatabaseTransaction,
 839    ) -> Result<Option<channel_member::Model>> {
 840        let row = channel_member::Entity::find()
 841            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 842            .filter(channel_member::Column::UserId.eq(user_id))
 843            .filter(channel_member::Column::Accepted.eq(false))
 844            .one(&*tx)
 845            .await?;
 846
 847        Ok(row)
 848    }
 849
 850    /// Returns the role for a user in the given channel.
 851    pub async fn channel_role_for_user(
 852        &self,
 853        channel: &channel::Model,
 854        user_id: UserId,
 855        tx: &DatabaseTransaction,
 856    ) -> Result<Option<ChannelRole>> {
 857        let membership = channel_member::Entity::find()
 858            .filter(
 859                channel_member::Column::ChannelId
 860                    .eq(channel.root_id())
 861                    .and(channel_member::Column::UserId.eq(user_id))
 862                    .and(channel_member::Column::Accepted.eq(true)),
 863            )
 864            .one(&*tx)
 865            .await?;
 866
 867        let Some(membership) = membership else {
 868            return Ok(None);
 869        };
 870
 871        if !membership.role.can_see_channel(channel.visibility) {
 872            return Ok(None);
 873        }
 874
 875        Ok(Some(membership.role))
 876    }
 877
 878    // Get the descendants of the given set if channels, ordered by their
 879    // path.
 880    async fn get_channel_descendants_including_self(
 881        &self,
 882        channel_ids: impl IntoIterator<Item = ChannelId>,
 883        tx: &DatabaseTransaction,
 884    ) -> Result<Vec<channel::Model>> {
 885        let mut values = String::new();
 886        for id in channel_ids {
 887            if !values.is_empty() {
 888                values.push_str(", ");
 889            }
 890            write!(&mut values, "({})", id).unwrap();
 891        }
 892
 893        if values.is_empty() {
 894            return Ok(vec![]);
 895        }
 896
 897        let sql = format!(
 898            r#"
 899            SELECT DISTINCT
 900                descendant_channels.*,
 901                descendant_channels.parent_path || descendant_channels.id as full_path
 902            FROM
 903                channels parent_channels, channels descendant_channels
 904            WHERE
 905                descendant_channels.id IN ({values}) OR
 906                (
 907                    parent_channels.id IN ({values}) AND
 908                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
 909                )
 910            ORDER BY
 911                full_path ASC
 912            "#
 913        );
 914
 915        Ok(channel::Entity::find()
 916            .from_raw_sql(Statement::from_string(
 917                self.pool.get_database_backend(),
 918                sql,
 919            ))
 920            .all(tx)
 921            .await?)
 922    }
 923
 924    /// Returns the channel with the given ID.
 925    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 926        self.transaction(|tx| async move {
 927            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 928            self.check_user_is_channel_participant(&channel, user_id, &*tx)
 929                .await?;
 930
 931            Ok(Channel::from_model(channel))
 932        })
 933        .await
 934    }
 935
 936    pub(crate) async fn get_channel_internal(
 937        &self,
 938        channel_id: ChannelId,
 939        tx: &DatabaseTransaction,
 940    ) -> Result<channel::Model> {
 941        Ok(channel::Entity::find_by_id(channel_id)
 942            .one(&*tx)
 943            .await?
 944            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 945    }
 946
 947    pub(crate) async fn get_or_create_channel_room(
 948        &self,
 949        channel_id: ChannelId,
 950        live_kit_room: &str,
 951        environment: &str,
 952        tx: &DatabaseTransaction,
 953    ) -> Result<RoomId> {
 954        let room = room::Entity::find()
 955            .filter(room::Column::ChannelId.eq(channel_id))
 956            .one(&*tx)
 957            .await?;
 958
 959        let room_id = if let Some(room) = room {
 960            if let Some(env) = room.environment {
 961                if &env != environment {
 962                    Err(ErrorCode::WrongReleaseChannel
 963                        .with_tag("required", &env)
 964                        .anyhow())?;
 965                }
 966            }
 967            room.id
 968        } else {
 969            let result = room::Entity::insert(room::ActiveModel {
 970                channel_id: ActiveValue::Set(Some(channel_id)),
 971                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 972                environment: ActiveValue::Set(Some(environment.to_string())),
 973                ..Default::default()
 974            })
 975            .exec(&*tx)
 976            .await?;
 977
 978            result.last_insert_id
 979        };
 980
 981        Ok(room_id)
 982    }
 983
 984    /// Move a channel from one parent to another
 985    pub async fn move_channel(
 986        &self,
 987        channel_id: ChannelId,
 988        new_parent_id: ChannelId,
 989        admin_id: UserId,
 990    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
 991        self.transaction(|tx| async move {
 992            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 993            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 994                .await?;
 995            let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
 996
 997            if new_parent.root_id() != channel.root_id() {
 998                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 999            }
1000
1001            if new_parent
1002                .ancestors_including_self()
1003                .any(|id| id == channel.id)
1004            {
1005                Err(anyhow!(ErrorCode::CircularNesting))?;
1006            }
1007
1008            if channel.visibility == ChannelVisibility::Public
1009                && new_parent.visibility != ChannelVisibility::Public
1010            {
1011                Err(anyhow!(ErrorCode::BadPublicNesting))?;
1012            }
1013
1014            let root_id = channel.root_id();
1015            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1016            let new_path = format!("{}{}/", new_parent.path(), channel.id);
1017
1018            let mut model = channel.into_active_model();
1019            model.parent_path = ActiveValue::Set(new_parent.path());
1020            let channel = model.update(&*tx).await?;
1021
1022            let descendent_ids =
1023                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1024                    self.pool.get_database_backend(),
1025                    "
1026                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1027                    WHERE parent_path LIKE $3 || '%'
1028                    RETURNING id
1029                ",
1030                    [old_path.clone().into(), new_path.into(), old_path.into()],
1031                ))
1032                .all(&*tx)
1033                .await?;
1034
1035            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
1036
1037            let channels = channel::Entity::find()
1038                .filter(channel::Column::Id.is_in(all_moved_ids))
1039                .all(&*tx)
1040                .await?
1041                .into_iter()
1042                .map(|c| Channel::from_model(c))
1043                .collect::<Vec<_>>();
1044
1045            let channel_members = channel_member::Entity::find()
1046                .filter(channel_member::Column::ChannelId.eq(root_id))
1047                .all(&*tx)
1048                .await?;
1049
1050            Ok((channels, channel_members))
1051        })
1052        .await
1053    }
1054}
1055
1056#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1057enum QueryIds {
1058    Id,
1059}
1060
1061#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1062enum QueryUserIds {
1063    UserId,
1064}