channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.0.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .0
  36            .id)
  37    }
  38
  39    /// Creates a new channel.
  40    pub async fn create_channel(
  41        &self,
  42        name: &str,
  43        parent_channel_id: Option<ChannelId>,
  44        admin_id: UserId,
  45    ) -> Result<(
  46        Channel,
  47        Option<channel_member::Model>,
  48        Vec<channel_member::Model>,
  49    )> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53            let mut membership = None;
  54
  55            if let Some(parent_channel_id) = parent_channel_id {
  56                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  57                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  58                    .await?;
  59                parent = Some(parent_channel);
  60            }
  61
  62            let channel = channel::ActiveModel {
  63                id: ActiveValue::NotSet,
  64                name: ActiveValue::Set(name.to_string()),
  65                visibility: ActiveValue::Set(ChannelVisibility::Members),
  66                parent_path: ActiveValue::Set(
  67                    parent
  68                        .as_ref()
  69                        .map_or(String::new(), |parent| parent.path()),
  70                ),
  71                requires_zed_cla: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if parent.is_none() {
  77                membership = Some(
  78                    channel_member::ActiveModel {
  79                        id: ActiveValue::NotSet,
  80                        channel_id: ActiveValue::Set(channel.id),
  81                        user_id: ActiveValue::Set(admin_id),
  82                        accepted: ActiveValue::Set(true),
  83                        role: ActiveValue::Set(ChannelRole::Admin),
  84                    }
  85                    .insert(&*tx)
  86                    .await?,
  87                );
  88            }
  89
  90            let channel_members = channel_member::Entity::find()
  91                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
  92                .all(&*tx)
  93                .await?;
  94
  95            Ok((Channel::from_model(channel), membership, channel_members))
  96        })
  97        .await
  98    }
  99
 100    /// Adds a user to the specified channel.
 101    pub async fn join_channel(
 102        &self,
 103        channel_id: ChannelId,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106        environment: &str,
 107    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 108        self.transaction(move |tx| async move {
 109            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 110            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 111
 112            let mut accept_invite_result = None;
 113
 114            if role.is_none() {
 115                if let Some(invitation) = self
 116                    .pending_invite_for_channel(&channel, user_id, &*tx)
 117                    .await?
 118                {
 119                    // note, this may be a parent channel
 120                    role = Some(invitation.role);
 121                    channel_member::Entity::update(channel_member::ActiveModel {
 122                        accepted: ActiveValue::Set(true),
 123                        ..invitation.into_active_model()
 124                    })
 125                    .exec(&*tx)
 126                    .await?;
 127
 128                    accept_invite_result = Some(
 129                        self.calculate_membership_updated(&channel, user_id, &*tx)
 130                            .await?,
 131                    );
 132
 133                    debug_assert!(
 134                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 135                    );
 136                } else if channel.visibility == ChannelVisibility::Public {
 137                    role = Some(ChannelRole::Guest);
 138                    channel_member::Entity::insert(channel_member::ActiveModel {
 139                        id: ActiveValue::NotSet,
 140                        channel_id: ActiveValue::Set(channel.root_id()),
 141                        user_id: ActiveValue::Set(user_id),
 142                        accepted: ActiveValue::Set(true),
 143                        role: ActiveValue::Set(ChannelRole::Guest),
 144                    })
 145                    .exec(&*tx)
 146                    .await?;
 147
 148                    accept_invite_result = Some(
 149                        self.calculate_membership_updated(&channel, user_id, &*tx)
 150                            .await?,
 151                    );
 152
 153                    debug_assert!(
 154                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 155                    );
 156                }
 157            }
 158
 159            if role.is_none() || role == Some(ChannelRole::Banned) {
 160                Err(ErrorCode::Forbidden.anyhow())?
 161            }
 162            let role = role.unwrap();
 163
 164            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 165            let room_id = self
 166                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 167                .await?;
 168
 169            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 170                .await
 171                .map(|jr| (jr, accept_invite_result, role))
 172        })
 173        .await
 174    }
 175
 176    /// Sets the visibility of the given channel.
 177    pub async fn set_channel_visibility(
 178        &self,
 179        channel_id: ChannelId,
 180        visibility: ChannelVisibility,
 181        admin_id: UserId,
 182    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 183        self.transaction(move |tx| async move {
 184            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 185            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 186                .await?;
 187
 188            if visibility == ChannelVisibility::Public {
 189                if let Some(parent_id) = channel.parent_id() {
 190                    let parent = self.get_channel_internal(parent_id, &*tx).await?;
 191
 192                    if parent.visibility != ChannelVisibility::Public {
 193                        Err(ErrorCode::BadPublicNesting
 194                            .with_tag("direction", "parent")
 195                            .anyhow())?;
 196                    }
 197                }
 198            } else if visibility == ChannelVisibility::Members {
 199                if self
 200                    .get_channel_descendants_excluding_self([&channel], &*tx)
 201                    .await?
 202                    .into_iter()
 203                    .any(|channel| channel.visibility == ChannelVisibility::Public)
 204                {
 205                    Err(ErrorCode::BadPublicNesting
 206                        .with_tag("direction", "children")
 207                        .anyhow())?;
 208                }
 209            }
 210
 211            let mut model = channel.into_active_model();
 212            model.visibility = ActiveValue::Set(visibility);
 213            let channel = model.update(&*tx).await?;
 214
 215            let channel_members = channel_member::Entity::find()
 216                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 217                .all(&*tx)
 218                .await?;
 219
 220            Ok((Channel::from_model(channel), channel_members))
 221        })
 222        .await
 223    }
 224
 225    #[cfg(test)]
 226    pub async fn set_channel_requires_zed_cla(
 227        &self,
 228        channel_id: ChannelId,
 229        requires_zed_cla: bool,
 230    ) -> Result<()> {
 231        self.transaction(move |tx| async move {
 232            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 233            let mut model = channel.into_active_model();
 234            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 235            model.update(&*tx).await?;
 236            Ok(())
 237        })
 238        .await
 239    }
 240
 241    /// Deletes the channel with the specified ID.
 242    pub async fn delete_channel(
 243        &self,
 244        channel_id: ChannelId,
 245        user_id: UserId,
 246    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 247        self.transaction(move |tx| async move {
 248            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 249            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 250                .await?;
 251
 252            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 253                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 254                .select_only()
 255                .column(channel_member::Column::UserId)
 256                .distinct()
 257                .into_values::<_, QueryUserIds>()
 258                .all(&*tx)
 259                .await?;
 260
 261            let channels_to_remove = self
 262                .get_channel_descendants_excluding_self([&channel], &*tx)
 263                .await?
 264                .into_iter()
 265                .map(|channel| channel.id)
 266                .chain(Some(channel_id))
 267                .collect::<Vec<_>>();
 268
 269            channel::Entity::delete_many()
 270                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 271                .exec(&*tx)
 272                .await?;
 273
 274            Ok((channels_to_remove, members_to_notify))
 275        })
 276        .await
 277    }
 278
 279    /// Invites a user to a channel as a member.
 280    pub async fn invite_channel_member(
 281        &self,
 282        channel_id: ChannelId,
 283        invitee_id: UserId,
 284        inviter_id: UserId,
 285        role: ChannelRole,
 286    ) -> Result<InviteMemberResult> {
 287        self.transaction(move |tx| async move {
 288            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 289            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 290                .await?;
 291            if !channel.is_root() {
 292                Err(ErrorCode::NotARootChannel.anyhow())?
 293            }
 294
 295            channel_member::ActiveModel {
 296                id: ActiveValue::NotSet,
 297                channel_id: ActiveValue::Set(channel_id),
 298                user_id: ActiveValue::Set(invitee_id),
 299                accepted: ActiveValue::Set(false),
 300                role: ActiveValue::Set(role),
 301            }
 302            .insert(&*tx)
 303            .await?;
 304
 305            let channel = Channel::from_model(channel);
 306
 307            let notifications = self
 308                .create_notification(
 309                    invitee_id,
 310                    rpc::Notification::ChannelInvitation {
 311                        channel_id: channel_id.to_proto(),
 312                        channel_name: channel.name.clone(),
 313                        inviter_id: inviter_id.to_proto(),
 314                    },
 315                    true,
 316                    &*tx,
 317                )
 318                .await?
 319                .into_iter()
 320                .collect();
 321
 322            Ok(InviteMemberResult {
 323                channel,
 324                notifications,
 325            })
 326        })
 327        .await
 328    }
 329
 330    fn sanitize_channel_name(name: &str) -> Result<&str> {
 331        let new_name = name.trim().trim_start_matches('#');
 332        if new_name == "" {
 333            Err(anyhow!("channel name can't be blank"))?;
 334        }
 335        Ok(new_name)
 336    }
 337
 338    /// Renames the specified channel.
 339    pub async fn rename_channel(
 340        &self,
 341        channel_id: ChannelId,
 342        admin_id: UserId,
 343        new_name: &str,
 344    ) -> Result<(Channel, Vec<channel_member::Model>)> {
 345        self.transaction(move |tx| async move {
 346            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 347
 348            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 349            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 350                .await?;
 351
 352            let mut model = channel.into_active_model();
 353            model.name = ActiveValue::Set(new_name.clone());
 354            let channel = model.update(&*tx).await?;
 355
 356            let channel_members = channel_member::Entity::find()
 357                .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 358                .all(&*tx)
 359                .await?;
 360
 361            Ok((Channel::from_model(channel), channel_members))
 362        })
 363        .await
 364    }
 365
 366    /// accept or decline an invite to join a channel
 367    pub async fn respond_to_channel_invite(
 368        &self,
 369        channel_id: ChannelId,
 370        user_id: UserId,
 371        accept: bool,
 372    ) -> Result<RespondToChannelInvite> {
 373        self.transaction(move |tx| async move {
 374            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 375
 376            let membership_update = if accept {
 377                let rows_affected = channel_member::Entity::update_many()
 378                    .set(channel_member::ActiveModel {
 379                        accepted: ActiveValue::Set(accept),
 380                        ..Default::default()
 381                    })
 382                    .filter(
 383                        channel_member::Column::ChannelId
 384                            .eq(channel_id)
 385                            .and(channel_member::Column::UserId.eq(user_id))
 386                            .and(channel_member::Column::Accepted.eq(false)),
 387                    )
 388                    .exec(&*tx)
 389                    .await?
 390                    .rows_affected;
 391
 392                if rows_affected == 0 {
 393                    Err(anyhow!("no such invitation"))?;
 394                }
 395
 396                Some(
 397                    self.calculate_membership_updated(&channel, user_id, &*tx)
 398                        .await?,
 399                )
 400            } else {
 401                let rows_affected = channel_member::Entity::delete_many()
 402                    .filter(
 403                        channel_member::Column::ChannelId
 404                            .eq(channel_id)
 405                            .and(channel_member::Column::UserId.eq(user_id))
 406                            .and(channel_member::Column::Accepted.eq(false)),
 407                    )
 408                    .exec(&*tx)
 409                    .await?
 410                    .rows_affected;
 411                if rows_affected == 0 {
 412                    Err(anyhow!("no such invitation"))?;
 413                }
 414
 415                None
 416            };
 417
 418            Ok(RespondToChannelInvite {
 419                membership_update,
 420                notifications: self
 421                    .mark_notification_as_read_with_response(
 422                        user_id,
 423                        &rpc::Notification::ChannelInvitation {
 424                            channel_id: channel_id.to_proto(),
 425                            channel_name: Default::default(),
 426                            inviter_id: Default::default(),
 427                        },
 428                        accept,
 429                        &*tx,
 430                    )
 431                    .await?
 432                    .into_iter()
 433                    .collect(),
 434            })
 435        })
 436        .await
 437    }
 438
 439    async fn calculate_membership_updated(
 440        &self,
 441        channel: &channel::Model,
 442        user_id: UserId,
 443        tx: &DatabaseTransaction,
 444    ) -> Result<MembershipUpdated> {
 445        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 446        let removed_channels = self
 447            .get_channel_descendants_excluding_self([channel], &*tx)
 448            .await?
 449            .into_iter()
 450            .map(|channel| channel.id)
 451            .chain([channel.id])
 452            .filter(|channel_id| !new_channels.channels.iter().any(|c| c.id == *channel_id))
 453            .collect::<Vec<_>>();
 454
 455        Ok(MembershipUpdated {
 456            channel_id: channel.id,
 457            new_channels,
 458            removed_channels,
 459        })
 460    }
 461
 462    /// Removes a channel member.
 463    pub async fn remove_channel_member(
 464        &self,
 465        channel_id: ChannelId,
 466        member_id: UserId,
 467        admin_id: UserId,
 468    ) -> Result<RemoveChannelMemberResult> {
 469        self.transaction(|tx| async move {
 470            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 471
 472            if member_id != admin_id {
 473                self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 474                    .await?;
 475            }
 476
 477            let result = channel_member::Entity::delete_many()
 478                .filter(
 479                    channel_member::Column::ChannelId
 480                        .eq(channel_id)
 481                        .and(channel_member::Column::UserId.eq(member_id)),
 482                )
 483                .exec(&*tx)
 484                .await?;
 485
 486            if result.rows_affected == 0 {
 487                Err(anyhow!("no such member"))?;
 488            }
 489
 490            Ok(RemoveChannelMemberResult {
 491                membership_update: self
 492                    .calculate_membership_updated(&channel, member_id, &*tx)
 493                    .await?,
 494                notification_id: self
 495                    .remove_notification(
 496                        member_id,
 497                        rpc::Notification::ChannelInvitation {
 498                            channel_id: channel_id.to_proto(),
 499                            channel_name: Default::default(),
 500                            inviter_id: Default::default(),
 501                        },
 502                        &*tx,
 503                    )
 504                    .await?,
 505            })
 506        })
 507        .await
 508    }
 509
 510    /// Returns all channel invites for the user with the given ID.
 511    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 512        self.transaction(|tx| async move {
 513            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 514
 515            let channel_invites = channel_member::Entity::find()
 516                .filter(
 517                    channel_member::Column::UserId
 518                        .eq(user_id)
 519                        .and(channel_member::Column::Accepted.eq(false)),
 520                )
 521                .all(&*tx)
 522                .await?;
 523
 524            for invite in channel_invites {
 525                role_for_channel.insert(invite.channel_id, invite.role);
 526            }
 527
 528            let channels = channel::Entity::find()
 529                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 530                .all(&*tx)
 531                .await?;
 532
 533            let channels = channels
 534                .into_iter()
 535                .filter_map(|channel| Some(Channel::from_model(channel)))
 536                .collect();
 537
 538            Ok(channels)
 539        })
 540        .await
 541    }
 542
 543    /// Returns all channels for the user with the given ID.
 544    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 545        self.transaction(|tx| async move {
 546            let tx = tx;
 547
 548            self.get_user_channels(user_id, None, &tx).await
 549        })
 550        .await
 551    }
 552
 553    /// Returns all channels for the user with the given ID that are descendants
 554    /// of the specified ancestor channel.
 555    pub async fn get_user_channels(
 556        &self,
 557        user_id: UserId,
 558        ancestor_channel: Option<&channel::Model>,
 559        tx: &DatabaseTransaction,
 560    ) -> Result<ChannelsForUser> {
 561        let mut filter = channel_member::Column::UserId
 562            .eq(user_id)
 563            .and(channel_member::Column::Accepted.eq(true));
 564
 565        if let Some(ancestor) = ancestor_channel {
 566            filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
 567        }
 568
 569        let channel_memberships = channel_member::Entity::find()
 570            .filter(filter)
 571            .all(&*tx)
 572            .await?;
 573
 574        let channels = channel::Entity::find()
 575            .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
 576            .all(&*tx)
 577            .await?;
 578
 579        let mut descendants = self
 580            .get_channel_descendants_excluding_self(channels.iter(), &*tx)
 581            .await?;
 582
 583        for channel in channels {
 584            if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
 585                descendants.insert(ix, channel);
 586            }
 587        }
 588
 589        let roles_by_channel_id = channel_memberships
 590            .iter()
 591            .map(|membership| (membership.channel_id, membership.role))
 592            .collect::<HashMap<_, _>>();
 593
 594        let channels: Vec<Channel> = descendants
 595            .into_iter()
 596            .filter_map(|channel| {
 597                let parent_role = roles_by_channel_id.get(&channel.root_id())?;
 598                if parent_role.can_see_channel(channel.visibility) {
 599                    Some(Channel::from_model(channel))
 600                } else {
 601                    None
 602                }
 603            })
 604            .collect();
 605
 606        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 607        enum QueryUserIdsAndChannelIds {
 608            ChannelId,
 609            UserId,
 610        }
 611
 612        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 613        {
 614            let mut rows = room_participant::Entity::find()
 615                .inner_join(room::Entity)
 616                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 617                .select_only()
 618                .column(room::Column::ChannelId)
 619                .column(room_participant::Column::UserId)
 620                .into_values::<_, QueryUserIdsAndChannelIds>()
 621                .stream(&*tx)
 622                .await?;
 623            while let Some(row) = rows.next().await {
 624                let row: (ChannelId, UserId) = row?;
 625                channel_participants.entry(row.0).or_default().push(row.1)
 626            }
 627        }
 628
 629        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 630        let latest_buffer_versions = self
 631            .latest_channel_buffer_changes(&channel_ids, &*tx)
 632            .await?;
 633
 634        let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
 635
 636        Ok(ChannelsForUser {
 637            channel_memberships,
 638            channels,
 639            channel_participants,
 640            latest_buffer_versions,
 641            latest_channel_messages: latest_messages,
 642        })
 643    }
 644
 645    /// Sets the role for the specified channel member.
 646    pub async fn set_channel_member_role(
 647        &self,
 648        channel_id: ChannelId,
 649        admin_id: UserId,
 650        for_user: UserId,
 651        role: ChannelRole,
 652    ) -> Result<SetMemberRoleResult> {
 653        self.transaction(|tx| async move {
 654            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 655            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 656                .await?;
 657
 658            let membership = channel_member::Entity::find()
 659                .filter(
 660                    channel_member::Column::ChannelId
 661                        .eq(channel_id)
 662                        .and(channel_member::Column::UserId.eq(for_user)),
 663                )
 664                .one(&*tx)
 665                .await?;
 666
 667            let Some(membership) = membership else {
 668                Err(anyhow!("no such member"))?
 669            };
 670
 671            let mut update = membership.into_active_model();
 672            update.role = ActiveValue::Set(role);
 673            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 674
 675            if updated.accepted {
 676                Ok(SetMemberRoleResult::MembershipUpdated(
 677                    self.calculate_membership_updated(&channel, for_user, &*tx)
 678                        .await?,
 679                ))
 680            } else {
 681                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 682                    channel,
 683                )))
 684            }
 685        })
 686        .await
 687    }
 688
 689    /// Returns the details for the specified channel member.
 690    pub async fn get_channel_participant_details(
 691        &self,
 692        channel_id: ChannelId,
 693        user_id: UserId,
 694    ) -> Result<Vec<proto::ChannelMember>> {
 695        let (role, members) = self
 696            .transaction(move |tx| async move {
 697                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 698                let role = self
 699                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 700                    .await?;
 701                Ok((
 702                    role,
 703                    self.get_channel_participant_details_internal(&channel, &*tx)
 704                        .await?,
 705                ))
 706            })
 707            .await?;
 708
 709        if role == ChannelRole::Admin {
 710            Ok(members
 711                .into_iter()
 712                .map(|channel_member| proto::ChannelMember {
 713                    role: channel_member.role.into(),
 714                    user_id: channel_member.user_id.to_proto(),
 715                    kind: if channel_member.accepted {
 716                        Kind::Member
 717                    } else {
 718                        Kind::Invitee
 719                    }
 720                    .into(),
 721                })
 722                .collect())
 723        } else {
 724            return Ok(members
 725                .into_iter()
 726                .filter_map(|member| {
 727                    if !member.accepted {
 728                        return None;
 729                    }
 730                    Some(proto::ChannelMember {
 731                        role: member.role.into(),
 732                        user_id: member.user_id.to_proto(),
 733                        kind: Kind::Member.into(),
 734                    })
 735                })
 736                .collect());
 737        }
 738    }
 739
 740    async fn get_channel_participant_details_internal(
 741        &self,
 742        channel: &channel::Model,
 743        tx: &DatabaseTransaction,
 744    ) -> Result<Vec<channel_member::Model>> {
 745        Ok(channel_member::Entity::find()
 746            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 747            .all(tx)
 748            .await?)
 749    }
 750
 751    /// Returns the participants in the given channel.
 752    pub async fn get_channel_participants(
 753        &self,
 754        channel: &channel::Model,
 755        tx: &DatabaseTransaction,
 756    ) -> Result<Vec<UserId>> {
 757        let participants = self
 758            .get_channel_participant_details_internal(channel, &*tx)
 759            .await?;
 760        Ok(participants
 761            .into_iter()
 762            .map(|member| member.user_id)
 763            .collect())
 764    }
 765
 766    /// Returns whether the given user is an admin in the specified channel.
 767    pub async fn check_user_is_channel_admin(
 768        &self,
 769        channel: &channel::Model,
 770        user_id: UserId,
 771        tx: &DatabaseTransaction,
 772    ) -> Result<ChannelRole> {
 773        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 774        match role {
 775            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 776            Some(ChannelRole::Member)
 777            | Some(ChannelRole::Banned)
 778            | Some(ChannelRole::Guest)
 779            | None => Err(anyhow!(
 780                "user is not a channel admin or channel does not exist"
 781            ))?,
 782        }
 783    }
 784
 785    /// Returns whether the given user is a member of the specified channel.
 786    pub async fn check_user_is_channel_member(
 787        &self,
 788        channel: &channel::Model,
 789        user_id: UserId,
 790        tx: &DatabaseTransaction,
 791    ) -> Result<ChannelRole> {
 792        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 793        match channel_role {
 794            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 795            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 796                "user is not a channel member or channel does not exist"
 797            ))?,
 798        }
 799    }
 800
 801    /// Returns whether the given user is a participant in the specified channel.
 802    pub async fn check_user_is_channel_participant(
 803        &self,
 804        channel: &channel::Model,
 805        user_id: UserId,
 806        tx: &DatabaseTransaction,
 807    ) -> Result<ChannelRole> {
 808        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 809        match role {
 810            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 811                Ok(role.unwrap())
 812            }
 813            Some(ChannelRole::Banned) | None => Err(anyhow!(
 814                "user is not a channel participant or channel does not exist"
 815            ))?,
 816        }
 817    }
 818
 819    /// Returns a user's pending invite for the given channel, if one exists.
 820    pub async fn pending_invite_for_channel(
 821        &self,
 822        channel: &channel::Model,
 823        user_id: UserId,
 824        tx: &DatabaseTransaction,
 825    ) -> Result<Option<channel_member::Model>> {
 826        let row = channel_member::Entity::find()
 827            .filter(channel_member::Column::ChannelId.eq(channel.root_id()))
 828            .filter(channel_member::Column::UserId.eq(user_id))
 829            .filter(channel_member::Column::Accepted.eq(false))
 830            .one(&*tx)
 831            .await?;
 832
 833        Ok(row)
 834    }
 835
 836    /// Returns the role for a user in the given channel.
 837    pub async fn channel_role_for_user(
 838        &self,
 839        channel: &channel::Model,
 840        user_id: UserId,
 841        tx: &DatabaseTransaction,
 842    ) -> Result<Option<ChannelRole>> {
 843        let membership = channel_member::Entity::find()
 844            .filter(
 845                channel_member::Column::ChannelId
 846                    .eq(channel.root_id())
 847                    .and(channel_member::Column::UserId.eq(user_id))
 848                    .and(channel_member::Column::Accepted.eq(true)),
 849            )
 850            .one(&*tx)
 851            .await?;
 852
 853        let Some(membership) = membership else {
 854            return Ok(None);
 855        };
 856
 857        if !membership.role.can_see_channel(channel.visibility) {
 858            return Ok(None);
 859        }
 860
 861        Ok(Some(membership.role))
 862    }
 863
 864    // Get the descendants of the given set if channels, ordered by their
 865    // path.
 866    pub(crate) async fn get_channel_descendants_excluding_self(
 867        &self,
 868        channels: impl IntoIterator<Item = &channel::Model>,
 869        tx: &DatabaseTransaction,
 870    ) -> Result<Vec<channel::Model>> {
 871        let mut filter = Condition::any();
 872        for channel in channels.into_iter() {
 873            filter = filter.add(channel::Column::ParentPath.like(channel.descendant_path_filter()));
 874        }
 875
 876        if filter.is_empty() {
 877            return Ok(vec![]);
 878        }
 879
 880        Ok(channel::Entity::find()
 881            .filter(filter)
 882            .order_by_asc(Expr::cust("parent_path || id || '/'"))
 883            .all(tx)
 884            .await?)
 885    }
 886
 887    /// Returns the channel with the given ID.
 888    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
 889        self.transaction(|tx| async move {
 890            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 891            self.check_user_is_channel_participant(&channel, user_id, &*tx)
 892                .await?;
 893
 894            Ok(Channel::from_model(channel))
 895        })
 896        .await
 897    }
 898
 899    pub(crate) async fn get_channel_internal(
 900        &self,
 901        channel_id: ChannelId,
 902        tx: &DatabaseTransaction,
 903    ) -> Result<channel::Model> {
 904        Ok(channel::Entity::find_by_id(channel_id)
 905            .one(&*tx)
 906            .await?
 907            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
 908    }
 909
 910    pub(crate) async fn get_or_create_channel_room(
 911        &self,
 912        channel_id: ChannelId,
 913        live_kit_room: &str,
 914        environment: &str,
 915        tx: &DatabaseTransaction,
 916    ) -> Result<RoomId> {
 917        let room = room::Entity::find()
 918            .filter(room::Column::ChannelId.eq(channel_id))
 919            .one(&*tx)
 920            .await?;
 921
 922        let room_id = if let Some(room) = room {
 923            if let Some(env) = room.environment {
 924                if &env != environment {
 925                    Err(ErrorCode::WrongReleaseChannel
 926                        .with_tag("required", &env)
 927                        .anyhow())?;
 928                }
 929            }
 930            room.id
 931        } else {
 932            let result = room::Entity::insert(room::ActiveModel {
 933                channel_id: ActiveValue::Set(Some(channel_id)),
 934                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 935                environment: ActiveValue::Set(Some(environment.to_string())),
 936                ..Default::default()
 937            })
 938            .exec(&*tx)
 939            .await?;
 940
 941            result.last_insert_id
 942        };
 943
 944        Ok(room_id)
 945    }
 946
 947    /// Move a channel from one parent to another
 948    pub async fn move_channel(
 949        &self,
 950        channel_id: ChannelId,
 951        new_parent_id: ChannelId,
 952        admin_id: UserId,
 953    ) -> Result<(Vec<Channel>, Vec<channel_member::Model>)> {
 954        self.transaction(|tx| async move {
 955            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 956            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 957                .await?;
 958            let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
 959
 960            if new_parent.root_id() != channel.root_id() {
 961                Err(anyhow!(ErrorCode::WrongMoveTarget))?;
 962            }
 963
 964            if new_parent
 965                .ancestors_including_self()
 966                .any(|id| id == channel.id)
 967            {
 968                Err(anyhow!(ErrorCode::CircularNesting))?;
 969            }
 970
 971            if channel.visibility == ChannelVisibility::Public
 972                && new_parent.visibility != ChannelVisibility::Public
 973            {
 974                Err(anyhow!(ErrorCode::BadPublicNesting))?;
 975            }
 976
 977            let root_id = channel.root_id();
 978            let old_path = format!("{}{}/", channel.parent_path, channel.id);
 979            let new_path = format!("{}{}/", new_parent.path(), channel.id);
 980
 981            let mut model = channel.into_active_model();
 982            model.parent_path = ActiveValue::Set(new_parent.path());
 983            let channel = model.update(&*tx).await?;
 984
 985            let descendent_ids =
 986                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
 987                    self.pool.get_database_backend(),
 988                    "
 989                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
 990                    WHERE parent_path LIKE $3 || '%'
 991                    RETURNING id
 992                ",
 993                    [old_path.clone().into(), new_path.into(), old_path.into()],
 994                ))
 995                .all(&*tx)
 996                .await?;
 997
 998            let all_moved_ids = Some(channel.id).into_iter().chain(descendent_ids);
 999
1000            let channels = channel::Entity::find()
1001                .filter(channel::Column::Id.is_in(all_moved_ids))
1002                .all(&*tx)
1003                .await?
1004                .into_iter()
1005                .map(|c| Channel::from_model(c))
1006                .collect::<Vec<_>>();
1007
1008            let channel_members = channel_member::Entity::find()
1009                .filter(channel_member::Column::ChannelId.eq(root_id))
1010                .all(&*tx)
1011                .await?;
1012
1013            Ok((channels, channel_members))
1014        })
1015        .await
1016    }
1017}
1018
1019#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1020enum QueryIds {
1021    Id,
1022}
1023
1024#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1025enum QueryUserIds {
1026    UserId,
1027}