channels.rs

   1use super::*;
   2use rpc::proto::channel_member::Kind;
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self
  23            .create_channel(name, None, creator_id)
  24            .await?
  25            .channel
  26            .id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .channel
  40            .id)
  41    }
  42
  43    /// Creates a new channel.
  44    pub async fn create_channel(
  45        &self,
  46        name: &str,
  47        parent_channel_id: Option<ChannelId>,
  48        admin_id: UserId,
  49    ) -> Result<CreateChannelResult> {
  50        let name = Self::sanitize_channel_name(name)?;
  51        self.transaction(move |tx| async move {
  52            let mut parent = None;
  53
  54            if let Some(parent_channel_id) = parent_channel_id {
  55                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  56                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  57                    .await?;
  58                parent = Some(parent_channel);
  59            }
  60
  61            let channel = channel::ActiveModel {
  62                id: ActiveValue::NotSet,
  63                name: ActiveValue::Set(name.to_string()),
  64                visibility: ActiveValue::Set(ChannelVisibility::Members),
  65                parent_path: ActiveValue::Set(
  66                    parent
  67                        .as_ref()
  68                        .map_or(String::new(), |parent| parent.path()),
  69                ),
  70                requires_zed_cla: ActiveValue::NotSet,
  71            }
  72            .insert(&*tx)
  73            .await?;
  74
  75            let participants_to_update;
  76            if let Some(parent) = &parent {
  77                participants_to_update = self
  78                    .participants_to_notify_for_channel_change(parent, &*tx)
  79                    .await?;
  80            } else {
  81                participants_to_update = vec![];
  82
  83                channel_member::ActiveModel {
  84                    id: ActiveValue::NotSet,
  85                    channel_id: ActiveValue::Set(channel.id),
  86                    user_id: ActiveValue::Set(admin_id),
  87                    accepted: ActiveValue::Set(true),
  88                    role: ActiveValue::Set(ChannelRole::Admin),
  89                }
  90                .insert(&*tx)
  91                .await?;
  92            };
  93
  94            Ok(CreateChannelResult {
  95                channel: Channel::from_model(channel, ChannelRole::Admin),
  96                participants_to_update,
  97            })
  98        })
  99        .await
 100    }
 101
 102    /// Adds a user to the specified channel.
 103    pub async fn join_channel(
 104        &self,
 105        channel_id: ChannelId,
 106        user_id: UserId,
 107        connection: ConnectionId,
 108        environment: &str,
 109    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 110        self.transaction(move |tx| async move {
 111            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 112            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 113
 114            let mut accept_invite_result = None;
 115
 116            if role.is_none() {
 117                if let Some(invitation) = self
 118                    .pending_invite_for_channel(&channel, user_id, &*tx)
 119                    .await?
 120                {
 121                    // note, this may be a parent channel
 122                    role = Some(invitation.role);
 123                    channel_member::Entity::update(channel_member::ActiveModel {
 124                        accepted: ActiveValue::Set(true),
 125                        ..invitation.into_active_model()
 126                    })
 127                    .exec(&*tx)
 128                    .await?;
 129
 130                    accept_invite_result = Some(
 131                        self.calculate_membership_updated(&channel, user_id, &*tx)
 132                            .await?,
 133                    );
 134
 135                    debug_assert!(
 136                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 137                    );
 138                } else if channel.visibility == ChannelVisibility::Public {
 139                    role = Some(ChannelRole::Guest);
 140                    let channel_to_join = self
 141                        .public_ancestors_including_self(&channel, &*tx)
 142                        .await?
 143                        .first()
 144                        .cloned()
 145                        .unwrap_or(channel.clone());
 146
 147                    channel_member::Entity::insert(channel_member::ActiveModel {
 148                        id: ActiveValue::NotSet,
 149                        channel_id: ActiveValue::Set(channel_to_join.id),
 150                        user_id: ActiveValue::Set(user_id),
 151                        accepted: ActiveValue::Set(true),
 152                        role: ActiveValue::Set(ChannelRole::Guest),
 153                    })
 154                    .exec(&*tx)
 155                    .await?;
 156
 157                    accept_invite_result = Some(
 158                        self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
 159                            .await?,
 160                    );
 161
 162                    debug_assert!(
 163                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 164                    );
 165                }
 166            }
 167
 168            if role.is_none() || role == Some(ChannelRole::Banned) {
 169                Err(anyhow!("not allowed"))?
 170            }
 171            let role = role.unwrap();
 172
 173            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 174            let room_id = self
 175                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 176                .await?;
 177
 178            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 179                .await
 180                .map(|jr| (jr, accept_invite_result, role))
 181        })
 182        .await
 183    }
 184
 185    /// Sets the visibiltity of the given channel.
 186    pub async fn set_channel_visibility(
 187        &self,
 188        channel_id: ChannelId,
 189        visibility: ChannelVisibility,
 190        admin_id: UserId,
 191    ) -> Result<SetChannelVisibilityResult> {
 192        self.transaction(move |tx| async move {
 193            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 194
 195            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 196                .await?;
 197
 198            let previous_members = self
 199                .get_channel_participant_details_internal(&channel, &*tx)
 200                .await?;
 201
 202            let mut model = channel.into_active_model();
 203            model.visibility = ActiveValue::Set(visibility);
 204            let channel = model.update(&*tx).await?;
 205
 206            let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
 207                .participants_to_notify_for_channel_change(&channel, &*tx)
 208                .await?
 209                .into_iter()
 210                .collect();
 211
 212            let mut channels_to_remove: Vec<ChannelId> = vec![];
 213            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
 214            match visibility {
 215                ChannelVisibility::Members => {
 216                    let all_descendents: Vec<ChannelId> = self
 217                        .get_channel_descendants_including_self(vec![channel_id], &*tx)
 218                        .await?
 219                        .into_iter()
 220                        .map(|channel| channel.id)
 221                        .collect();
 222
 223                    channels_to_remove = channel::Entity::find()
 224                        .filter(
 225                            channel::Column::Id
 226                                .is_in(all_descendents)
 227                                .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
 228                        )
 229                        .all(&*tx)
 230                        .await?
 231                        .into_iter()
 232                        .map(|channel| channel.id)
 233                        .collect();
 234
 235                    channels_to_remove.push(channel_id);
 236
 237                    for member in previous_members {
 238                        if member.role.can_only_see_public_descendants() {
 239                            participants_to_remove.insert(member.user_id);
 240                        }
 241                    }
 242                }
 243                ChannelVisibility::Public => {
 244                    if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
 245                        let parent_updates = self
 246                            .participants_to_notify_for_channel_change(&public_parent, &*tx)
 247                            .await?;
 248
 249                        for (user_id, channels) in parent_updates {
 250                            participants_to_update.insert(user_id, channels);
 251                        }
 252                    }
 253                }
 254            }
 255
 256            Ok(SetChannelVisibilityResult {
 257                participants_to_update,
 258                participants_to_remove,
 259                channels_to_remove,
 260            })
 261        })
 262        .await
 263    }
 264
 265    #[cfg(test)]
 266    pub async fn set_channel_requires_zed_cla(
 267        &self,
 268        channel_id: ChannelId,
 269        requires_zed_cla: bool,
 270    ) -> Result<()> {
 271        self.transaction(move |tx| async move {
 272            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 273            let mut model = channel.into_active_model();
 274            model.requires_zed_cla = ActiveValue::Set(requires_zed_cla);
 275            model.update(&*tx).await?;
 276            Ok(())
 277        })
 278        .await
 279    }
 280
 281    /// Deletes the channel with the specified ID.
 282    pub async fn delete_channel(
 283        &self,
 284        channel_id: ChannelId,
 285        user_id: UserId,
 286    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 287        self.transaction(move |tx| async move {
 288            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 289            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 290                .await?;
 291
 292            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 293                .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 294                .select_only()
 295                .column(channel_member::Column::UserId)
 296                .distinct()
 297                .into_values::<_, QueryUserIds>()
 298                .all(&*tx)
 299                .await?;
 300
 301            let channels_to_remove = self
 302                .get_channel_descendants_including_self(vec![channel.id], &*tx)
 303                .await?
 304                .into_iter()
 305                .map(|channel| channel.id)
 306                .collect::<Vec<_>>();
 307
 308            channel::Entity::delete_many()
 309                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 310                .exec(&*tx)
 311                .await?;
 312
 313            Ok((channels_to_remove, members_to_notify))
 314        })
 315        .await
 316    }
 317
 318    /// Invites a user to a channel as a member.
 319    pub async fn invite_channel_member(
 320        &self,
 321        channel_id: ChannelId,
 322        invitee_id: UserId,
 323        inviter_id: UserId,
 324        role: ChannelRole,
 325    ) -> Result<InviteMemberResult> {
 326        self.transaction(move |tx| async move {
 327            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 328            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 329                .await?;
 330
 331            channel_member::ActiveModel {
 332                id: ActiveValue::NotSet,
 333                channel_id: ActiveValue::Set(channel_id),
 334                user_id: ActiveValue::Set(invitee_id),
 335                accepted: ActiveValue::Set(false),
 336                role: ActiveValue::Set(role),
 337            }
 338            .insert(&*tx)
 339            .await?;
 340
 341            let channel = Channel::from_model(channel, role);
 342
 343            let notifications = self
 344                .create_notification(
 345                    invitee_id,
 346                    rpc::Notification::ChannelInvitation {
 347                        channel_id: channel_id.to_proto(),
 348                        channel_name: channel.name.clone(),
 349                        inviter_id: inviter_id.to_proto(),
 350                    },
 351                    true,
 352                    &*tx,
 353                )
 354                .await?
 355                .into_iter()
 356                .collect();
 357
 358            Ok(InviteMemberResult {
 359                channel,
 360                notifications,
 361            })
 362        })
 363        .await
 364    }
 365
 366    fn sanitize_channel_name(name: &str) -> Result<&str> {
 367        let new_name = name.trim().trim_start_matches('#');
 368        if new_name == "" {
 369            Err(anyhow!("channel name can't be blank"))?;
 370        }
 371        Ok(new_name)
 372    }
 373
 374    /// Renames the specified channel.
 375    pub async fn rename_channel(
 376        &self,
 377        channel_id: ChannelId,
 378        admin_id: UserId,
 379        new_name: &str,
 380    ) -> Result<RenameChannelResult> {
 381        self.transaction(move |tx| async move {
 382            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 383
 384            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 385            let role = self
 386                .check_user_is_channel_admin(&channel, admin_id, &*tx)
 387                .await?;
 388
 389            let mut model = channel.into_active_model();
 390            model.name = ActiveValue::Set(new_name.clone());
 391            let channel = model.update(&*tx).await?;
 392
 393            let participants = self
 394                .get_channel_participant_details_internal(&channel, &*tx)
 395                .await?;
 396
 397            Ok(RenameChannelResult {
 398                channel: Channel::from_model(channel.clone(), role),
 399                participants_to_update: participants
 400                    .iter()
 401                    .map(|participant| {
 402                        (
 403                            participant.user_id,
 404                            Channel::from_model(channel.clone(), participant.role),
 405                        )
 406                    })
 407                    .collect(),
 408            })
 409        })
 410        .await
 411    }
 412
 413    /// accept or decline an invite to join a channel
 414    pub async fn respond_to_channel_invite(
 415        &self,
 416        channel_id: ChannelId,
 417        user_id: UserId,
 418        accept: bool,
 419    ) -> Result<RespondToChannelInvite> {
 420        self.transaction(move |tx| async move {
 421            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 422
 423            let membership_update = if accept {
 424                let rows_affected = channel_member::Entity::update_many()
 425                    .set(channel_member::ActiveModel {
 426                        accepted: ActiveValue::Set(accept),
 427                        ..Default::default()
 428                    })
 429                    .filter(
 430                        channel_member::Column::ChannelId
 431                            .eq(channel_id)
 432                            .and(channel_member::Column::UserId.eq(user_id))
 433                            .and(channel_member::Column::Accepted.eq(false)),
 434                    )
 435                    .exec(&*tx)
 436                    .await?
 437                    .rows_affected;
 438
 439                if rows_affected == 0 {
 440                    Err(anyhow!("no such invitation"))?;
 441                }
 442
 443                Some(
 444                    self.calculate_membership_updated(&channel, user_id, &*tx)
 445                        .await?,
 446                )
 447            } else {
 448                let rows_affected = channel_member::Entity::delete_many()
 449                    .filter(
 450                        channel_member::Column::ChannelId
 451                            .eq(channel_id)
 452                            .and(channel_member::Column::UserId.eq(user_id))
 453                            .and(channel_member::Column::Accepted.eq(false)),
 454                    )
 455                    .exec(&*tx)
 456                    .await?
 457                    .rows_affected;
 458                if rows_affected == 0 {
 459                    Err(anyhow!("no such invitation"))?;
 460                }
 461
 462                None
 463            };
 464
 465            Ok(RespondToChannelInvite {
 466                membership_update,
 467                notifications: self
 468                    .mark_notification_as_read_with_response(
 469                        user_id,
 470                        &rpc::Notification::ChannelInvitation {
 471                            channel_id: channel_id.to_proto(),
 472                            channel_name: Default::default(),
 473                            inviter_id: Default::default(),
 474                        },
 475                        accept,
 476                        &*tx,
 477                    )
 478                    .await?
 479                    .into_iter()
 480                    .collect(),
 481            })
 482        })
 483        .await
 484    }
 485
 486    async fn calculate_membership_updated(
 487        &self,
 488        channel: &channel::Model,
 489        user_id: UserId,
 490        tx: &DatabaseTransaction,
 491    ) -> Result<MembershipUpdated> {
 492        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 493        let removed_channels = self
 494            .get_channel_descendants_including_self(vec![channel.id], &*tx)
 495            .await?
 496            .into_iter()
 497            .filter_map(|channel| {
 498                if !new_channels.channels.iter().any(|c| c.id == channel.id) {
 499                    Some(channel.id)
 500                } else {
 501                    None
 502                }
 503            })
 504            .collect::<Vec<_>>();
 505
 506        Ok(MembershipUpdated {
 507            channel_id: channel.id,
 508            new_channels,
 509            removed_channels,
 510        })
 511    }
 512
 513    /// Removes a channel member.
 514    pub async fn remove_channel_member(
 515        &self,
 516        channel_id: ChannelId,
 517        member_id: UserId,
 518        admin_id: UserId,
 519    ) -> Result<RemoveChannelMemberResult> {
 520        self.transaction(|tx| async move {
 521            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 522            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 523                .await?;
 524
 525            let result = channel_member::Entity::delete_many()
 526                .filter(
 527                    channel_member::Column::ChannelId
 528                        .eq(channel_id)
 529                        .and(channel_member::Column::UserId.eq(member_id)),
 530                )
 531                .exec(&*tx)
 532                .await?;
 533
 534            if result.rows_affected == 0 {
 535                Err(anyhow!("no such member"))?;
 536            }
 537
 538            Ok(RemoveChannelMemberResult {
 539                membership_update: self
 540                    .calculate_membership_updated(&channel, member_id, &*tx)
 541                    .await?,
 542                notification_id: self
 543                    .remove_notification(
 544                        member_id,
 545                        rpc::Notification::ChannelInvitation {
 546                            channel_id: channel_id.to_proto(),
 547                            channel_name: Default::default(),
 548                            inviter_id: Default::default(),
 549                        },
 550                        &*tx,
 551                    )
 552                    .await?,
 553            })
 554        })
 555        .await
 556    }
 557
 558    /// Returns all channel invites for the user with the given ID.
 559    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 560        self.transaction(|tx| async move {
 561            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 562
 563            let channel_invites = channel_member::Entity::find()
 564                .filter(
 565                    channel_member::Column::UserId
 566                        .eq(user_id)
 567                        .and(channel_member::Column::Accepted.eq(false)),
 568                )
 569                .all(&*tx)
 570                .await?;
 571
 572            for invite in channel_invites {
 573                role_for_channel.insert(invite.channel_id, invite.role);
 574            }
 575
 576            let channels = channel::Entity::find()
 577                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 578                .all(&*tx)
 579                .await?;
 580
 581            let channels = channels
 582                .into_iter()
 583                .filter_map(|channel| {
 584                    let role = *role_for_channel.get(&channel.id)?;
 585                    Some(Channel::from_model(channel, role))
 586                })
 587                .collect();
 588
 589            Ok(channels)
 590        })
 591        .await
 592    }
 593
 594    /// Returns all channels for the user with the given ID.
 595    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 596        self.transaction(|tx| async move {
 597            let tx = tx;
 598
 599            self.get_user_channels(user_id, None, &tx).await
 600        })
 601        .await
 602    }
 603
 604    /// Returns all channels for the user with the given ID that are descendants
 605    /// of the specified ancestor channel.
 606    pub async fn get_user_channels(
 607        &self,
 608        user_id: UserId,
 609        ancestor_channel: Option<&channel::Model>,
 610        tx: &DatabaseTransaction,
 611    ) -> Result<ChannelsForUser> {
 612        let channel_memberships = channel_member::Entity::find()
 613            .filter(
 614                channel_member::Column::UserId
 615                    .eq(user_id)
 616                    .and(channel_member::Column::Accepted.eq(true)),
 617            )
 618            .all(&*tx)
 619            .await?;
 620
 621        let descendants = self
 622            .get_channel_descendants_including_self(
 623                channel_memberships.iter().map(|m| m.channel_id),
 624                &*tx,
 625            )
 626            .await?;
 627
 628        let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
 629        for membership in channel_memberships.iter() {
 630            roles_by_channel_id.insert(membership.channel_id, membership.role);
 631        }
 632
 633        let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();
 634
 635        let channels: Vec<Channel> = descendants
 636            .into_iter()
 637            .filter_map(|channel| {
 638                let parent_role = channel
 639                    .parent_id()
 640                    .and_then(|parent_id| roles_by_channel_id.get(&parent_id));
 641
 642                let role = if let Some(parent_role) = parent_role {
 643                    let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
 644                        existing_role.max(*parent_role)
 645                    } else {
 646                        *parent_role
 647                    };
 648                    roles_by_channel_id.insert(channel.id, role);
 649                    role
 650                } else {
 651                    *roles_by_channel_id.get(&channel.id)?
 652                };
 653
 654                let can_see_parent_paths = role.can_see_all_descendants()
 655                    || role.can_only_see_public_descendants()
 656                        && channel.visibility == ChannelVisibility::Public;
 657                if !can_see_parent_paths {
 658                    return None;
 659                }
 660
 661                visible_channel_ids.insert(channel.id);
 662
 663                if let Some(ancestor) = ancestor_channel {
 664                    if !channel
 665                        .ancestors_including_self()
 666                        .any(|id| id == ancestor.id)
 667                    {
 668                        return None;
 669                    }
 670                }
 671
 672                let mut channel = Channel::from_model(channel, role);
 673                channel
 674                    .parent_path
 675                    .retain(|id| visible_channel_ids.contains(&id));
 676
 677                Some(channel)
 678            })
 679            .collect();
 680
 681        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 682        enum QueryUserIdsAndChannelIds {
 683            ChannelId,
 684            UserId,
 685        }
 686
 687        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 688        {
 689            let mut rows = room_participant::Entity::find()
 690                .inner_join(room::Entity)
 691                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 692                .select_only()
 693                .column(room::Column::ChannelId)
 694                .column(room_participant::Column::UserId)
 695                .into_values::<_, QueryUserIdsAndChannelIds>()
 696                .stream(&*tx)
 697                .await?;
 698            while let Some(row) = rows.next().await {
 699                let row: (ChannelId, UserId) = row?;
 700                channel_participants.entry(row.0).or_default().push(row.1)
 701            }
 702        }
 703
 704        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 705        let channel_buffer_changes = self
 706            .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
 707            .await?;
 708
 709        let unseen_messages = self
 710            .unseen_channel_messages(user_id, &channel_ids, &*tx)
 711            .await?;
 712
 713        Ok(ChannelsForUser {
 714            channels,
 715            channel_participants,
 716            unseen_buffer_changes: channel_buffer_changes,
 717            channel_messages: unseen_messages,
 718        })
 719    }
 720
 721    async fn participants_to_notify_for_channel_change(
 722        &self,
 723        new_parent: &channel::Model,
 724        tx: &DatabaseTransaction,
 725    ) -> Result<Vec<(UserId, ChannelsForUser)>> {
 726        let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
 727
 728        let members = self
 729            .get_channel_participant_details_internal(new_parent, &*tx)
 730            .await?;
 731
 732        for member in members.iter() {
 733            if !member.role.can_see_all_descendants() {
 734                continue;
 735            }
 736            results.push((
 737                member.user_id,
 738                self.get_user_channels(member.user_id, Some(new_parent), &*tx)
 739                    .await?,
 740            ))
 741        }
 742
 743        let public_parents = self
 744            .public_ancestors_including_self(new_parent, &*tx)
 745            .await?;
 746        let public_parent = public_parents.last();
 747
 748        let Some(public_parent) = public_parent else {
 749            return Ok(results);
 750        };
 751
 752        // could save some time in the common case by skipping this if the
 753        // new channel is not public and has no public descendants.
 754        let public_members = if public_parent == new_parent {
 755            members
 756        } else {
 757            self.get_channel_participant_details_internal(public_parent, &*tx)
 758                .await?
 759        };
 760
 761        for member in public_members {
 762            if !member.role.can_only_see_public_descendants() {
 763                continue;
 764            };
 765            results.push((
 766                member.user_id,
 767                self.get_user_channels(member.user_id, Some(public_parent), &*tx)
 768                    .await?,
 769            ))
 770        }
 771
 772        Ok(results)
 773    }
 774
 775    /// Sets the role for the specified channel member.
 776    pub async fn set_channel_member_role(
 777        &self,
 778        channel_id: ChannelId,
 779        admin_id: UserId,
 780        for_user: UserId,
 781        role: ChannelRole,
 782    ) -> Result<SetMemberRoleResult> {
 783        self.transaction(|tx| async move {
 784            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 785            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 786                .await?;
 787
 788            let membership = channel_member::Entity::find()
 789                .filter(
 790                    channel_member::Column::ChannelId
 791                        .eq(channel_id)
 792                        .and(channel_member::Column::UserId.eq(for_user)),
 793                )
 794                .one(&*tx)
 795                .await?;
 796
 797            let Some(membership) = membership else {
 798                Err(anyhow!("no such member"))?
 799            };
 800
 801            let mut update = membership.into_active_model();
 802            update.role = ActiveValue::Set(role);
 803            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 804
 805            if updated.accepted {
 806                Ok(SetMemberRoleResult::MembershipUpdated(
 807                    self.calculate_membership_updated(&channel, for_user, &*tx)
 808                        .await?,
 809                ))
 810            } else {
 811                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 812                    channel, role,
 813                )))
 814            }
 815        })
 816        .await
 817    }
 818
 819    /// Returns the details for the specified channel member.
 820    pub async fn get_channel_participant_details(
 821        &self,
 822        channel_id: ChannelId,
 823        user_id: UserId,
 824    ) -> Result<Vec<proto::ChannelMember>> {
 825        let (role, members) = self
 826            .transaction(move |tx| async move {
 827                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 828                let role = self
 829                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 830                    .await?;
 831                Ok((
 832                    role,
 833                    self.get_channel_participant_details_internal(&channel, &*tx)
 834                        .await?,
 835                ))
 836            })
 837            .await?;
 838
 839        if role == ChannelRole::Admin {
 840            Ok(members
 841                .into_iter()
 842                .map(|channel_member| channel_member.to_proto())
 843                .collect())
 844        } else {
 845            return Ok(members
 846                .into_iter()
 847                .filter_map(|member| {
 848                    if member.kind == proto::channel_member::Kind::Invitee {
 849                        return None;
 850                    }
 851                    Some(ChannelMember {
 852                        role: member.role,
 853                        user_id: member.user_id,
 854                        kind: proto::channel_member::Kind::Member,
 855                    })
 856                })
 857                .map(|channel_member| channel_member.to_proto())
 858                .collect());
 859        }
 860    }
 861
 862    async fn get_channel_participant_details_internal(
 863        &self,
 864        channel: &channel::Model,
 865        tx: &DatabaseTransaction,
 866    ) -> Result<Vec<ChannelMember>> {
 867        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 868        enum QueryMemberDetails {
 869            UserId,
 870            Role,
 871            IsDirectMember,
 872            Accepted,
 873            Visibility,
 874        }
 875
 876        let mut stream = channel_member::Entity::find()
 877            .left_join(channel::Entity)
 878            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 879            .select_only()
 880            .column(channel_member::Column::UserId)
 881            .column(channel_member::Column::Role)
 882            .column_as(
 883                channel_member::Column::ChannelId.eq(channel.id),
 884                QueryMemberDetails::IsDirectMember,
 885            )
 886            .column(channel_member::Column::Accepted)
 887            .column(channel::Column::Visibility)
 888            .into_values::<_, QueryMemberDetails>()
 889            .stream(&*tx)
 890            .await?;
 891
 892        let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();
 893
 894        while let Some(user_membership) = stream.next().await {
 895            let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
 896                UserId,
 897                ChannelRole,
 898                bool,
 899                bool,
 900                ChannelVisibility,
 901            ) = user_membership?;
 902            let kind = match (is_direct_member, is_invite_accepted) {
 903                (true, true) => proto::channel_member::Kind::Member,
 904                (true, false) => proto::channel_member::Kind::Invitee,
 905                (false, true) => proto::channel_member::Kind::AncestorMember,
 906                (false, false) => continue,
 907            };
 908
 909            if channel_role == ChannelRole::Guest
 910                && visibility != ChannelVisibility::Public
 911                && channel.visibility != ChannelVisibility::Public
 912            {
 913                continue;
 914            }
 915
 916            if let Some(details_mut) = user_details.get_mut(&user_id) {
 917                if channel_role.should_override(details_mut.role) {
 918                    details_mut.role = channel_role;
 919                }
 920                if kind == Kind::Member {
 921                    details_mut.kind = kind;
 922                // the UI is going to be a bit confusing if you already have permissions
 923                // that are greater than or equal to the ones you're being invited to.
 924                } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
 925                    details_mut.kind = kind;
 926                }
 927            } else {
 928                user_details.insert(
 929                    user_id,
 930                    ChannelMember {
 931                        user_id,
 932                        kind,
 933                        role: channel_role,
 934                    },
 935                );
 936            }
 937        }
 938
 939        Ok(user_details
 940            .into_iter()
 941            .map(|(_, details)| details)
 942            .collect())
 943    }
 944
 945    /// Returns the participants in the given channel.
 946    pub async fn get_channel_participants(
 947        &self,
 948        channel: &channel::Model,
 949        tx: &DatabaseTransaction,
 950    ) -> Result<Vec<UserId>> {
 951        let participants = self
 952            .get_channel_participant_details_internal(channel, &*tx)
 953            .await?;
 954        Ok(participants
 955            .into_iter()
 956            .map(|member| member.user_id)
 957            .collect())
 958    }
 959
 960    /// Returns whether the given user is an admin in the specified channel.
 961    pub async fn check_user_is_channel_admin(
 962        &self,
 963        channel: &channel::Model,
 964        user_id: UserId,
 965        tx: &DatabaseTransaction,
 966    ) -> Result<ChannelRole> {
 967        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 968        match role {
 969            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 970            Some(ChannelRole::Member)
 971            | Some(ChannelRole::Banned)
 972            | Some(ChannelRole::Guest)
 973            | None => Err(anyhow!(
 974                "user is not a channel admin or channel does not exist"
 975            ))?,
 976        }
 977    }
 978
 979    /// Returns whether the given user is a member of the specified channel.
 980    pub async fn check_user_is_channel_member(
 981        &self,
 982        channel: &channel::Model,
 983        user_id: UserId,
 984        tx: &DatabaseTransaction,
 985    ) -> Result<ChannelRole> {
 986        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 987        match channel_role {
 988            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 989            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 990                "user is not a channel member or channel does not exist"
 991            ))?,
 992        }
 993    }
 994
 995    /// Returns whether the given user is a participant in the specified channel.
 996    pub async fn check_user_is_channel_participant(
 997        &self,
 998        channel: &channel::Model,
 999        user_id: UserId,
1000        tx: &DatabaseTransaction,
1001    ) -> Result<ChannelRole> {
1002        let role = self.channel_role_for_user(channel, user_id, tx).await?;
1003        match role {
1004            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
1005                Ok(role.unwrap())
1006            }
1007            Some(ChannelRole::Banned) | None => Err(anyhow!(
1008                "user is not a channel participant or channel does not exist"
1009            ))?,
1010        }
1011    }
1012
1013    /// Returns a user's pending invite for the given channel, if one exists.
1014    pub async fn pending_invite_for_channel(
1015        &self,
1016        channel: &channel::Model,
1017        user_id: UserId,
1018        tx: &DatabaseTransaction,
1019    ) -> Result<Option<channel_member::Model>> {
1020        let row = channel_member::Entity::find()
1021            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
1022            .filter(channel_member::Column::UserId.eq(user_id))
1023            .filter(channel_member::Column::Accepted.eq(false))
1024            .one(&*tx)
1025            .await?;
1026
1027        Ok(row)
1028    }
1029
1030    async fn public_parent_channel(
1031        &self,
1032        channel: &channel::Model,
1033        tx: &DatabaseTransaction,
1034    ) -> Result<Option<channel::Model>> {
1035        let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1036        if path.last().unwrap().id == channel.id {
1037            path.pop();
1038        }
1039        Ok(path.pop())
1040    }
1041
1042    pub(crate) async fn public_ancestors_including_self(
1043        &self,
1044        channel: &channel::Model,
1045        tx: &DatabaseTransaction,
1046    ) -> Result<Vec<channel::Model>> {
1047        let visible_channels = channel::Entity::find()
1048            .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1049            .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1050            .order_by_asc(channel::Column::ParentPath)
1051            .all(&*tx)
1052            .await?;
1053
1054        Ok(visible_channels)
1055    }
1056
1057    /// Returns the role for a user in the given channel.
1058    pub async fn channel_role_for_user(
1059        &self,
1060        channel: &channel::Model,
1061        user_id: UserId,
1062        tx: &DatabaseTransaction,
1063    ) -> Result<Option<ChannelRole>> {
1064        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1065        enum QueryChannelMembership {
1066            ChannelId,
1067            Role,
1068            Visibility,
1069        }
1070
1071        let mut rows = channel_member::Entity::find()
1072            .left_join(channel::Entity)
1073            .filter(
1074                channel_member::Column::ChannelId
1075                    .is_in(channel.ancestors_including_self())
1076                    .and(channel_member::Column::UserId.eq(user_id))
1077                    .and(channel_member::Column::Accepted.eq(true)),
1078            )
1079            .select_only()
1080            .column(channel_member::Column::ChannelId)
1081            .column(channel_member::Column::Role)
1082            .column(channel::Column::Visibility)
1083            .into_values::<_, QueryChannelMembership>()
1084            .stream(&*tx)
1085            .await?;
1086
1087        let mut user_role: Option<ChannelRole> = None;
1088
1089        let mut is_participant = false;
1090        let mut current_channel_visibility = None;
1091
1092        // note these channels are not iterated in any particular order,
1093        // our current logic takes the highest permission available.
1094        while let Some(row) = rows.next().await {
1095            let (membership_channel, role, visibility): (
1096                ChannelId,
1097                ChannelRole,
1098                ChannelVisibility,
1099            ) = row?;
1100
1101            match role {
1102                ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
1103                    if let Some(users_role) = user_role {
1104                        user_role = Some(users_role.max(role));
1105                    } else {
1106                        user_role = Some(role)
1107                    }
1108                }
1109                ChannelRole::Guest if visibility == ChannelVisibility::Public => {
1110                    is_participant = true
1111                }
1112                ChannelRole::Guest => {}
1113            }
1114            if channel.id == membership_channel {
1115                current_channel_visibility = Some(visibility);
1116            }
1117        }
1118        // free up database connection
1119        drop(rows);
1120
1121        if is_participant && user_role.is_none() {
1122            if current_channel_visibility.is_none() {
1123                current_channel_visibility = channel::Entity::find()
1124                    .filter(channel::Column::Id.eq(channel.id))
1125                    .one(&*tx)
1126                    .await?
1127                    .map(|channel| channel.visibility);
1128            }
1129            if current_channel_visibility == Some(ChannelVisibility::Public) {
1130                user_role = Some(ChannelRole::Guest);
1131            }
1132        }
1133
1134        Ok(user_role)
1135    }
1136
1137    // Get the descendants of the given set if channels, ordered by their
1138    // path.
1139    async fn get_channel_descendants_including_self(
1140        &self,
1141        channel_ids: impl IntoIterator<Item = ChannelId>,
1142        tx: &DatabaseTransaction,
1143    ) -> Result<Vec<channel::Model>> {
1144        let mut values = String::new();
1145        for id in channel_ids {
1146            if !values.is_empty() {
1147                values.push_str(", ");
1148            }
1149            write!(&mut values, "({})", id).unwrap();
1150        }
1151
1152        if values.is_empty() {
1153            return Ok(vec![]);
1154        }
1155
1156        let sql = format!(
1157            r#"
1158            SELECT DISTINCT
1159                descendant_channels.*,
1160                descendant_channels.parent_path || descendant_channels.id as full_path
1161            FROM
1162                channels parent_channels, channels descendant_channels
1163            WHERE
1164                descendant_channels.id IN ({values}) OR
1165                (
1166                    parent_channels.id IN ({values}) AND
1167                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1168                )
1169            ORDER BY
1170                full_path ASC
1171            "#
1172        );
1173
1174        Ok(channel::Entity::find()
1175            .from_raw_sql(Statement::from_string(
1176                self.pool.get_database_backend(),
1177                sql,
1178            ))
1179            .all(tx)
1180            .await?)
1181    }
1182
1183    /// Returns the channel with the given ID.
1184    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1185        self.transaction(|tx| async move {
1186            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1187            let role = self
1188                .check_user_is_channel_participant(&channel, user_id, &*tx)
1189                .await?;
1190
1191            Ok(Channel::from_model(channel, role))
1192        })
1193        .await
1194    }
1195
1196    pub(crate) async fn get_channel_internal(
1197        &self,
1198        channel_id: ChannelId,
1199        tx: &DatabaseTransaction,
1200    ) -> Result<channel::Model> {
1201        Ok(channel::Entity::find_by_id(channel_id)
1202            .one(&*tx)
1203            .await?
1204            .ok_or_else(|| anyhow!("no such channel"))?)
1205    }
1206
1207    pub(crate) async fn get_or_create_channel_room(
1208        &self,
1209        channel_id: ChannelId,
1210        live_kit_room: &str,
1211        environment: &str,
1212        tx: &DatabaseTransaction,
1213    ) -> Result<RoomId> {
1214        let room = room::Entity::find()
1215            .filter(room::Column::ChannelId.eq(channel_id))
1216            .one(&*tx)
1217            .await?;
1218
1219        let room_id = if let Some(room) = room {
1220            if let Some(env) = room.environment {
1221                if &env != environment {
1222                    Err(anyhow!("must join using the {} release", env))?;
1223                }
1224            }
1225            room.id
1226        } else {
1227            let result = room::Entity::insert(room::ActiveModel {
1228                channel_id: ActiveValue::Set(Some(channel_id)),
1229                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1230                environment: ActiveValue::Set(Some(environment.to_string())),
1231                ..Default::default()
1232            })
1233            .exec(&*tx)
1234            .await?;
1235
1236            result.last_insert_id
1237        };
1238
1239        Ok(room_id)
1240    }
1241
1242    /// Move a channel from one parent to another
1243    pub async fn move_channel(
1244        &self,
1245        channel_id: ChannelId,
1246        new_parent_id: Option<ChannelId>,
1247        admin_id: UserId,
1248    ) -> Result<Option<MoveChannelResult>> {
1249        self.transaction(|tx| async move {
1250            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1251            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1252                .await?;
1253
1254            let new_parent_path;
1255            let new_parent_channel;
1256            if let Some(new_parent_id) = new_parent_id {
1257                let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1258                self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1259                    .await?;
1260
1261                if new_parent
1262                    .ancestors_including_self()
1263                    .any(|id| id == channel.id)
1264                {
1265                    Err(anyhow!("cannot move a channel into one of its descendants"))?;
1266                }
1267
1268                new_parent_path = new_parent.path();
1269                new_parent_channel = Some(new_parent);
1270            } else {
1271                new_parent_path = String::new();
1272                new_parent_channel = None;
1273            };
1274
1275            let previous_participants = self
1276                .get_channel_participant_details_internal(&channel, &*tx)
1277                .await?;
1278
1279            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1280            let new_path = format!("{}{}/", new_parent_path, channel.id);
1281
1282            if old_path == new_path {
1283                return Ok(None);
1284            }
1285
1286            let mut model = channel.into_active_model();
1287            model.parent_path = ActiveValue::Set(new_parent_path);
1288            let channel = model.update(&*tx).await?;
1289
1290            if new_parent_channel.is_none() {
1291                channel_member::ActiveModel {
1292                    id: ActiveValue::NotSet,
1293                    channel_id: ActiveValue::Set(channel_id),
1294                    user_id: ActiveValue::Set(admin_id),
1295                    accepted: ActiveValue::Set(true),
1296                    role: ActiveValue::Set(ChannelRole::Admin),
1297                }
1298                .insert(&*tx)
1299                .await?;
1300            }
1301
1302            let descendent_ids =
1303                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1304                    self.pool.get_database_backend(),
1305                    "
1306                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1307                    WHERE parent_path LIKE $3 || '%'
1308                    RETURNING id
1309                ",
1310                    [old_path.clone().into(), new_path.into(), old_path.into()],
1311                ))
1312                .all(&*tx)
1313                .await?;
1314
1315            let participants_to_update: HashMap<_, _> = self
1316                .participants_to_notify_for_channel_change(
1317                    new_parent_channel.as_ref().unwrap_or(&channel),
1318                    &*tx,
1319                )
1320                .await?
1321                .into_iter()
1322                .collect();
1323
1324            let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1325            for id in descendent_ids {
1326                moved_channels.insert(id);
1327            }
1328            moved_channels.insert(channel_id);
1329
1330            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1331            for participant in previous_participants {
1332                if participant.kind == proto::channel_member::Kind::AncestorMember {
1333                    if !participants_to_update.contains_key(&participant.user_id) {
1334                        participants_to_remove.insert(participant.user_id);
1335                    }
1336                }
1337            }
1338
1339            Ok(Some(MoveChannelResult {
1340                participants_to_remove,
1341                participants_to_update,
1342                moved_channels,
1343            }))
1344        })
1345        .await
1346    }
1347}
1348
/// Column selector for statements that return a single `id` column, such as
/// the `UPDATE ... RETURNING id` issued by `move_channel`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1353
/// Column selector with a single `UserId` column.
// NOTE(review): presumably used for queries that select only `user_id`; the
// call sites are outside this part of the file — confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}