channels.rs

   1use super::*;
   2use rpc::{proto::channel_member::Kind, ErrorCode, ErrorCodeExt};
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self.create_channel(name, None, creator_id).await?.id)
  23    }
  24
  25    #[cfg(test)]
  26    pub async fn create_sub_channel(
  27        &self,
  28        name: &str,
  29        parent: ChannelId,
  30        creator_id: UserId,
  31    ) -> Result<ChannelId> {
  32        Ok(self
  33            .create_channel(name, Some(parent), creator_id)
  34            .await?
  35            .id)
  36    }
  37
  38    /// Creates a new channel.
  39    pub async fn create_channel(
  40        &self,
  41        name: &str,
  42        parent_channel_id: Option<ChannelId>,
  43        admin_id: UserId,
  44    ) -> Result<Channel> {
  45        let name = Self::sanitize_channel_name(name)?;
  46        self.transaction(move |tx| async move {
  47            let mut parent = None;
  48
  49            if let Some(parent_channel_id) = parent_channel_id {
  50                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  51                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  52                    .await?;
  53                parent = Some(parent_channel);
  54            }
  55
  56            let channel = channel::ActiveModel {
  57                id: ActiveValue::NotSet,
  58                name: ActiveValue::Set(name.to_string()),
  59                visibility: ActiveValue::Set(ChannelVisibility::Members),
  60                parent_path: ActiveValue::Set(
  61                    parent
  62                        .as_ref()
  63                        .map_or(String::new(), |parent| parent.path()),
  64                ),
  65                requires_zed_cla: ActiveValue::NotSet,
  66            }
  67            .insert(&*tx)
  68            .await?;
  69
  70            if parent.is_none() {
  71                channel_member::ActiveModel {
  72                    id: ActiveValue::NotSet,
  73                    channel_id: ActiveValue::Set(channel.id),
  74                    user_id: ActiveValue::Set(admin_id),
  75                    accepted: ActiveValue::Set(true),
  76                    role: ActiveValue::Set(ChannelRole::Admin),
  77                }
  78                .insert(&*tx)
  79                .await?;
  80            }
  81
  82            Ok(Channel::from_model(channel, ChannelRole::Admin))
  83        })
  84        .await
  85    }
  86
  87    /// Adds a user to the specified channel.
  88    pub async fn join_channel(
  89        &self,
  90        channel_id: ChannelId,
  91        user_id: UserId,
  92        connection: ConnectionId,
  93        environment: &str,
  94    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
  95        self.transaction(move |tx| async move {
  96            let channel = self.get_channel_internal(channel_id, &*tx).await?;
  97            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
  98
  99            let mut accept_invite_result = None;
 100
 101            if role.is_none() {
 102                if let Some(invitation) = self
 103                    .pending_invite_for_channel(&channel, user_id, &*tx)
 104                    .await?
 105                {
 106                    // note, this may be a parent channel
 107                    role = Some(invitation.role);
 108                    channel_member::Entity::update(channel_member::ActiveModel {
 109                        accepted: ActiveValue::Set(true),
 110                        ..invitation.into_active_model()
 111                    })
 112                    .exec(&*tx)
 113                    .await?;
 114
 115                    accept_invite_result = Some(
 116                        self.calculate_membership_updated(&channel, user_id, &*tx)
 117                            .await?,
 118                    );
 119
 120                    debug_assert!(
 121                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 122                    );
 123                } else if channel.visibility == ChannelVisibility::Public {
 124                    role = Some(ChannelRole::Guest);
 125                    let channel_to_join = self
 126                        .public_ancestors_including_self(&channel, &*tx)
 127                        .await?
 128                        .first()
 129                        .cloned()
 130                        .unwrap_or(channel.clone());
 131
 132                    channel_member::Entity::insert(channel_member::ActiveModel {
 133                        id: ActiveValue::NotSet,
 134                        channel_id: ActiveValue::Set(channel_to_join.id),
 135                        user_id: ActiveValue::Set(user_id),
 136                        accepted: ActiveValue::Set(true),
 137                        role: ActiveValue::Set(ChannelRole::Guest),
 138                    })
 139                    .exec(&*tx)
 140                    .await?;
 141
 142                    accept_invite_result = Some(
 143                        self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
 144                            .await?,
 145                    );
 146
 147                    debug_assert!(
 148                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 149                    );
 150                }
 151            }
 152
 153            if role.is_none() || role == Some(ChannelRole::Banned) {
 154                Err(ErrorCode::Forbidden.anyhow())?
 155            }
 156            let role = role.unwrap();
 157
 158            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 159            let room_id = self
 160                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 161                .await?;
 162
 163            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 164                .await
 165                .map(|jr| (jr, accept_invite_result, role))
 166        })
 167        .await
 168    }
 169
 170    /// Sets the visibiltity of the given channel.
 171    pub async fn set_channel_visibility(
 172        &self,
 173        channel_id: ChannelId,
 174        visibility: ChannelVisibility,
 175        admin_id: UserId,
 176    ) -> Result<SetChannelVisibilityResult> {
 177        self.transaction(move |tx| async move {
 178            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 179
 180            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 181                .await?;
 182
 183            let previous_members = self
 184                .get_channel_participant_details_internal(&channel, &*tx)
 185                .await?;
 186
 187            let mut model = channel.into_active_model();
 188            model.visibility = ActiveValue::Set(visibility);
 189            let channel = model.update(&*tx).await?;
 190
 191            let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
 192                .participants_to_notify_for_channel_change(&channel, &*tx)
 193                .await?
 194                .into_iter()
 195                .collect();
 196
 197            let mut channels_to_remove: Vec<ChannelId> = vec![];
 198            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
 199            match visibility {
 200                ChannelVisibility::Members => {
 201                    let all_descendents: Vec<ChannelId> = self
 202                        .get_channel_descendants_including_self(vec![channel_id], &*tx)
 203                        .await?
 204                        .into_iter()
 205                        .map(|channel| channel.id)
 206                        .collect();
 207
 208                    channels_to_remove = channel::Entity::find()
 209                        .filter(
 210                            channel::Column::Id
 211                                .is_in(all_descendents)
 212                                .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
 213                        )
 214                        .all(&*tx)
 215                        .await?
 216                        .into_iter()
 217                        .map(|channel| channel.id)
 218                        .collect();
 219
 220                    channels_to_remove.push(channel_id);
 221
 222                    for member in previous_members {
 223                        if member.role.can_only_see_public_descendants() {
 224                            participants_to_remove.insert(member.user_id);
 225                        }
 226                    }
 227                }
 228                ChannelVisibility::Public => {
 229                    if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
 230                        let parent_updates = self
 231                            .participants_to_notify_for_channel_change(&public_parent, &*tx)
 232                            .await?;
 233
 234                        for (user_id, channels) in parent_updates {
 235                            participants_to_update.insert(user_id, channels);
 236                        }
 237                    }
 238                }
 239            }
 240
 241            Ok(SetChannelVisibilityResult {
 242                participants_to_update,
 243                participants_to_remove,
 244                channels_to_remove,
 245            })
 246        })
 247        .await
 248    }
 249
 250    #[cfg(test)]
 251    pub async fn set_channel_requires_zed_cla(
 252        &self,
 253        channel_id: ChannelId,
 254        requires_zed_cla: bool,
 255    ) -> Result<()> {
 256        self.transaction(move |tx| async move {
 257            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 258            let mut model = channel.into_active_model();
 259            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 260            model.update(&*tx).await?;
 261            Ok(())
 262        })
 263        .await
 264    }
 265
 266    /// Deletes the channel with the specified ID.
 267    pub async fn delete_channel(
 268        &self,
 269        channel_id: ChannelId,
 270        user_id: UserId,
 271    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 272        self.transaction(move |tx| async move {
 273            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 274            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 275                .await?;
 276
 277            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 278                .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 279                .select_only()
 280                .column(channel_member::Column::UserId)
 281                .distinct()
 282                .into_values::<_, QueryUserIds>()
 283                .all(&*tx)
 284                .await?;
 285
 286            let channels_to_remove = self
 287                .get_channel_descendants_including_self(vec![channel.id], &*tx)
 288                .await?
 289                .into_iter()
 290                .map(|channel| channel.id)
 291                .collect::<Vec<_>>();
 292
 293            channel::Entity::delete_many()
 294                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 295                .exec(&*tx)
 296                .await?;
 297
 298            Ok((channels_to_remove, members_to_notify))
 299        })
 300        .await
 301    }
 302
 303    /// Invites a user to a channel as a member.
 304    pub async fn invite_channel_member(
 305        &self,
 306        channel_id: ChannelId,
 307        invitee_id: UserId,
 308        inviter_id: UserId,
 309        role: ChannelRole,
 310    ) -> Result<InviteMemberResult> {
 311        self.transaction(move |tx| async move {
 312            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 313            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 314                .await?;
 315
 316            channel_member::ActiveModel {
 317                id: ActiveValue::NotSet,
 318                channel_id: ActiveValue::Set(channel_id),
 319                user_id: ActiveValue::Set(invitee_id),
 320                accepted: ActiveValue::Set(false),
 321                role: ActiveValue::Set(role),
 322            }
 323            .insert(&*tx)
 324            .await?;
 325
 326            let channel = Channel::from_model(channel, role);
 327
 328            let notifications = self
 329                .create_notification(
 330                    invitee_id,
 331                    rpc::Notification::ChannelInvitation {
 332                        channel_id: channel_id.to_proto(),
 333                        channel_name: channel.name.clone(),
 334                        inviter_id: inviter_id.to_proto(),
 335                    },
 336                    true,
 337                    &*tx,
 338                )
 339                .await?
 340                .into_iter()
 341                .collect();
 342
 343            Ok(InviteMemberResult {
 344                channel,
 345                notifications,
 346            })
 347        })
 348        .await
 349    }
 350
 351    fn sanitize_channel_name(name: &str) -> Result<&str> {
 352        let new_name = name.trim().trim_start_matches('#');
 353        if new_name == "" {
 354            Err(anyhow!("channel name can't be blank"))?;
 355        }
 356        Ok(new_name)
 357    }
 358
 359    /// Renames the specified channel.
 360    pub async fn rename_channel(
 361        &self,
 362        channel_id: ChannelId,
 363        admin_id: UserId,
 364        new_name: &str,
 365    ) -> Result<RenameChannelResult> {
 366        self.transaction(move |tx| async move {
 367            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 368
 369            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 370            let role = self
 371                .check_user_is_channel_admin(&channel, admin_id, &*tx)
 372                .await?;
 373
 374            let mut model = channel.into_active_model();
 375            model.name = ActiveValue::Set(new_name.clone());
 376            let channel = model.update(&*tx).await?;
 377
 378            let participants = self
 379                .get_channel_participant_details_internal(&channel, &*tx)
 380                .await?;
 381
 382            Ok(RenameChannelResult {
 383                channel: Channel::from_model(channel.clone(), role),
 384                participants_to_update: participants
 385                    .iter()
 386                    .map(|participant| {
 387                        (
 388                            participant.user_id,
 389                            Channel::from_model(channel.clone(), participant.role),
 390                        )
 391                    })
 392                    .collect(),
 393            })
 394        })
 395        .await
 396    }
 397
 398    /// accept or decline an invite to join a channel
 399    pub async fn respond_to_channel_invite(
 400        &self,
 401        channel_id: ChannelId,
 402        user_id: UserId,
 403        accept: bool,
 404    ) -> Result<RespondToChannelInvite> {
 405        self.transaction(move |tx| async move {
 406            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 407
 408            let membership_update = if accept {
 409                let rows_affected = channel_member::Entity::update_many()
 410                    .set(channel_member::ActiveModel {
 411                        accepted: ActiveValue::Set(accept),
 412                        ..Default::default()
 413                    })
 414                    .filter(
 415                        channel_member::Column::ChannelId
 416                            .eq(channel_id)
 417                            .and(channel_member::Column::UserId.eq(user_id))
 418                            .and(channel_member::Column::Accepted.eq(false)),
 419                    )
 420                    .exec(&*tx)
 421                    .await?
 422                    .rows_affected;
 423
 424                if rows_affected == 0 {
 425                    Err(anyhow!("no such invitation"))?;
 426                }
 427
 428                Some(
 429                    self.calculate_membership_updated(&channel, user_id, &*tx)
 430                        .await?,
 431                )
 432            } else {
 433                let rows_affected = channel_member::Entity::delete_many()
 434                    .filter(
 435                        channel_member::Column::ChannelId
 436                            .eq(channel_id)
 437                            .and(channel_member::Column::UserId.eq(user_id))
 438                            .and(channel_member::Column::Accepted.eq(false)),
 439                    )
 440                    .exec(&*tx)
 441                    .await?
 442                    .rows_affected;
 443                if rows_affected == 0 {
 444                    Err(anyhow!("no such invitation"))?;
 445                }
 446
 447                None
 448            };
 449
 450            Ok(RespondToChannelInvite {
 451                membership_update,
 452                notifications: self
 453                    .mark_notification_as_read_with_response(
 454                        user_id,
 455                        &rpc::Notification::ChannelInvitation {
 456                            channel_id: channel_id.to_proto(),
 457                            channel_name: Default::default(),
 458                            inviter_id: Default::default(),
 459                        },
 460                        accept,
 461                        &*tx,
 462                    )
 463                    .await?
 464                    .into_iter()
 465                    .collect(),
 466            })
 467        })
 468        .await
 469    }
 470
 471    async fn calculate_membership_updated(
 472        &self,
 473        channel: &channel::Model,
 474        user_id: UserId,
 475        tx: &DatabaseTransaction,
 476    ) -> Result<MembershipUpdated> {
 477        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 478        let removed_channels = self
 479            .get_channel_descendants_including_self(vec![channel.id], &*tx)
 480            .await?
 481            .into_iter()
 482            .filter_map(|channel| {
 483                if !new_channels.channels.iter().any(|c| c.id == channel.id) {
 484                    Some(channel.id)
 485                } else {
 486                    None
 487                }
 488            })
 489            .collect::<Vec<_>>();
 490
 491        Ok(MembershipUpdated {
 492            channel_id: channel.id,
 493            new_channels,
 494            removed_channels,
 495        })
 496    }
 497
 498    /// Removes a channel member.
 499    pub async fn remove_channel_member(
 500        &self,
 501        channel_id: ChannelId,
 502        member_id: UserId,
 503        admin_id: UserId,
 504    ) -> Result<RemoveChannelMemberResult> {
 505        self.transaction(|tx| async move {
 506            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 507            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 508                .await?;
 509
 510            let result = channel_member::Entity::delete_many()
 511                .filter(
 512                    channel_member::Column::ChannelId
 513                        .eq(channel_id)
 514                        .and(channel_member::Column::UserId.eq(member_id)),
 515                )
 516                .exec(&*tx)
 517                .await?;
 518
 519            if result.rows_affected == 0 {
 520                Err(anyhow!("no such member"))?;
 521            }
 522
 523            Ok(RemoveChannelMemberResult {
 524                membership_update: self
 525                    .calculate_membership_updated(&channel, member_id, &*tx)
 526                    .await?,
 527                notification_id: self
 528                    .remove_notification(
 529                        member_id,
 530                        rpc::Notification::ChannelInvitation {
 531                            channel_id: channel_id.to_proto(),
 532                            channel_name: Default::default(),
 533                            inviter_id: Default::default(),
 534                        },
 535                        &*tx,
 536                    )
 537                    .await?,
 538            })
 539        })
 540        .await
 541    }
 542
 543    /// Returns all channel invites for the user with the given ID.
 544    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 545        self.transaction(|tx| async move {
 546            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 547
 548            let channel_invites = channel_member::Entity::find()
 549                .filter(
 550                    channel_member::Column::UserId
 551                        .eq(user_id)
 552                        .and(channel_member::Column::Accepted.eq(false)),
 553                )
 554                .all(&*tx)
 555                .await?;
 556
 557            for invite in channel_invites {
 558                role_for_channel.insert(invite.channel_id, invite.role);
 559            }
 560
 561            let channels = channel::Entity::find()
 562                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 563                .all(&*tx)
 564                .await?;
 565
 566            let channels = channels
 567                .into_iter()
 568                .filter_map(|channel| {
 569                    let role = *role_for_channel.get(&channel.id)?;
 570                    Some(Channel::from_model(channel, role))
 571                })
 572                .collect();
 573
 574            Ok(channels)
 575        })
 576        .await
 577    }
 578
 579    /// Returns all channels for the user with the given ID.
 580    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 581        self.transaction(|tx| async move {
 582            let tx = tx;
 583
 584            self.get_user_channels(user_id, None, &tx).await
 585        })
 586        .await
 587    }
 588
 589    /// Returns all channels for the user with the given ID that are descendants
 590    /// of the specified ancestor channel.
 591    pub async fn get_user_channels(
 592        &self,
 593        user_id: UserId,
 594        ancestor_channel: Option<&channel::Model>,
 595        tx: &DatabaseTransaction,
 596    ) -> Result<ChannelsForUser> {
 597        let channel_memberships = channel_member::Entity::find()
 598            .filter(
 599                channel_member::Column::UserId
 600                    .eq(user_id)
 601                    .and(channel_member::Column::Accepted.eq(true)),
 602            )
 603            .all(&*tx)
 604            .await?;
 605
 606        let descendants = self
 607            .get_channel_descendants_including_self(
 608                channel_memberships.iter().map(|m| m.channel_id),
 609                &*tx,
 610            )
 611            .await?;
 612
 613        let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
 614        for membership in channel_memberships.iter() {
 615            roles_by_channel_id.insert(membership.channel_id, membership.role);
 616        }
 617
 618        let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();
 619
 620        let channels: Vec<Channel> = descendants
 621            .into_iter()
 622            .filter_map(|channel| {
 623                let parent_role = channel
 624                    .parent_id()
 625                    .and_then(|parent_id| roles_by_channel_id.get(&parent_id));
 626
 627                let role = if let Some(parent_role) = parent_role {
 628                    let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
 629                        existing_role.max(*parent_role)
 630                    } else {
 631                        *parent_role
 632                    };
 633                    roles_by_channel_id.insert(channel.id, role);
 634                    role
 635                } else {
 636                    *roles_by_channel_id.get(&channel.id)?
 637                };
 638
 639                let can_see_parent_paths = role.can_see_all_descendants()
 640                    || role.can_only_see_public_descendants()
 641                        && channel.visibility == ChannelVisibility::Public;
 642                if !can_see_parent_paths {
 643                    return None;
 644                }
 645
 646                visible_channel_ids.insert(channel.id);
 647
 648                if let Some(ancestor) = ancestor_channel {
 649                    if !channel
 650                        .ancestors_including_self()
 651                        .any(|id| id == ancestor.id)
 652                    {
 653                        return None;
 654                    }
 655                }
 656
 657                let mut channel = Channel::from_model(channel, role);
 658                channel
 659                    .parent_path
 660                    .retain(|id| visible_channel_ids.contains(&id));
 661
 662                Some(channel)
 663            })
 664            .collect();
 665
 666        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 667        enum QueryUserIdsAndChannelIds {
 668            ChannelId,
 669            UserId,
 670        }
 671
 672        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 673        {
 674            let mut rows = room_participant::Entity::find()
 675                .inner_join(room::Entity)
 676                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 677                .select_only()
 678                .column(room::Column::ChannelId)
 679                .column(room_participant::Column::UserId)
 680                .into_values::<_, QueryUserIdsAndChannelIds>()
 681                .stream(&*tx)
 682                .await?;
 683            while let Some(row) = rows.next().await {
 684                let row: (ChannelId, UserId) = row?;
 685                channel_participants.entry(row.0).or_default().push(row.1)
 686            }
 687        }
 688
 689        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 690        let channel_buffer_changes = self
 691            .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
 692            .await?;
 693
 694        let unseen_messages = self
 695            .unseen_channel_messages(user_id, &channel_ids, &*tx)
 696            .await?;
 697
 698        Ok(ChannelsForUser {
 699            channels,
 700            channel_participants,
 701            unseen_buffer_changes: channel_buffer_changes,
 702            channel_messages: unseen_messages,
 703        })
 704    }
 705
 706    pub async fn new_participants_to_notify(
 707        &self,
 708        parent_channel_id: ChannelId,
 709    ) -> Result<Vec<(UserId, ChannelsForUser)>> {
 710        self.weak_transaction(|tx| async move {
 711            let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
 712            self.participants_to_notify_for_channel_change(&parent_channel, &*tx)
 713                .await
 714        })
 715        .await
 716    }
 717
 718    // TODO: this is very expensive, and we should rethink
 719    async fn participants_to_notify_for_channel_change(
 720        &self,
 721        new_parent: &channel::Model,
 722        tx: &DatabaseTransaction,
 723    ) -> Result<Vec<(UserId, ChannelsForUser)>> {
 724        let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
 725
 726        let members = self
 727            .get_channel_participant_details_internal(new_parent, &*tx)
 728            .await?;
 729
 730        for member in members.iter() {
 731            if !member.role.can_see_all_descendants() {
 732                continue;
 733            }
 734            results.push((
 735                member.user_id,
 736                self.get_user_channels(member.user_id, Some(new_parent), &*tx)
 737                    .await?,
 738            ))
 739        }
 740
 741        let public_parents = self
 742            .public_ancestors_including_self(new_parent, &*tx)
 743            .await?;
 744        let public_parent = public_parents.last();
 745
 746        let Some(public_parent) = public_parent else {
 747            return Ok(results);
 748        };
 749
 750        // could save some time in the common case by skipping this if the
 751        // new channel is not public and has no public descendants.
 752        let public_members = if public_parent == new_parent {
 753            members
 754        } else {
 755            self.get_channel_participant_details_internal(public_parent, &*tx)
 756                .await?
 757        };
 758
 759        for member in public_members {
 760            if !member.role.can_only_see_public_descendants() {
 761                continue;
 762            };
 763            results.push((
 764                member.user_id,
 765                self.get_user_channels(member.user_id, Some(public_parent), &*tx)
 766                    .await?,
 767            ))
 768        }
 769
 770        Ok(results)
 771    }
 772
 773    /// Sets the role for the specified channel member.
 774    pub async fn set_channel_member_role(
 775        &self,
 776        channel_id: ChannelId,
 777        admin_id: UserId,
 778        for_user: UserId,
 779        role: ChannelRole,
 780    ) -> Result<SetMemberRoleResult> {
 781        self.transaction(|tx| async move {
 782            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 783            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 784                .await?;
 785
 786            let membership = channel_member::Entity::find()
 787                .filter(
 788                    channel_member::Column::ChannelId
 789                        .eq(channel_id)
 790                        .and(channel_member::Column::UserId.eq(for_user)),
 791                )
 792                .one(&*tx)
 793                .await?;
 794
 795            let Some(membership) = membership else {
 796                Err(anyhow!("no such member"))?
 797            };
 798
 799            let mut update = membership.into_active_model();
 800            update.role = ActiveValue::Set(role);
 801            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 802
 803            if updated.accepted {
 804                Ok(SetMemberRoleResult::MembershipUpdated(
 805                    self.calculate_membership_updated(&channel, for_user, &*tx)
 806                        .await?,
 807                ))
 808            } else {
 809                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 810                    channel, role,
 811                )))
 812            }
 813        })
 814        .await
 815    }
 816
 817    /// Returns the details for the specified channel member.
 818    pub async fn get_channel_participant_details(
 819        &self,
 820        channel_id: ChannelId,
 821        user_id: UserId,
 822    ) -> Result<Vec<proto::ChannelMember>> {
 823        let (role, members) = self
 824            .transaction(move |tx| async move {
 825                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 826                let role = self
 827                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 828                    .await?;
 829                Ok((
 830                    role,
 831                    self.get_channel_participant_details_internal(&channel, &*tx)
 832                        .await?,
 833                ))
 834            })
 835            .await?;
 836
 837        if role == ChannelRole::Admin {
 838            Ok(members
 839                .into_iter()
 840                .map(|channel_member| channel_member.to_proto())
 841                .collect())
 842        } else {
 843            return Ok(members
 844                .into_iter()
 845                .filter_map(|member| {
 846                    if member.kind == proto::channel_member::Kind::Invitee {
 847                        return None;
 848                    }
 849                    Some(ChannelMember {
 850                        role: member.role,
 851                        user_id: member.user_id,
 852                        kind: proto::channel_member::Kind::Member,
 853                    })
 854                })
 855                .map(|channel_member| channel_member.to_proto())
 856                .collect());
 857        }
 858    }
 859
 860    async fn get_channel_participant_details_internal(
 861        &self,
 862        channel: &channel::Model,
 863        tx: &DatabaseTransaction,
 864    ) -> Result<Vec<ChannelMember>> {
 865        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 866        enum QueryMemberDetails {
 867            UserId,
 868            Role,
 869            IsDirectMember,
 870            Accepted,
 871            Visibility,
 872        }
 873
 874        let mut stream = channel_member::Entity::find()
 875            .left_join(channel::Entity)
 876            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 877            .select_only()
 878            .column(channel_member::Column::UserId)
 879            .column(channel_member::Column::Role)
 880            .column_as(
 881                channel_member::Column::ChannelId.eq(channel.id),
 882                QueryMemberDetails::IsDirectMember,
 883            )
 884            .column(channel_member::Column::Accepted)
 885            .column(channel::Column::Visibility)
 886            .into_values::<_, QueryMemberDetails>()
 887            .stream(&*tx)
 888            .await?;
 889
 890        let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();
 891
 892        while let Some(user_membership) = stream.next().await {
 893            let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
 894                UserId,
 895                ChannelRole,
 896                bool,
 897                bool,
 898                ChannelVisibility,
 899            ) = user_membership?;
 900            let kind = match (is_direct_member, is_invite_accepted) {
 901                (true, true) => proto::channel_member::Kind::Member,
 902                (true, false) => proto::channel_member::Kind::Invitee,
 903                (false, true) => proto::channel_member::Kind::AncestorMember,
 904                (false, false) => continue,
 905            };
 906
 907            if channel_role == ChannelRole::Guest
 908                && visibility != ChannelVisibility::Public
 909                && channel.visibility != ChannelVisibility::Public
 910            {
 911                continue;
 912            }
 913
 914            if let Some(details_mut) = user_details.get_mut(&user_id) {
 915                if channel_role.should_override(details_mut.role) {
 916                    details_mut.role = channel_role;
 917                }
 918                if kind == Kind::Member {
 919                    details_mut.kind = kind;
 920                // the UI is going to be a bit confusing if you already have permissions
 921                // that are greater than or equal to the ones you're being invited to.
 922                } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
 923                    details_mut.kind = kind;
 924                }
 925            } else {
 926                user_details.insert(
 927                    user_id,
 928                    ChannelMember {
 929                        user_id,
 930                        kind,
 931                        role: channel_role,
 932                    },
 933                );
 934            }
 935        }
 936
 937        Ok(user_details
 938            .into_iter()
 939            .map(|(_, details)| details)
 940            .collect())
 941    }
 942
 943    /// Returns the participants in the given channel.
 944    pub async fn get_channel_participants(
 945        &self,
 946        channel: &channel::Model,
 947        tx: &DatabaseTransaction,
 948    ) -> Result<Vec<UserId>> {
 949        let participants = self
 950            .get_channel_participant_details_internal(channel, &*tx)
 951            .await?;
 952        Ok(participants
 953            .into_iter()
 954            .map(|member| member.user_id)
 955            .collect())
 956    }
 957
 958    /// Returns whether the given user is an admin in the specified channel.
 959    pub async fn check_user_is_channel_admin(
 960        &self,
 961        channel: &channel::Model,
 962        user_id: UserId,
 963        tx: &DatabaseTransaction,
 964    ) -> Result<ChannelRole> {
 965        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 966        match role {
 967            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 968            Some(ChannelRole::Member)
 969            | Some(ChannelRole::Banned)
 970            | Some(ChannelRole::Guest)
 971            | None => Err(anyhow!(
 972                "user is not a channel admin or channel does not exist"
 973            ))?,
 974        }
 975    }
 976
 977    /// Returns whether the given user is a member of the specified channel.
 978    pub async fn check_user_is_channel_member(
 979        &self,
 980        channel: &channel::Model,
 981        user_id: UserId,
 982        tx: &DatabaseTransaction,
 983    ) -> Result<ChannelRole> {
 984        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 985        match channel_role {
 986            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 987            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 988                "user is not a channel member or channel does not exist"
 989            ))?,
 990        }
 991    }
 992
 993    /// Returns whether the given user is a participant in the specified channel.
 994    pub async fn check_user_is_channel_participant(
 995        &self,
 996        channel: &channel::Model,
 997        user_id: UserId,
 998        tx: &DatabaseTransaction,
 999    ) -> Result<ChannelRole> {
1000        let role = self.channel_role_for_user(channel, user_id, tx).await?;
1001        match role {
1002            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
1003                Ok(role.unwrap())
1004            }
1005            Some(ChannelRole::Banned) | None => Err(anyhow!(
1006                "user is not a channel participant or channel does not exist"
1007            ))?,
1008        }
1009    }
1010
1011    /// Returns a user's pending invite for the given channel, if one exists.
1012    pub async fn pending_invite_for_channel(
1013        &self,
1014        channel: &channel::Model,
1015        user_id: UserId,
1016        tx: &DatabaseTransaction,
1017    ) -> Result<Option<channel_member::Model>> {
1018        let row = channel_member::Entity::find()
1019            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
1020            .filter(channel_member::Column::UserId.eq(user_id))
1021            .filter(channel_member::Column::Accepted.eq(false))
1022            .one(&*tx)
1023            .await?;
1024
1025        Ok(row)
1026    }
1027
1028    async fn public_parent_channel(
1029        &self,
1030        channel: &channel::Model,
1031        tx: &DatabaseTransaction,
1032    ) -> Result<Option<channel::Model>> {
1033        let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1034        if path.last().unwrap().id == channel.id {
1035            path.pop();
1036        }
1037        Ok(path.pop())
1038    }
1039
1040    pub(crate) async fn public_ancestors_including_self(
1041        &self,
1042        channel: &channel::Model,
1043        tx: &DatabaseTransaction,
1044    ) -> Result<Vec<channel::Model>> {
1045        let visible_channels = channel::Entity::find()
1046            .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1047            .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1048            .order_by_asc(channel::Column::ParentPath)
1049            .all(&*tx)
1050            .await?;
1051
1052        Ok(visible_channels)
1053    }
1054
1055    /// Returns the role for a user in the given channel.
1056    pub async fn channel_role_for_user(
1057        &self,
1058        channel: &channel::Model,
1059        user_id: UserId,
1060        tx: &DatabaseTransaction,
1061    ) -> Result<Option<ChannelRole>> {
1062        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1063        enum QueryChannelMembership {
1064            ChannelId,
1065            Role,
1066            Visibility,
1067        }
1068
1069        let mut rows = channel_member::Entity::find()
1070            .left_join(channel::Entity)
1071            .filter(
1072                channel_member::Column::ChannelId
1073                    .is_in(channel.ancestors_including_self())
1074                    .and(channel_member::Column::UserId.eq(user_id))
1075                    .and(channel_member::Column::Accepted.eq(true)),
1076            )
1077            .select_only()
1078            .column(channel_member::Column::ChannelId)
1079            .column(channel_member::Column::Role)
1080            .column(channel::Column::Visibility)
1081            .into_values::<_, QueryChannelMembership>()
1082            .stream(&*tx)
1083            .await?;
1084
1085        let mut user_role: Option<ChannelRole> = None;
1086
1087        let mut is_participant = false;
1088        let mut current_channel_visibility = None;
1089
1090        // note these channels are not iterated in any particular order,
1091        // our current logic takes the highest permission available.
1092        while let Some(row) = rows.next().await {
1093            let (membership_channel, role, visibility): (
1094                ChannelId,
1095                ChannelRole,
1096                ChannelVisibility,
1097            ) = row?;
1098
1099            match role {
1100                ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
1101                    if let Some(users_role) = user_role {
1102                        user_role = Some(users_role.max(role));
1103                    } else {
1104                        user_role = Some(role)
1105                    }
1106                }
1107                ChannelRole::Guest if visibility == ChannelVisibility::Public => {
1108                    is_participant = true
1109                }
1110                ChannelRole::Guest => {}
1111            }
1112            if channel.id == membership_channel {
1113                current_channel_visibility = Some(visibility);
1114            }
1115        }
1116        // free up database connection
1117        drop(rows);
1118
1119        if is_participant && user_role.is_none() {
1120            if current_channel_visibility.is_none() {
1121                current_channel_visibility = channel::Entity::find()
1122                    .filter(channel::Column::Id.eq(channel.id))
1123                    .one(&*tx)
1124                    .await?
1125                    .map(|channel| channel.visibility);
1126            }
1127            if current_channel_visibility == Some(ChannelVisibility::Public) {
1128                user_role = Some(ChannelRole::Guest);
1129            }
1130        }
1131
1132        Ok(user_role)
1133    }
1134
1135    // Get the descendants of the given set if channels, ordered by their
1136    // path.
1137    async fn get_channel_descendants_including_self(
1138        &self,
1139        channel_ids: impl IntoIterator<Item = ChannelId>,
1140        tx: &DatabaseTransaction,
1141    ) -> Result<Vec<channel::Model>> {
1142        let mut values = String::new();
1143        for id in channel_ids {
1144            if !values.is_empty() {
1145                values.push_str(", ");
1146            }
1147            write!(&mut values, "({})", id).unwrap();
1148        }
1149
1150        if values.is_empty() {
1151            return Ok(vec![]);
1152        }
1153
1154        let sql = format!(
1155            r#"
1156            SELECT DISTINCT
1157                descendant_channels.*,
1158                descendant_channels.parent_path || descendant_channels.id as full_path
1159            FROM
1160                channels parent_channels, channels descendant_channels
1161            WHERE
1162                descendant_channels.id IN ({values}) OR
1163                (
1164                    parent_channels.id IN ({values}) AND
1165                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1166                )
1167            ORDER BY
1168                full_path ASC
1169            "#
1170        );
1171
1172        Ok(channel::Entity::find()
1173            .from_raw_sql(Statement::from_string(
1174                self.pool.get_database_backend(),
1175                sql,
1176            ))
1177            .all(tx)
1178            .await?)
1179    }
1180
1181    /// Returns the channel with the given ID.
1182    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1183        self.transaction(|tx| async move {
1184            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1185            let role = self
1186                .check_user_is_channel_participant(&channel, user_id, &*tx)
1187                .await?;
1188
1189            Ok(Channel::from_model(channel, role))
1190        })
1191        .await
1192    }
1193
1194    pub(crate) async fn get_channel_internal(
1195        &self,
1196        channel_id: ChannelId,
1197        tx: &DatabaseTransaction,
1198    ) -> Result<channel::Model> {
1199        Ok(channel::Entity::find_by_id(channel_id)
1200            .one(&*tx)
1201            .await?
1202            .ok_or_else(|| proto::ErrorCode::NoSuchChannel.anyhow())?)
1203    }
1204
1205    pub(crate) async fn get_or_create_channel_room(
1206        &self,
1207        channel_id: ChannelId,
1208        live_kit_room: &str,
1209        environment: &str,
1210        tx: &DatabaseTransaction,
1211    ) -> Result<RoomId> {
1212        let room = room::Entity::find()
1213            .filter(room::Column::ChannelId.eq(channel_id))
1214            .one(&*tx)
1215            .await?;
1216
1217        let room_id = if let Some(room) = room {
1218            if let Some(env) = room.environment {
1219                if &env != environment {
1220                    Err(ErrorCode::WrongReleaseChannel
1221                        .with_tag("required", &env)
1222                        .anyhow())?;
1223                }
1224            }
1225            room.id
1226        } else {
1227            let result = room::Entity::insert(room::ActiveModel {
1228                channel_id: ActiveValue::Set(Some(channel_id)),
1229                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1230                environment: ActiveValue::Set(Some(environment.to_string())),
1231                ..Default::default()
1232            })
1233            .exec(&*tx)
1234            .await?;
1235
1236            result.last_insert_id
1237        };
1238
1239        Ok(room_id)
1240    }
1241
1242    /// Move a channel from one parent to another
1243    pub async fn move_channel(
1244        &self,
1245        channel_id: ChannelId,
1246        new_parent_id: Option<ChannelId>,
1247        admin_id: UserId,
1248    ) -> Result<Option<MoveChannelResult>> {
1249        self.transaction(|tx| async move {
1250            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1251            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1252                .await?;
1253
1254            let new_parent_path;
1255            let new_parent_channel;
1256            if let Some(new_parent_id) = new_parent_id {
1257                let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1258                self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1259                    .await?;
1260
1261                if new_parent
1262                    .ancestors_including_self()
1263                    .any(|id| id == channel.id)
1264                {
1265                    Err(anyhow!("cannot move a channel into one of its descendants"))?;
1266                }
1267
1268                new_parent_path = new_parent.path();
1269                new_parent_channel = Some(new_parent);
1270            } else {
1271                new_parent_path = String::new();
1272                new_parent_channel = None;
1273            };
1274
1275            let previous_participants = self
1276                .get_channel_participant_details_internal(&channel, &*tx)
1277                .await?;
1278
1279            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1280            let new_path = format!("{}{}/", new_parent_path, channel.id);
1281
1282            if old_path == new_path {
1283                return Ok(None);
1284            }
1285
1286            let mut model = channel.into_active_model();
1287            model.parent_path = ActiveValue::Set(new_parent_path);
1288            model.update(&*tx).await?;
1289
1290            if new_parent_channel.is_none() {
1291                channel_member::ActiveModel {
1292                    id: ActiveValue::NotSet,
1293                    channel_id: ActiveValue::Set(channel_id),
1294                    user_id: ActiveValue::Set(admin_id),
1295                    accepted: ActiveValue::Set(true),
1296                    role: ActiveValue::Set(ChannelRole::Admin),
1297                }
1298                .insert(&*tx)
1299                .await?;
1300            }
1301
1302            let descendent_ids =
1303                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1304                    self.pool.get_database_backend(),
1305                    "
1306                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1307                    WHERE parent_path LIKE $3 || '%'
1308                    RETURNING id
1309                ",
1310                    [old_path.clone().into(), new_path.into(), old_path.into()],
1311                ))
1312                .all(&*tx)
1313                .await?;
1314
1315            Ok(Some(MoveChannelResult {
1316                previous_participants,
1317                descendent_ids,
1318            }))
1319        })
1320        .await
1321    }
1322}
1323
1324#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1325enum QueryIds {
1326    Id,
1327}
1328
1329#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1330enum QueryUserIds {
1331    UserId,
1332}