channels.rs

   1use super::*;
   2use rpc::proto::channel_member::Kind;
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self
  23            .create_channel(name, None, creator_id)
  24            .await?
  25            .channel
  26            .id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .channel
  40            .id)
  41    }
  42
  43    /// Creates a new channel.
  44    pub async fn create_channel(
  45        &self,
  46        name: &str,
  47        parent_channel_id: Option<ChannelId>,
  48        admin_id: UserId,
  49    ) -> Result<CreateChannelResult> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53
  54            if let Some(parent_channel_id) = parent_channel_id {
  55                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  56                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  57                    .await?;
  58                parent = Some(parent_channel);
  59            }
  60
  61            let channel = channel::ActiveModel {
  62                id: ActiveValue::NotSet,
  63                name: ActiveValue::Set(name.to_string()),
  64                visibility: ActiveValue::Set(ChannelVisibility::Members),
  65                parent_path: ActiveValue::Set(
  66                    parent
  67                        .as_ref()
  68                        .map_or(String::new(), |parent| parent.path()),
  69                ),
  70            }
  71            .insert(&*tx)
  72            .await?;
  73
  74            let participants_to_update;
  75            if let Some(parent) = &parent {
  76                participants_to_update = self
  77                    .participants_to_notify_for_channel_change(parent, &*tx)
  78                    .await?;
  79            } else {
  80                participants_to_update = vec![];
  81
  82                channel_member::ActiveModel {
  83                    id: ActiveValue::NotSet,
  84                    channel_id: ActiveValue::Set(channel.id),
  85                    user_id: ActiveValue::Set(admin_id),
  86                    accepted: ActiveValue::Set(true),
  87                    role: ActiveValue::Set(ChannelRole::Admin),
  88                }
  89                .insert(&*tx)
  90                .await?;
  91            };
  92
  93            Ok(CreateChannelResult {
  94                channel: Channel::from_model(channel, ChannelRole::Admin),
  95                participants_to_update,
  96            })
  97        })
  98        .await
  99    }
 100
 101    /// Adds a user to the specified channel.
 102    pub async fn join_channel(
 103        &self,
 104        channel_id: ChannelId,
 105        user_id: UserId,
 106        connection: ConnectionId,
 107        environment: &str,
 108    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 109        self.transaction(move |tx| async move {
 110            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 111            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 112
 113            let mut accept_invite_result = None;
 114
 115            if role.is_none() {
 116                if let Some(invitation) = self
 117                    .pending_invite_for_channel(&channel, user_id, &*tx)
 118                    .await?
 119                {
 120                    // note, this may be a parent channel
 121                    role = Some(invitation.role);
 122                    channel_member::Entity::update(channel_member::ActiveModel {
 123                        accepted: ActiveValue::Set(true),
 124                        ..invitation.into_active_model()
 125                    })
 126                    .exec(&*tx)
 127                    .await?;
 128
 129                    accept_invite_result = Some(
 130                        self.calculate_membership_updated(&channel, user_id, &*tx)
 131                            .await?,
 132                    );
 133
 134                    debug_assert!(
 135                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 136                    );
 137                } else if channel.visibility == ChannelVisibility::Public {
 138                    role = Some(ChannelRole::Guest);
 139                    let channel_to_join = self
 140                        .public_ancestors_including_self(&channel, &*tx)
 141                        .await?
 142                        .first()
 143                        .cloned()
 144                        .unwrap_or(channel.clone());
 145
 146                    channel_member::Entity::insert(channel_member::ActiveModel {
 147                        id: ActiveValue::NotSet,
 148                        channel_id: ActiveValue::Set(channel_to_join.id),
 149                        user_id: ActiveValue::Set(user_id),
 150                        accepted: ActiveValue::Set(true),
 151                        role: ActiveValue::Set(ChannelRole::Guest),
 152                    })
 153                    .exec(&*tx)
 154                    .await?;
 155
 156                    accept_invite_result = Some(
 157                        self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
 158                            .await?,
 159                    );
 160
 161                    debug_assert!(
 162                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 163                    );
 164                }
 165            }
 166
 167            if role.is_none() || role == Some(ChannelRole::Banned) {
 168                Err(anyhow!("not allowed"))?
 169            }
 170            let role = role.unwrap();
 171
 172            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 173            let room_id = self
 174                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 175                .await?;
 176
 177            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 178                .await
 179                .map(|jr| (jr, accept_invite_result, role))
 180        })
 181        .await
 182    }
 183
 184    /// Sets the visibiltity of the given channel.
 185    pub async fn set_channel_visibility(
 186        &self,
 187        channel_id: ChannelId,
 188        visibility: ChannelVisibility,
 189        admin_id: UserId,
 190    ) -> Result<SetChannelVisibilityResult> {
 191        self.transaction(move |tx| async move {
 192            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 193
 194            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 195                .await?;
 196
 197            let previous_members = self
 198                .get_channel_participant_details_internal(&channel, &*tx)
 199                .await?;
 200
 201            let mut model = channel.into_active_model();
 202            model.visibility = ActiveValue::Set(visibility);
 203            let channel = model.update(&*tx).await?;
 204
 205            let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
 206                .participants_to_notify_for_channel_change(&channel, &*tx)
 207                .await?
 208                .into_iter()
 209                .collect();
 210
 211            let mut channels_to_remove: Vec<ChannelId> = vec![];
 212            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
 213            match visibility {
 214                ChannelVisibility::Members => {
 215                    let all_descendents: Vec<ChannelId> = self
 216                        .get_channel_descendants_including_self(vec![channel_id], &*tx)
 217                        .await?
 218                        .into_iter()
 219                        .map(|channel| channel.id)
 220                        .collect();
 221
 222                    channels_to_remove = channel::Entity::find()
 223                        .filter(
 224                            channel::Column::Id
 225                                .is_in(all_descendents)
 226                                .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
 227                        )
 228                        .all(&*tx)
 229                        .await?
 230                        .into_iter()
 231                        .map(|channel| channel.id)
 232                        .collect();
 233
 234                    channels_to_remove.push(channel_id);
 235
 236                    for member in previous_members {
 237                        if member.role.can_only_see_public_descendants() {
 238                            participants_to_remove.insert(member.user_id);
 239                        }
 240                    }
 241                }
 242                ChannelVisibility::Public => {
 243                    if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
 244                        let parent_updates = self
 245                            .participants_to_notify_for_channel_change(&public_parent, &*tx)
 246                            .await?;
 247
 248                        for (user_id, channels) in parent_updates {
 249                            participants_to_update.insert(user_id, channels);
 250                        }
 251                    }
 252                }
 253            }
 254
 255            Ok(SetChannelVisibilityResult {
 256                participants_to_update,
 257                participants_to_remove,
 258                channels_to_remove,
 259            })
 260        })
 261        .await
 262    }
 263
 264    /// Deletes the channel with the specified ID.
 265    pub async fn delete_channel(
 266        &self,
 267        channel_id: ChannelId,
 268        user_id: UserId,
 269    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 270        self.transaction(move |tx| async move {
 271            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 272            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 273                .await?;
 274
 275            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 276                .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 277                .select_only()
 278                .column(channel_member::Column::UserId)
 279                .distinct()
 280                .into_values::<_, QueryUserIds>()
 281                .all(&*tx)
 282                .await?;
 283
 284            let channels_to_remove = self
 285                .get_channel_descendants_including_self(vec![channel.id], &*tx)
 286                .await?
 287                .into_iter()
 288                .map(|channel| channel.id)
 289                .collect::<Vec<_>>();
 290
 291            channel::Entity::delete_many()
 292                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 293                .exec(&*tx)
 294                .await?;
 295
 296            Ok((channels_to_remove, members_to_notify))
 297        })
 298        .await
 299    }
 300
 301    /// Invites a user to a channel as a member.
 302    pub async fn invite_channel_member(
 303        &self,
 304        channel_id: ChannelId,
 305        invitee_id: UserId,
 306        inviter_id: UserId,
 307        role: ChannelRole,
 308    ) -> Result<InviteMemberResult> {
 309        self.transaction(move |tx| async move {
 310            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 311            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 312                .await?;
 313
 314            channel_member::ActiveModel {
 315                id: ActiveValue::NotSet,
 316                channel_id: ActiveValue::Set(channel_id),
 317                user_id: ActiveValue::Set(invitee_id),
 318                accepted: ActiveValue::Set(false),
 319                role: ActiveValue::Set(role),
 320            }
 321            .insert(&*tx)
 322            .await?;
 323
 324            let channel = Channel::from_model(channel, role);
 325
 326            let notifications = self
 327                .create_notification(
 328                    invitee_id,
 329                    rpc::Notification::ChannelInvitation {
 330                        channel_id: channel_id.to_proto(),
 331                        channel_name: channel.name.clone(),
 332                        inviter_id: inviter_id.to_proto(),
 333                    },
 334                    true,
 335                    &*tx,
 336                )
 337                .await?
 338                .into_iter()
 339                .collect();
 340
 341            Ok(InviteMemberResult {
 342                channel,
 343                notifications,
 344            })
 345        })
 346        .await
 347    }
 348
 349    fn sanitize_channel_name(name: &str) -> Result<&str> {
 350        let new_name = name.trim().trim_start_matches('#');
 351        if new_name == "" {
 352            Err(anyhow!("channel name can't be blank"))?;
 353        }
 354        Ok(new_name)
 355    }
 356
 357    /// Renames the specified channel.
 358    pub async fn rename_channel(
 359        &self,
 360        channel_id: ChannelId,
 361        admin_id: UserId,
 362        new_name: &str,
 363    ) -> Result<RenameChannelResult> {
 364        self.transaction(move |tx| async move {
 365            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 366
 367            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 368            let role = self
 369                .check_user_is_channel_admin(&channel, admin_id, &*tx)
 370                .await?;
 371
 372            let mut model = channel.into_active_model();
 373            model.name = ActiveValue::Set(new_name.clone());
 374            let channel = model.update(&*tx).await?;
 375
 376            let participants = self
 377                .get_channel_participant_details_internal(&channel, &*tx)
 378                .await?;
 379
 380            Ok(RenameChannelResult {
 381                channel: Channel::from_model(channel.clone(), role),
 382                participants_to_update: participants
 383                    .iter()
 384                    .map(|participant| {
 385                        (
 386                            participant.user_id,
 387                            Channel::from_model(channel.clone(), participant.role),
 388                        )
 389                    })
 390                    .collect(),
 391            })
 392        })
 393        .await
 394    }
 395
 396    /// accept or decline an invite to join a channel
 397    pub async fn respond_to_channel_invite(
 398        &self,
 399        channel_id: ChannelId,
 400        user_id: UserId,
 401        accept: bool,
 402    ) -> Result<RespondToChannelInvite> {
 403        self.transaction(move |tx| async move {
 404            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 405
 406            let membership_update = if accept {
 407                let rows_affected = channel_member::Entity::update_many()
 408                    .set(channel_member::ActiveModel {
 409                        accepted: ActiveValue::Set(accept),
 410                        ..Default::default()
 411                    })
 412                    .filter(
 413                        channel_member::Column::ChannelId
 414                            .eq(channel_id)
 415                            .and(channel_member::Column::UserId.eq(user_id))
 416                            .and(channel_member::Column::Accepted.eq(false)),
 417                    )
 418                    .exec(&*tx)
 419                    .await?
 420                    .rows_affected;
 421
 422                if rows_affected == 0 {
 423                    Err(anyhow!("no such invitation"))?;
 424                }
 425
 426                Some(
 427                    self.calculate_membership_updated(&channel, user_id, &*tx)
 428                        .await?,
 429                )
 430            } else {
 431                let rows_affected = channel_member::Entity::delete_many()
 432                    .filter(
 433                        channel_member::Column::ChannelId
 434                            .eq(channel_id)
 435                            .and(channel_member::Column::UserId.eq(user_id))
 436                            .and(channel_member::Column::Accepted.eq(false)),
 437                    )
 438                    .exec(&*tx)
 439                    .await?
 440                    .rows_affected;
 441                if rows_affected == 0 {
 442                    Err(anyhow!("no such invitation"))?;
 443                }
 444
 445                None
 446            };
 447
 448            Ok(RespondToChannelInvite {
 449                membership_update,
 450                notifications: self
 451                    .mark_notification_as_read_with_response(
 452                        user_id,
 453                        &rpc::Notification::ChannelInvitation {
 454                            channel_id: channel_id.to_proto(),
 455                            channel_name: Default::default(),
 456                            inviter_id: Default::default(),
 457                        },
 458                        accept,
 459                        &*tx,
 460                    )
 461                    .await?
 462                    .into_iter()
 463                    .collect(),
 464            })
 465        })
 466        .await
 467    }
 468
 469    async fn calculate_membership_updated(
 470        &self,
 471        channel: &channel::Model,
 472        user_id: UserId,
 473        tx: &DatabaseTransaction,
 474    ) -> Result<MembershipUpdated> {
 475        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 476        let removed_channels = self
 477            .get_channel_descendants_including_self(vec![channel.id], &*tx)
 478            .await?
 479            .into_iter()
 480            .filter_map(|channel| {
 481                if !new_channels.channels.iter().any(|c| c.id == channel.id) {
 482                    Some(channel.id)
 483                } else {
 484                    None
 485                }
 486            })
 487            .collect::<Vec<_>>();
 488
 489        Ok(MembershipUpdated {
 490            channel_id: channel.id,
 491            new_channels,
 492            removed_channels,
 493        })
 494    }
 495
 496    /// Removes a channel member.
 497    pub async fn remove_channel_member(
 498        &self,
 499        channel_id: ChannelId,
 500        member_id: UserId,
 501        admin_id: UserId,
 502    ) -> Result<RemoveChannelMemberResult> {
 503        self.transaction(|tx| async move {
 504            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 505            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 506                .await?;
 507
 508            let result = channel_member::Entity::delete_many()
 509                .filter(
 510                    channel_member::Column::ChannelId
 511                        .eq(channel_id)
 512                        .and(channel_member::Column::UserId.eq(member_id)),
 513                )
 514                .exec(&*tx)
 515                .await?;
 516
 517            if result.rows_affected == 0 {
 518                Err(anyhow!("no such member"))?;
 519            }
 520
 521            Ok(RemoveChannelMemberResult {
 522                membership_update: self
 523                    .calculate_membership_updated(&channel, member_id, &*tx)
 524                    .await?,
 525                notification_id: self
 526                    .remove_notification(
 527                        member_id,
 528                        rpc::Notification::ChannelInvitation {
 529                            channel_id: channel_id.to_proto(),
 530                            channel_name: Default::default(),
 531                            inviter_id: Default::default(),
 532                        },
 533                        &*tx,
 534                    )
 535                    .await?,
 536            })
 537        })
 538        .await
 539    }
 540
 541    /// Returns all channel invites for the user with the given ID.
 542    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 543        self.transaction(|tx| async move {
 544            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 545
 546            let channel_invites = channel_member::Entity::find()
 547                .filter(
 548                    channel_member::Column::UserId
 549                        .eq(user_id)
 550                        .and(channel_member::Column::Accepted.eq(false)),
 551                )
 552                .all(&*tx)
 553                .await?;
 554
 555            for invite in channel_invites {
 556                role_for_channel.insert(invite.channel_id, invite.role);
 557            }
 558
 559            let channels = channel::Entity::find()
 560                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 561                .all(&*tx)
 562                .await?;
 563
 564            let channels = channels
 565                .into_iter()
 566                .filter_map(|channel| {
 567                    let role = *role_for_channel.get(&channel.id)?;
 568                    Some(Channel::from_model(channel, role))
 569                })
 570                .collect();
 571
 572            Ok(channels)
 573        })
 574        .await
 575    }
 576
 577    /// Returns all channels for the user with the given ID.
 578    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 579        self.transaction(|tx| async move {
 580            let tx = tx;
 581
 582            self.get_user_channels(user_id, None, &tx).await
 583        })
 584        .await
 585    }
 586
 587    /// Returns all channels for the user with the given ID that are descendants
 588    /// of the specified ancestor channel.
 589    pub async fn get_user_channels(
 590        &self,
 591        user_id: UserId,
 592        ancestor_channel: Option<&channel::Model>,
 593        tx: &DatabaseTransaction,
 594    ) -> Result<ChannelsForUser> {
 595        let channel_memberships = channel_member::Entity::find()
 596            .filter(
 597                channel_member::Column::UserId
 598                    .eq(user_id)
 599                    .and(channel_member::Column::Accepted.eq(true)),
 600            )
 601            .all(&*tx)
 602            .await?;
 603
 604        let descendants = self
 605            .get_channel_descendants_including_self(
 606                channel_memberships.iter().map(|m| m.channel_id),
 607                &*tx,
 608            )
 609            .await?;
 610
 611        let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
 612        for membership in channel_memberships.iter() {
 613            roles_by_channel_id.insert(membership.channel_id, membership.role);
 614        }
 615
 616        let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();
 617
 618        let channels: Vec<Channel> = descendants
 619            .into_iter()
 620            .filter_map(|channel| {
 621                let parent_role = channel
 622                    .parent_id()
 623                    .and_then(|parent_id| roles_by_channel_id.get(&parent_id));
 624
 625                let role = if let Some(parent_role) = parent_role {
 626                    let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
 627                        existing_role.max(*parent_role)
 628                    } else {
 629                        *parent_role
 630                    };
 631                    roles_by_channel_id.insert(channel.id, role);
 632                    role
 633                } else {
 634                    *roles_by_channel_id.get(&channel.id)?
 635                };
 636
 637                let can_see_parent_paths = role.can_see_all_descendants()
 638                    || role.can_only_see_public_descendants()
 639                        && channel.visibility == ChannelVisibility::Public;
 640                if !can_see_parent_paths {
 641                    return None;
 642                }
 643
 644                visible_channel_ids.insert(channel.id);
 645
 646                if let Some(ancestor) = ancestor_channel {
 647                    if !channel
 648                        .ancestors_including_self()
 649                        .any(|id| id == ancestor.id)
 650                    {
 651                        return None;
 652                    }
 653                }
 654
 655                let mut channel = Channel::from_model(channel, role);
 656                channel
 657                    .parent_path
 658                    .retain(|id| visible_channel_ids.contains(&id));
 659
 660                Some(channel)
 661            })
 662            .collect();
 663
 664        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 665        enum QueryUserIdsAndChannelIds {
 666            ChannelId,
 667            UserId,
 668        }
 669
 670        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 671        {
 672            let mut rows = room_participant::Entity::find()
 673                .inner_join(room::Entity)
 674                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 675                .select_only()
 676                .column(room::Column::ChannelId)
 677                .column(room_participant::Column::UserId)
 678                .into_values::<_, QueryUserIdsAndChannelIds>()
 679                .stream(&*tx)
 680                .await?;
 681            while let Some(row) = rows.next().await {
 682                let row: (ChannelId, UserId) = row?;
 683                channel_participants.entry(row.0).or_default().push(row.1)
 684            }
 685        }
 686
 687        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 688        let channel_buffer_changes = self
 689            .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
 690            .await?;
 691
 692        let unseen_messages = self
 693            .unseen_channel_messages(user_id, &channel_ids, &*tx)
 694            .await?;
 695
 696        Ok(ChannelsForUser {
 697            channels,
 698            channel_participants,
 699            unseen_buffer_changes: channel_buffer_changes,
 700            channel_messages: unseen_messages,
 701        })
 702    }
 703
 704    async fn participants_to_notify_for_channel_change(
 705        &self,
 706        new_parent: &channel::Model,
 707        tx: &DatabaseTransaction,
 708    ) -> Result<Vec<(UserId, ChannelsForUser)>> {
 709        let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
 710
 711        let members = self
 712            .get_channel_participant_details_internal(new_parent, &*tx)
 713            .await?;
 714
 715        for member in members.iter() {
 716            if !member.role.can_see_all_descendants() {
 717                continue;
 718            }
 719            results.push((
 720                member.user_id,
 721                self.get_user_channels(member.user_id, Some(new_parent), &*tx)
 722                    .await?,
 723            ))
 724        }
 725
 726        let public_parents = self
 727            .public_ancestors_including_self(new_parent, &*tx)
 728            .await?;
 729        let public_parent = public_parents.last();
 730
 731        let Some(public_parent) = public_parent else {
 732            return Ok(results);
 733        };
 734
 735        // could save some time in the common case by skipping this if the
 736        // new channel is not public and has no public descendants.
 737        let public_members = if public_parent == new_parent {
 738            members
 739        } else {
 740            self.get_channel_participant_details_internal(public_parent, &*tx)
 741                .await?
 742        };
 743
 744        for member in public_members {
 745            if !member.role.can_only_see_public_descendants() {
 746                continue;
 747            };
 748            results.push((
 749                member.user_id,
 750                self.get_user_channels(member.user_id, Some(public_parent), &*tx)
 751                    .await?,
 752            ))
 753        }
 754
 755        Ok(results)
 756    }
 757
 758    /// Sets the role for the specified channel member.
 759    pub async fn set_channel_member_role(
 760        &self,
 761        channel_id: ChannelId,
 762        admin_id: UserId,
 763        for_user: UserId,
 764        role: ChannelRole,
 765    ) -> Result<SetMemberRoleResult> {
 766        self.transaction(|tx| async move {
 767            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 768            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 769                .await?;
 770
 771            let membership = channel_member::Entity::find()
 772                .filter(
 773                    channel_member::Column::ChannelId
 774                        .eq(channel_id)
 775                        .and(channel_member::Column::UserId.eq(for_user)),
 776                )
 777                .one(&*tx)
 778                .await?;
 779
 780            let Some(membership) = membership else {
 781                Err(anyhow!("no such member"))?
 782            };
 783
 784            let mut update = membership.into_active_model();
 785            update.role = ActiveValue::Set(role);
 786            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 787
 788            if updated.accepted {
 789                Ok(SetMemberRoleResult::MembershipUpdated(
 790                    self.calculate_membership_updated(&channel, for_user, &*tx)
 791                        .await?,
 792                ))
 793            } else {
 794                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 795                    channel, role,
 796                )))
 797            }
 798        })
 799        .await
 800    }
 801
 802    /// Returns the details for the specified channel member.
 803    pub async fn get_channel_participant_details(
 804        &self,
 805        channel_id: ChannelId,
 806        user_id: UserId,
 807    ) -> Result<Vec<proto::ChannelMember>> {
 808        let (role, members) = self
 809            .transaction(move |tx| async move {
 810                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 811                let role = self
 812                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 813                    .await?;
 814                Ok((
 815                    role,
 816                    self.get_channel_participant_details_internal(&channel, &*tx)
 817                        .await?,
 818                ))
 819            })
 820            .await?;
 821
 822        if role == ChannelRole::Admin {
 823            Ok(members
 824                .into_iter()
 825                .map(|channel_member| channel_member.to_proto())
 826                .collect())
 827        } else {
 828            return Ok(members
 829                .into_iter()
 830                .filter_map(|member| {
 831                    if member.kind == proto::channel_member::Kind::Invitee {
 832                        return None;
 833                    }
 834                    Some(ChannelMember {
 835                        role: member.role,
 836                        user_id: member.user_id,
 837                        kind: proto::channel_member::Kind::Member,
 838                    })
 839                })
 840                .map(|channel_member| channel_member.to_proto())
 841                .collect());
 842        }
 843    }
 844
 845    async fn get_channel_participant_details_internal(
 846        &self,
 847        channel: &channel::Model,
 848        tx: &DatabaseTransaction,
 849    ) -> Result<Vec<ChannelMember>> {
 850        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 851        enum QueryMemberDetails {
 852            UserId,
 853            Role,
 854            IsDirectMember,
 855            Accepted,
 856            Visibility,
 857        }
 858
 859        let mut stream = channel_member::Entity::find()
 860            .left_join(channel::Entity)
 861            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 862            .select_only()
 863            .column(channel_member::Column::UserId)
 864            .column(channel_member::Column::Role)
 865            .column_as(
 866                channel_member::Column::ChannelId.eq(channel.id),
 867                QueryMemberDetails::IsDirectMember,
 868            )
 869            .column(channel_member::Column::Accepted)
 870            .column(channel::Column::Visibility)
 871            .into_values::<_, QueryMemberDetails>()
 872            .stream(&*tx)
 873            .await?;
 874
 875        let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();
 876
 877        while let Some(user_membership) = stream.next().await {
 878            let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
 879                UserId,
 880                ChannelRole,
 881                bool,
 882                bool,
 883                ChannelVisibility,
 884            ) = user_membership?;
 885            let kind = match (is_direct_member, is_invite_accepted) {
 886                (true, true) => proto::channel_member::Kind::Member,
 887                (true, false) => proto::channel_member::Kind::Invitee,
 888                (false, true) => proto::channel_member::Kind::AncestorMember,
 889                (false, false) => continue,
 890            };
 891
 892            if channel_role == ChannelRole::Guest
 893                && visibility != ChannelVisibility::Public
 894                && channel.visibility != ChannelVisibility::Public
 895            {
 896                continue;
 897            }
 898
 899            if let Some(details_mut) = user_details.get_mut(&user_id) {
 900                if channel_role.should_override(details_mut.role) {
 901                    details_mut.role = channel_role;
 902                }
 903                if kind == Kind::Member {
 904                    details_mut.kind = kind;
 905                // the UI is going to be a bit confusing if you already have permissions
 906                // that are greater than or equal to the ones you're being invited to.
 907                } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
 908                    details_mut.kind = kind;
 909                }
 910            } else {
 911                user_details.insert(
 912                    user_id,
 913                    ChannelMember {
 914                        user_id,
 915                        kind,
 916                        role: channel_role,
 917                    },
 918                );
 919            }
 920        }
 921
 922        Ok(user_details
 923            .into_iter()
 924            .map(|(_, details)| details)
 925            .collect())
 926    }
 927
 928    /// Returns the participants in the given channel.
 929    pub async fn get_channel_participants(
 930        &self,
 931        channel: &channel::Model,
 932        tx: &DatabaseTransaction,
 933    ) -> Result<Vec<UserId>> {
 934        let participants = self
 935            .get_channel_participant_details_internal(channel, &*tx)
 936            .await?;
 937        Ok(participants
 938            .into_iter()
 939            .map(|member| member.user_id)
 940            .collect())
 941    }
 942
 943    /// Returns whether the given user is an admin in the specified channel.
 944    pub async fn check_user_is_channel_admin(
 945        &self,
 946        channel: &channel::Model,
 947        user_id: UserId,
 948        tx: &DatabaseTransaction,
 949    ) -> Result<ChannelRole> {
 950        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 951        match role {
 952            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 953            Some(ChannelRole::Member)
 954            | Some(ChannelRole::Banned)
 955            | Some(ChannelRole::Guest)
 956            | None => Err(anyhow!(
 957                "user is not a channel admin or channel does not exist"
 958            ))?,
 959        }
 960    }
 961
 962    /// Returns whether the given user is a member of the specified channel.
 963    pub async fn check_user_is_channel_member(
 964        &self,
 965        channel: &channel::Model,
 966        user_id: UserId,
 967        tx: &DatabaseTransaction,
 968    ) -> Result<ChannelRole> {
 969        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 970        match channel_role {
 971            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 972            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 973                "user is not a channel member or channel does not exist"
 974            ))?,
 975        }
 976    }
 977
 978    /// Returns whether the given user is a participant in the specified channel.
 979    pub async fn check_user_is_channel_participant(
 980        &self,
 981        channel: &channel::Model,
 982        user_id: UserId,
 983        tx: &DatabaseTransaction,
 984    ) -> Result<ChannelRole> {
 985        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 986        match role {
 987            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 988                Ok(role.unwrap())
 989            }
 990            Some(ChannelRole::Banned) | None => Err(anyhow!(
 991                "user is not a channel participant or channel does not exist"
 992            ))?,
 993        }
 994    }
 995
 996    /// Returns a user's pending invite for the given channel, if one exists.
 997    pub async fn pending_invite_for_channel(
 998        &self,
 999        channel: &channel::Model,
1000        user_id: UserId,
1001        tx: &DatabaseTransaction,
1002    ) -> Result<Option<channel_member::Model>> {
1003        let row = channel_member::Entity::find()
1004            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
1005            .filter(channel_member::Column::UserId.eq(user_id))
1006            .filter(channel_member::Column::Accepted.eq(false))
1007            .one(&*tx)
1008            .await?;
1009
1010        Ok(row)
1011    }
1012
1013    async fn public_parent_channel(
1014        &self,
1015        channel: &channel::Model,
1016        tx: &DatabaseTransaction,
1017    ) -> Result<Option<channel::Model>> {
1018        let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1019        if path.last().unwrap().id == channel.id {
1020            path.pop();
1021        }
1022        Ok(path.pop())
1023    }
1024
1025    pub(crate) async fn public_ancestors_including_self(
1026        &self,
1027        channel: &channel::Model,
1028        tx: &DatabaseTransaction,
1029    ) -> Result<Vec<channel::Model>> {
1030        let visible_channels = channel::Entity::find()
1031            .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1032            .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1033            .order_by_asc(channel::Column::ParentPath)
1034            .all(&*tx)
1035            .await?;
1036
1037        Ok(visible_channels)
1038    }
1039
1040    /// Returns the role for a user in the given channel.
1041    pub async fn channel_role_for_user(
1042        &self,
1043        channel: &channel::Model,
1044        user_id: UserId,
1045        tx: &DatabaseTransaction,
1046    ) -> Result<Option<ChannelRole>> {
1047        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1048        enum QueryChannelMembership {
1049            ChannelId,
1050            Role,
1051            Visibility,
1052        }
1053
1054        let mut rows = channel_member::Entity::find()
1055            .left_join(channel::Entity)
1056            .filter(
1057                channel_member::Column::ChannelId
1058                    .is_in(channel.ancestors_including_self())
1059                    .and(channel_member::Column::UserId.eq(user_id))
1060                    .and(channel_member::Column::Accepted.eq(true)),
1061            )
1062            .select_only()
1063            .column(channel_member::Column::ChannelId)
1064            .column(channel_member::Column::Role)
1065            .column(channel::Column::Visibility)
1066            .into_values::<_, QueryChannelMembership>()
1067            .stream(&*tx)
1068            .await?;
1069
1070        let mut user_role: Option<ChannelRole> = None;
1071
1072        let mut is_participant = false;
1073        let mut current_channel_visibility = None;
1074
1075        // note these channels are not iterated in any particular order,
1076        // our current logic takes the highest permission available.
1077        while let Some(row) = rows.next().await {
1078            let (membership_channel, role, visibility): (
1079                ChannelId,
1080                ChannelRole,
1081                ChannelVisibility,
1082            ) = row?;
1083
1084            match role {
1085                ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
1086                    if let Some(users_role) = user_role {
1087                        user_role = Some(users_role.max(role));
1088                    } else {
1089                        user_role = Some(role)
1090                    }
1091                }
1092                ChannelRole::Guest if visibility == ChannelVisibility::Public => {
1093                    is_participant = true
1094                }
1095                ChannelRole::Guest => {}
1096            }
1097            if channel.id == membership_channel {
1098                current_channel_visibility = Some(visibility);
1099            }
1100        }
1101        // free up database connection
1102        drop(rows);
1103
1104        if is_participant && user_role.is_none() {
1105            if current_channel_visibility.is_none() {
1106                current_channel_visibility = channel::Entity::find()
1107                    .filter(channel::Column::Id.eq(channel.id))
1108                    .one(&*tx)
1109                    .await?
1110                    .map(|channel| channel.visibility);
1111            }
1112            if current_channel_visibility == Some(ChannelVisibility::Public) {
1113                user_role = Some(ChannelRole::Guest);
1114            }
1115        }
1116
1117        Ok(user_role)
1118    }
1119
1120    // Get the descendants of the given set if channels, ordered by their
1121    // path.
1122    async fn get_channel_descendants_including_self(
1123        &self,
1124        channel_ids: impl IntoIterator<Item = ChannelId>,
1125        tx: &DatabaseTransaction,
1126    ) -> Result<Vec<channel::Model>> {
1127        let mut values = String::new();
1128        for id in channel_ids {
1129            if !values.is_empty() {
1130                values.push_str(", ");
1131            }
1132            write!(&mut values, "({})", id).unwrap();
1133        }
1134
1135        if values.is_empty() {
1136            return Ok(vec![]);
1137        }
1138
1139        let sql = format!(
1140            r#"
1141            SELECT DISTINCT
1142                descendant_channels.*,
1143                descendant_channels.parent_path || descendant_channels.id as full_path
1144            FROM
1145                channels parent_channels, channels descendant_channels
1146            WHERE
1147                descendant_channels.id IN ({values}) OR
1148                (
1149                    parent_channels.id IN ({values}) AND
1150                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1151                )
1152            ORDER BY
1153                full_path ASC
1154            "#
1155        );
1156
1157        Ok(channel::Entity::find()
1158            .from_raw_sql(Statement::from_string(
1159                self.pool.get_database_backend(),
1160                sql,
1161            ))
1162            .all(tx)
1163            .await?)
1164    }
1165
1166    /// Returns the channel with the given ID.
1167    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1168        self.transaction(|tx| async move {
1169            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1170            let role = self
1171                .check_user_is_channel_participant(&channel, user_id, &*tx)
1172                .await?;
1173
1174            Ok(Channel::from_model(channel, role))
1175        })
1176        .await
1177    }
1178
1179    pub(crate) async fn get_channel_internal(
1180        &self,
1181        channel_id: ChannelId,
1182        tx: &DatabaseTransaction,
1183    ) -> Result<channel::Model> {
1184        Ok(channel::Entity::find_by_id(channel_id)
1185            .one(&*tx)
1186            .await?
1187            .ok_or_else(|| anyhow!("no such channel"))?)
1188    }
1189
1190    pub(crate) async fn get_or_create_channel_room(
1191        &self,
1192        channel_id: ChannelId,
1193        live_kit_room: &str,
1194        environment: &str,
1195        tx: &DatabaseTransaction,
1196    ) -> Result<RoomId> {
1197        let room = room::Entity::find()
1198            .filter(room::Column::ChannelId.eq(channel_id))
1199            .one(&*tx)
1200            .await?;
1201
1202        let room_id = if let Some(room) = room {
1203            if let Some(env) = room.environment {
1204                if &env != environment {
1205                    Err(anyhow!("must join using the {} release", env))?;
1206                }
1207            }
1208            room.id
1209        } else {
1210            let result = room::Entity::insert(room::ActiveModel {
1211                channel_id: ActiveValue::Set(Some(channel_id)),
1212                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1213                environment: ActiveValue::Set(Some(environment.to_string())),
1214                ..Default::default()
1215            })
1216            .exec(&*tx)
1217            .await?;
1218
1219            result.last_insert_id
1220        };
1221
1222        Ok(room_id)
1223    }
1224
1225    /// Move a channel from one parent to another
1226    pub async fn move_channel(
1227        &self,
1228        channel_id: ChannelId,
1229        new_parent_id: Option<ChannelId>,
1230        admin_id: UserId,
1231    ) -> Result<Option<MoveChannelResult>> {
1232        self.transaction(|tx| async move {
1233            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1234            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1235                .await?;
1236
1237            let new_parent_path;
1238            let new_parent_channel;
1239            if let Some(new_parent_id) = new_parent_id {
1240                let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1241                self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1242                    .await?;
1243
1244                if new_parent
1245                    .ancestors_including_self()
1246                    .any(|id| id == channel.id)
1247                {
1248                    Err(anyhow!("cannot move a channel into one of its descendants"))?;
1249                }
1250
1251                new_parent_path = new_parent.path();
1252                new_parent_channel = Some(new_parent);
1253            } else {
1254                new_parent_path = String::new();
1255                new_parent_channel = None;
1256            };
1257
1258            let previous_participants = self
1259                .get_channel_participant_details_internal(&channel, &*tx)
1260                .await?;
1261
1262            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1263            let new_path = format!("{}{}/", new_parent_path, channel.id);
1264
1265            if old_path == new_path {
1266                return Ok(None);
1267            }
1268
1269            let mut model = channel.into_active_model();
1270            model.parent_path = ActiveValue::Set(new_parent_path);
1271            let channel = model.update(&*tx).await?;
1272
1273            if new_parent_channel.is_none() {
1274                channel_member::ActiveModel {
1275                    id: ActiveValue::NotSet,
1276                    channel_id: ActiveValue::Set(channel_id),
1277                    user_id: ActiveValue::Set(admin_id),
1278                    accepted: ActiveValue::Set(true),
1279                    role: ActiveValue::Set(ChannelRole::Admin),
1280                }
1281                .insert(&*tx)
1282                .await?;
1283            }
1284
1285            let descendent_ids =
1286                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1287                    self.pool.get_database_backend(),
1288                    "
1289                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1290                    WHERE parent_path LIKE $3 || '%'
1291                    RETURNING id
1292                ",
1293                    [old_path.clone().into(), new_path.into(), old_path.into()],
1294                ))
1295                .all(&*tx)
1296                .await?;
1297
1298            let participants_to_update: HashMap<_, _> = self
1299                .participants_to_notify_for_channel_change(
1300                    new_parent_channel.as_ref().unwrap_or(&channel),
1301                    &*tx,
1302                )
1303                .await?
1304                .into_iter()
1305                .collect();
1306
1307            let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1308            for id in descendent_ids {
1309                moved_channels.insert(id);
1310            }
1311            moved_channels.insert(channel_id);
1312
1313            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1314            for participant in previous_participants {
1315                if participant.kind == proto::channel_member::Kind::AncestorMember {
1316                    if !participants_to_update.contains_key(&participant.user_id) {
1317                        participants_to_remove.insert(participant.user_id);
1318                    }
1319                }
1320            }
1321
1322            Ok(Some(MoveChannelResult {
1323                participants_to_remove,
1324                participants_to_update,
1325                moved_channels,
1326            }))
1327        })
1328        .await
1329    }
1330}
1331
/// Column set for raw queries that return a single `id` column, such as the
/// `UPDATE ... RETURNING id` statement issued by `move_channel`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1336
/// Column set naming a single `UserId` column.
// NOTE(review): presumably used by queries that select only user ids; no use
// is visible in this part of the file — confirm against the callers.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}