channels.rs

   1use super::*;
   2use rpc::proto::channel_member::Kind;
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self
  23            .create_channel(name, None, creator_id)
  24            .await?
  25            .channel
  26            .id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .channel
  40            .id)
  41    }
  42
  43    pub async fn create_channel(
  44        &self,
  45        name: &str,
  46        parent_channel_id: Option<ChannelId>,
  47        admin_id: UserId,
  48    ) -> Result<CreateChannelResult> {
  49        let name = Self::sanitize_channel_name(name)?;
  50        self.transaction(move |tx| async move {
  51            let mut parent = None;
  52
  53            if let Some(parent_channel_id) = parent_channel_id {
  54                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  55                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  56                    .await?;
  57                parent = Some(parent_channel);
  58            }
  59
  60            let channel = channel::ActiveModel {
  61                id: ActiveValue::NotSet,
  62                name: ActiveValue::Set(name.to_string()),
  63                visibility: ActiveValue::Set(ChannelVisibility::Members),
  64                parent_path: ActiveValue::Set(
  65                    parent
  66                        .as_ref()
  67                        .map_or(String::new(), |parent| parent.path()),
  68                ),
  69            }
  70            .insert(&*tx)
  71            .await?;
  72
  73            let participants_to_update;
  74            if let Some(parent) = &parent {
  75                participants_to_update = self
  76                    .participants_to_notify_for_channel_change(parent, &*tx)
  77                    .await?;
  78            } else {
  79                participants_to_update = vec![];
  80
  81                channel_member::ActiveModel {
  82                    id: ActiveValue::NotSet,
  83                    channel_id: ActiveValue::Set(channel.id),
  84                    user_id: ActiveValue::Set(admin_id),
  85                    accepted: ActiveValue::Set(true),
  86                    role: ActiveValue::Set(ChannelRole::Admin),
  87                }
  88                .insert(&*tx)
  89                .await?;
  90            };
  91
  92            Ok(CreateChannelResult {
  93                channel: Channel::from_model(channel, ChannelRole::Admin),
  94                participants_to_update,
  95            })
  96        })
  97        .await
  98    }
  99
 100    pub async fn join_channel(
 101        &self,
 102        channel_id: ChannelId,
 103        user_id: UserId,
 104        connection: ConnectionId,
 105        environment: &str,
 106    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 107        self.transaction(move |tx| async move {
 108            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 109            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 110
 111            let mut accept_invite_result = None;
 112
 113            if role.is_none() {
 114                if let Some(invitation) = self
 115                    .pending_invite_for_channel(&channel, user_id, &*tx)
 116                    .await?
 117                {
 118                    // note, this may be a parent channel
 119                    role = Some(invitation.role);
 120                    channel_member::Entity::update(channel_member::ActiveModel {
 121                        accepted: ActiveValue::Set(true),
 122                        ..invitation.into_active_model()
 123                    })
 124                    .exec(&*tx)
 125                    .await?;
 126
 127                    accept_invite_result = Some(
 128                        self.calculate_membership_updated(&channel, user_id, &*tx)
 129                            .await?,
 130                    );
 131
 132                    debug_assert!(
 133                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 134                    );
 135                }
 136            }
 137
 138            if channel.visibility == ChannelVisibility::Public {
 139                role = Some(ChannelRole::Guest);
 140                let channel_to_join = self
 141                    .public_ancestors_including_self(&channel, &*tx)
 142                    .await?
 143                    .first()
 144                    .cloned()
 145                    .unwrap_or(channel.clone());
 146
 147                channel_member::Entity::insert(channel_member::ActiveModel {
 148                    id: ActiveValue::NotSet,
 149                    channel_id: ActiveValue::Set(channel_to_join.id),
 150                    user_id: ActiveValue::Set(user_id),
 151                    accepted: ActiveValue::Set(true),
 152                    role: ActiveValue::Set(ChannelRole::Guest),
 153                })
 154                .exec(&*tx)
 155                .await?;
 156
 157                accept_invite_result = Some(
 158                    self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
 159                        .await?,
 160                );
 161
 162                debug_assert!(self.channel_role_for_user(&channel, user_id, &*tx).await? == role);
 163            }
 164
 165            if role.is_none() || role == Some(ChannelRole::Banned) {
 166                Err(anyhow!("not allowed"))?
 167            }
 168
 169            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 170            let room_id = self
 171                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 172                .await?;
 173
 174            self.join_channel_room_internal(room_id, user_id, connection, &*tx)
 175                .await
 176                .map(|jr| (jr, accept_invite_result, role.unwrap()))
 177        })
 178        .await
 179    }
 180
 181    pub async fn set_channel_visibility(
 182        &self,
 183        channel_id: ChannelId,
 184        visibility: ChannelVisibility,
 185        admin_id: UserId,
 186    ) -> Result<SetChannelVisibilityResult> {
 187        self.transaction(move |tx| async move {
 188            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 189
 190            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 191                .await?;
 192
 193            let previous_members = self
 194                .get_channel_participant_details_internal(&channel, &*tx)
 195                .await?;
 196
 197            let mut model = channel.into_active_model();
 198            model.visibility = ActiveValue::Set(visibility);
 199            let channel = model.update(&*tx).await?;
 200
 201            let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
 202                .participants_to_notify_for_channel_change(&channel, &*tx)
 203                .await?
 204                .into_iter()
 205                .collect();
 206
 207            let mut channels_to_remove: Vec<ChannelId> = vec![];
 208            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
 209            match visibility {
 210                ChannelVisibility::Members => {
 211                    let all_descendents: Vec<ChannelId> = self
 212                        .get_channel_descendants_including_self(vec![channel_id], &*tx)
 213                        .await?
 214                        .into_iter()
 215                        .map(|channel| channel.id)
 216                        .collect();
 217
 218                    channels_to_remove = channel::Entity::find()
 219                        .filter(
 220                            channel::Column::Id
 221                                .is_in(all_descendents)
 222                                .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
 223                        )
 224                        .all(&*tx)
 225                        .await?
 226                        .into_iter()
 227                        .map(|channel| channel.id)
 228                        .collect();
 229
 230                    channels_to_remove.push(channel_id);
 231
 232                    for member in previous_members {
 233                        if member.role.can_only_see_public_descendants() {
 234                            participants_to_remove.insert(member.user_id);
 235                        }
 236                    }
 237                }
 238                ChannelVisibility::Public => {
 239                    if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
 240                        let parent_updates = self
 241                            .participants_to_notify_for_channel_change(&public_parent, &*tx)
 242                            .await?;
 243
 244                        for (user_id, channels) in parent_updates {
 245                            participants_to_update.insert(user_id, channels);
 246                        }
 247                    }
 248                }
 249            }
 250
 251            Ok(SetChannelVisibilityResult {
 252                participants_to_update,
 253                participants_to_remove,
 254                channels_to_remove,
 255            })
 256        })
 257        .await
 258    }
 259
 260    pub async fn delete_channel(
 261        &self,
 262        channel_id: ChannelId,
 263        user_id: UserId,
 264    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 265        self.transaction(move |tx| async move {
 266            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 267            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 268                .await?;
 269
 270            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 271                .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 272                .select_only()
 273                .column(channel_member::Column::UserId)
 274                .distinct()
 275                .into_values::<_, QueryUserIds>()
 276                .all(&*tx)
 277                .await?;
 278
 279            let channels_to_remove = self
 280                .get_channel_descendants_including_self(vec![channel.id], &*tx)
 281                .await?
 282                .into_iter()
 283                .map(|channel| channel.id)
 284                .collect::<Vec<_>>();
 285
 286            channel::Entity::delete_many()
 287                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 288                .exec(&*tx)
 289                .await?;
 290
 291            Ok((channels_to_remove, members_to_notify))
 292        })
 293        .await
 294    }
 295
 296    pub async fn invite_channel_member(
 297        &self,
 298        channel_id: ChannelId,
 299        invitee_id: UserId,
 300        inviter_id: UserId,
 301        role: ChannelRole,
 302    ) -> Result<InviteMemberResult> {
 303        self.transaction(move |tx| async move {
 304            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 305            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 306                .await?;
 307
 308            channel_member::ActiveModel {
 309                id: ActiveValue::NotSet,
 310                channel_id: ActiveValue::Set(channel_id),
 311                user_id: ActiveValue::Set(invitee_id),
 312                accepted: ActiveValue::Set(false),
 313                role: ActiveValue::Set(role),
 314            }
 315            .insert(&*tx)
 316            .await?;
 317
 318            let channel = Channel::from_model(channel, role);
 319
 320            let notifications = self
 321                .create_notification(
 322                    invitee_id,
 323                    rpc::Notification::ChannelInvitation {
 324                        channel_id: channel_id.to_proto(),
 325                        channel_name: channel.name.clone(),
 326                        inviter_id: inviter_id.to_proto(),
 327                    },
 328                    true,
 329                    &*tx,
 330                )
 331                .await?
 332                .into_iter()
 333                .collect();
 334
 335            Ok(InviteMemberResult {
 336                channel,
 337                notifications,
 338            })
 339        })
 340        .await
 341    }
 342
 343    fn sanitize_channel_name(name: &str) -> Result<&str> {
 344        let new_name = name.trim().trim_start_matches('#');
 345        if new_name == "" {
 346            Err(anyhow!("channel name can't be blank"))?;
 347        }
 348        Ok(new_name)
 349    }
 350
 351    pub async fn rename_channel(
 352        &self,
 353        channel_id: ChannelId,
 354        admin_id: UserId,
 355        new_name: &str,
 356    ) -> Result<RenameChannelResult> {
 357        self.transaction(move |tx| async move {
 358            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 359
 360            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 361            let role = self
 362                .check_user_is_channel_admin(&channel, admin_id, &*tx)
 363                .await?;
 364
 365            let mut model = channel.into_active_model();
 366            model.name = ActiveValue::Set(new_name.clone());
 367            let channel = model.update(&*tx).await?;
 368
 369            let participants = self
 370                .get_channel_participant_details_internal(&channel, &*tx)
 371                .await?;
 372
 373            Ok(RenameChannelResult {
 374                channel: Channel::from_model(channel.clone(), role),
 375                participants_to_update: participants
 376                    .iter()
 377                    .map(|participant| {
 378                        (
 379                            participant.user_id,
 380                            Channel::from_model(channel.clone(), participant.role),
 381                        )
 382                    })
 383                    .collect(),
 384            })
 385        })
 386        .await
 387    }
 388
 389    pub async fn respond_to_channel_invite(
 390        &self,
 391        channel_id: ChannelId,
 392        user_id: UserId,
 393        accept: bool,
 394    ) -> Result<RespondToChannelInvite> {
 395        self.transaction(move |tx| async move {
 396            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 397
 398            let membership_update = if accept {
 399                let rows_affected = channel_member::Entity::update_many()
 400                    .set(channel_member::ActiveModel {
 401                        accepted: ActiveValue::Set(accept),
 402                        ..Default::default()
 403                    })
 404                    .filter(
 405                        channel_member::Column::ChannelId
 406                            .eq(channel_id)
 407                            .and(channel_member::Column::UserId.eq(user_id))
 408                            .and(channel_member::Column::Accepted.eq(false)),
 409                    )
 410                    .exec(&*tx)
 411                    .await?
 412                    .rows_affected;
 413
 414                if rows_affected == 0 {
 415                    Err(anyhow!("no such invitation"))?;
 416                }
 417
 418                Some(
 419                    self.calculate_membership_updated(&channel, user_id, &*tx)
 420                        .await?,
 421                )
 422            } else {
 423                let rows_affected = channel_member::Entity::delete_many()
 424                    .filter(
 425                        channel_member::Column::ChannelId
 426                            .eq(channel_id)
 427                            .and(channel_member::Column::UserId.eq(user_id))
 428                            .and(channel_member::Column::Accepted.eq(false)),
 429                    )
 430                    .exec(&*tx)
 431                    .await?
 432                    .rows_affected;
 433                if rows_affected == 0 {
 434                    Err(anyhow!("no such invitation"))?;
 435                }
 436
 437                None
 438            };
 439
 440            Ok(RespondToChannelInvite {
 441                membership_update,
 442                notifications: self
 443                    .mark_notification_as_read_with_response(
 444                        user_id,
 445                        &rpc::Notification::ChannelInvitation {
 446                            channel_id: channel_id.to_proto(),
 447                            channel_name: Default::default(),
 448                            inviter_id: Default::default(),
 449                        },
 450                        accept,
 451                        &*tx,
 452                    )
 453                    .await?
 454                    .into_iter()
 455                    .collect(),
 456            })
 457        })
 458        .await
 459    }
 460
 461    async fn calculate_membership_updated(
 462        &self,
 463        channel: &channel::Model,
 464        user_id: UserId,
 465        tx: &DatabaseTransaction,
 466    ) -> Result<MembershipUpdated> {
 467        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 468        let removed_channels = self
 469            .get_channel_descendants_including_self(vec![channel.id], &*tx)
 470            .await?
 471            .into_iter()
 472            .filter_map(|channel| {
 473                if !new_channels.channels.iter().any(|c| c.id == channel.id) {
 474                    Some(channel.id)
 475                } else {
 476                    None
 477                }
 478            })
 479            .collect::<Vec<_>>();
 480
 481        Ok(MembershipUpdated {
 482            channel_id: channel.id,
 483            new_channels,
 484            removed_channels,
 485        })
 486    }
 487
 488    pub async fn remove_channel_member(
 489        &self,
 490        channel_id: ChannelId,
 491        member_id: UserId,
 492        admin_id: UserId,
 493    ) -> Result<RemoveChannelMemberResult> {
 494        self.transaction(|tx| async move {
 495            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 496            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 497                .await?;
 498
 499            let result = channel_member::Entity::delete_many()
 500                .filter(
 501                    channel_member::Column::ChannelId
 502                        .eq(channel_id)
 503                        .and(channel_member::Column::UserId.eq(member_id)),
 504                )
 505                .exec(&*tx)
 506                .await?;
 507
 508            if result.rows_affected == 0 {
 509                Err(anyhow!("no such member"))?;
 510            }
 511
 512            Ok(RemoveChannelMemberResult {
 513                membership_update: self
 514                    .calculate_membership_updated(&channel, member_id, &*tx)
 515                    .await?,
 516                notification_id: self
 517                    .remove_notification(
 518                        member_id,
 519                        rpc::Notification::ChannelInvitation {
 520                            channel_id: channel_id.to_proto(),
 521                            channel_name: Default::default(),
 522                            inviter_id: Default::default(),
 523                        },
 524                        &*tx,
 525                    )
 526                    .await?,
 527            })
 528        })
 529        .await
 530    }
 531
 532    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 533        self.transaction(|tx| async move {
 534            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 535
 536            let channel_invites = channel_member::Entity::find()
 537                .filter(
 538                    channel_member::Column::UserId
 539                        .eq(user_id)
 540                        .and(channel_member::Column::Accepted.eq(false)),
 541                )
 542                .all(&*tx)
 543                .await?;
 544
 545            for invite in channel_invites {
 546                role_for_channel.insert(invite.channel_id, invite.role);
 547            }
 548
 549            let channels = channel::Entity::find()
 550                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 551                .all(&*tx)
 552                .await?;
 553
 554            let channels = channels
 555                .into_iter()
 556                .filter_map(|channel| {
 557                    let role = *role_for_channel.get(&channel.id)?;
 558                    Some(Channel::from_model(channel, role))
 559                })
 560                .collect();
 561
 562            Ok(channels)
 563        })
 564        .await
 565    }
 566
 567    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 568        self.transaction(|tx| async move {
 569            let tx = tx;
 570
 571            self.get_user_channels(user_id, None, &tx).await
 572        })
 573        .await
 574    }
 575
 576    pub async fn get_user_channels(
 577        &self,
 578        user_id: UserId,
 579        ancestor_channel: Option<&channel::Model>,
 580        tx: &DatabaseTransaction,
 581    ) -> Result<ChannelsForUser> {
 582        let channel_memberships = channel_member::Entity::find()
 583            .filter(
 584                channel_member::Column::UserId
 585                    .eq(user_id)
 586                    .and(channel_member::Column::Accepted.eq(true)),
 587            )
 588            .all(&*tx)
 589            .await?;
 590
 591        let descendants = self
 592            .get_channel_descendants_including_self(
 593                channel_memberships.iter().map(|m| m.channel_id),
 594                &*tx,
 595            )
 596            .await?;
 597
 598        let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
 599        for membership in channel_memberships.iter() {
 600            roles_by_channel_id.insert(membership.channel_id, membership.role);
 601        }
 602
 603        let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();
 604
 605        let channels: Vec<Channel> = descendants
 606            .into_iter()
 607            .filter_map(|channel| {
 608                let parent_role = channel
 609                    .parent_id()
 610                    .and_then(|parent_id| roles_by_channel_id.get(&parent_id));
 611
 612                let role = if let Some(parent_role) = parent_role {
 613                    let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
 614                        existing_role.max(*parent_role)
 615                    } else {
 616                        *parent_role
 617                    };
 618                    roles_by_channel_id.insert(channel.id, role);
 619                    role
 620                } else {
 621                    *roles_by_channel_id.get(&channel.id)?
 622                };
 623
 624                let can_see_parent_paths = role.can_see_all_descendants()
 625                    || role.can_only_see_public_descendants()
 626                        && channel.visibility == ChannelVisibility::Public;
 627                if !can_see_parent_paths {
 628                    return None;
 629                }
 630
 631                visible_channel_ids.insert(channel.id);
 632
 633                if let Some(ancestor) = ancestor_channel {
 634                    if !channel
 635                        .ancestors_including_self()
 636                        .any(|id| id == ancestor.id)
 637                    {
 638                        return None;
 639                    }
 640                }
 641
 642                let mut channel = Channel::from_model(channel, role);
 643                channel
 644                    .parent_path
 645                    .retain(|id| visible_channel_ids.contains(&id));
 646
 647                Some(channel)
 648            })
 649            .collect();
 650
 651        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 652        enum QueryUserIdsAndChannelIds {
 653            ChannelId,
 654            UserId,
 655        }
 656
 657        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
 658        {
 659            let mut rows = room_participant::Entity::find()
 660                .inner_join(room::Entity)
 661                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
 662                .select_only()
 663                .column(room::Column::ChannelId)
 664                .column(room_participant::Column::UserId)
 665                .into_values::<_, QueryUserIdsAndChannelIds>()
 666                .stream(&*tx)
 667                .await?;
 668            while let Some(row) = rows.next().await {
 669                let row: (ChannelId, UserId) = row?;
 670                channel_participants.entry(row.0).or_default().push(row.1)
 671            }
 672        }
 673
 674        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
 675        let channel_buffer_changes = self
 676            .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
 677            .await?;
 678
 679        let unseen_messages = self
 680            .unseen_channel_messages(user_id, &channel_ids, &*tx)
 681            .await?;
 682
 683        Ok(ChannelsForUser {
 684            channels,
 685            channel_participants,
 686            unseen_buffer_changes: channel_buffer_changes,
 687            channel_messages: unseen_messages,
 688        })
 689    }
 690
 691    async fn participants_to_notify_for_channel_change(
 692        &self,
 693        new_parent: &channel::Model,
 694        tx: &DatabaseTransaction,
 695    ) -> Result<Vec<(UserId, ChannelsForUser)>> {
 696        let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
 697
 698        let members = self
 699            .get_channel_participant_details_internal(new_parent, &*tx)
 700            .await?;
 701
 702        for member in members.iter() {
 703            if !member.role.can_see_all_descendants() {
 704                continue;
 705            }
 706            results.push((
 707                member.user_id,
 708                self.get_user_channels(member.user_id, Some(new_parent), &*tx)
 709                    .await?,
 710            ))
 711        }
 712
 713        let public_parents = self
 714            .public_ancestors_including_self(new_parent, &*tx)
 715            .await?;
 716        let public_parent = public_parents.last();
 717
 718        let Some(public_parent) = public_parent else {
 719            return Ok(results);
 720        };
 721
 722        // could save some time in the common case by skipping this if the
 723        // new channel is not public and has no public descendants.
 724        let public_members = if public_parent == new_parent {
 725            members
 726        } else {
 727            self.get_channel_participant_details_internal(public_parent, &*tx)
 728                .await?
 729        };
 730
 731        for member in public_members {
 732            if !member.role.can_only_see_public_descendants() {
 733                continue;
 734            };
 735            results.push((
 736                member.user_id,
 737                self.get_user_channels(member.user_id, Some(public_parent), &*tx)
 738                    .await?,
 739            ))
 740        }
 741
 742        Ok(results)
 743    }
 744
 745    pub async fn set_channel_member_role(
 746        &self,
 747        channel_id: ChannelId,
 748        admin_id: UserId,
 749        for_user: UserId,
 750        role: ChannelRole,
 751    ) -> Result<SetMemberRoleResult> {
 752        self.transaction(|tx| async move {
 753            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 754            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 755                .await?;
 756
 757            let membership = channel_member::Entity::find()
 758                .filter(
 759                    channel_member::Column::ChannelId
 760                        .eq(channel_id)
 761                        .and(channel_member::Column::UserId.eq(for_user)),
 762                )
 763                .one(&*tx)
 764                .await?;
 765
 766            let Some(membership) = membership else {
 767                Err(anyhow!("no such member"))?
 768            };
 769
 770            let mut update = membership.into_active_model();
 771            update.role = ActiveValue::Set(role);
 772            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 773
 774            if updated.accepted {
 775                Ok(SetMemberRoleResult::MembershipUpdated(
 776                    self.calculate_membership_updated(&channel, for_user, &*tx)
 777                        .await?,
 778                ))
 779            } else {
 780                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 781                    channel, role,
 782                )))
 783            }
 784        })
 785        .await
 786    }
 787
 788    pub async fn get_channel_participant_details(
 789        &self,
 790        channel_id: ChannelId,
 791        user_id: UserId,
 792    ) -> Result<Vec<proto::ChannelMember>> {
 793        let (role, members) = self
 794            .transaction(move |tx| async move {
 795                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 796                let role = self
 797                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 798                    .await?;
 799                Ok((
 800                    role,
 801                    self.get_channel_participant_details_internal(&channel, &*tx)
 802                        .await?,
 803                ))
 804            })
 805            .await?;
 806
 807        if role == ChannelRole::Admin {
 808            Ok(members
 809                .into_iter()
 810                .map(|channel_member| channel_member.to_proto())
 811                .collect())
 812        } else {
 813            return Ok(members
 814                .into_iter()
 815                .filter_map(|member| {
 816                    if member.kind == proto::channel_member::Kind::Invitee {
 817                        return None;
 818                    }
 819                    Some(ChannelMember {
 820                        role: member.role,
 821                        user_id: member.user_id,
 822                        kind: proto::channel_member::Kind::Member,
 823                    })
 824                })
 825                .map(|channel_member| channel_member.to_proto())
 826                .collect());
 827        }
 828    }
 829
 830    async fn get_channel_participant_details_internal(
 831        &self,
 832        channel: &channel::Model,
 833        tx: &DatabaseTransaction,
 834    ) -> Result<Vec<ChannelMember>> {
 835        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 836        enum QueryMemberDetails {
 837            UserId,
 838            Role,
 839            IsDirectMember,
 840            Accepted,
 841            Visibility,
 842        }
 843
 844        let mut stream = channel_member::Entity::find()
 845            .left_join(channel::Entity)
 846            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 847            .select_only()
 848            .column(channel_member::Column::UserId)
 849            .column(channel_member::Column::Role)
 850            .column_as(
 851                channel_member::Column::ChannelId.eq(channel.id),
 852                QueryMemberDetails::IsDirectMember,
 853            )
 854            .column(channel_member::Column::Accepted)
 855            .column(channel::Column::Visibility)
 856            .into_values::<_, QueryMemberDetails>()
 857            .stream(&*tx)
 858            .await?;
 859
 860        let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();
 861
 862        while let Some(user_membership) = stream.next().await {
 863            let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
 864                UserId,
 865                ChannelRole,
 866                bool,
 867                bool,
 868                ChannelVisibility,
 869            ) = user_membership?;
 870            let kind = match (is_direct_member, is_invite_accepted) {
 871                (true, true) => proto::channel_member::Kind::Member,
 872                (true, false) => proto::channel_member::Kind::Invitee,
 873                (false, true) => proto::channel_member::Kind::AncestorMember,
 874                (false, false) => continue,
 875            };
 876
 877            if channel_role == ChannelRole::Guest
 878                && visibility != ChannelVisibility::Public
 879                && channel.visibility != ChannelVisibility::Public
 880            {
 881                continue;
 882            }
 883
 884            if let Some(details_mut) = user_details.get_mut(&user_id) {
 885                if channel_role.should_override(details_mut.role) {
 886                    details_mut.role = channel_role;
 887                }
 888                if kind == Kind::Member {
 889                    details_mut.kind = kind;
 890                // the UI is going to be a bit confusing if you already have permissions
 891                // that are greater than or equal to the ones you're being invited to.
 892                } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
 893                    details_mut.kind = kind;
 894                }
 895            } else {
 896                user_details.insert(
 897                    user_id,
 898                    ChannelMember {
 899                        user_id,
 900                        kind,
 901                        role: channel_role,
 902                    },
 903                );
 904            }
 905        }
 906
 907        Ok(user_details
 908            .into_iter()
 909            .map(|(_, details)| details)
 910            .collect())
 911    }
 912
 913    pub async fn get_channel_participants(
 914        &self,
 915        channel: &channel::Model,
 916        tx: &DatabaseTransaction,
 917    ) -> Result<Vec<UserId>> {
 918        let participants = self
 919            .get_channel_participant_details_internal(channel, &*tx)
 920            .await?;
 921        Ok(participants
 922            .into_iter()
 923            .map(|member| member.user_id)
 924            .collect())
 925    }
 926
 927    pub async fn check_user_is_channel_admin(
 928        &self,
 929        channel: &channel::Model,
 930        user_id: UserId,
 931        tx: &DatabaseTransaction,
 932    ) -> Result<ChannelRole> {
 933        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 934        match role {
 935            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 936            Some(ChannelRole::Member)
 937            | Some(ChannelRole::Banned)
 938            | Some(ChannelRole::Guest)
 939            | None => Err(anyhow!(
 940                "user is not a channel admin or channel does not exist"
 941            ))?,
 942        }
 943    }
 944
 945    pub async fn check_user_is_channel_member(
 946        &self,
 947        channel: &channel::Model,
 948        user_id: UserId,
 949        tx: &DatabaseTransaction,
 950    ) -> Result<ChannelRole> {
 951        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 952        match channel_role {
 953            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 954            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 955                "user is not a channel member or channel does not exist"
 956            ))?,
 957        }
 958    }
 959
 960    pub async fn check_user_is_channel_participant(
 961        &self,
 962        channel: &channel::Model,
 963        user_id: UserId,
 964        tx: &DatabaseTransaction,
 965    ) -> Result<ChannelRole> {
 966        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 967        match role {
 968            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 969                Ok(role.unwrap())
 970            }
 971            Some(ChannelRole::Banned) | None => Err(anyhow!(
 972                "user is not a channel participant or channel does not exist"
 973            ))?,
 974        }
 975    }
 976
 977    pub async fn pending_invite_for_channel(
 978        &self,
 979        channel: &channel::Model,
 980        user_id: UserId,
 981        tx: &DatabaseTransaction,
 982    ) -> Result<Option<channel_member::Model>> {
 983        let row = channel_member::Entity::find()
 984            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 985            .filter(channel_member::Column::UserId.eq(user_id))
 986            .filter(channel_member::Column::Accepted.eq(false))
 987            .one(&*tx)
 988            .await?;
 989
 990        Ok(row)
 991    }
 992
 993    pub async fn public_parent_channel(
 994        &self,
 995        channel: &channel::Model,
 996        tx: &DatabaseTransaction,
 997    ) -> Result<Option<channel::Model>> {
 998        let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
 999        if path.last().unwrap().id == channel.id {
1000            path.pop();
1001        }
1002        Ok(path.pop())
1003    }
1004
1005    pub async fn public_ancestors_including_self(
1006        &self,
1007        channel: &channel::Model,
1008        tx: &DatabaseTransaction,
1009    ) -> Result<Vec<channel::Model>> {
1010        let visible_channels = channel::Entity::find()
1011            .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1012            .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1013            .order_by_asc(channel::Column::ParentPath)
1014            .all(&*tx)
1015            .await?;
1016
1017        Ok(visible_channels)
1018    }
1019
1020    pub async fn channel_role_for_user(
1021        &self,
1022        channel: &channel::Model,
1023        user_id: UserId,
1024        tx: &DatabaseTransaction,
1025    ) -> Result<Option<ChannelRole>> {
1026        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1027        enum QueryChannelMembership {
1028            ChannelId,
1029            Role,
1030            Visibility,
1031        }
1032
1033        let mut rows = channel_member::Entity::find()
1034            .left_join(channel::Entity)
1035            .filter(
1036                channel_member::Column::ChannelId
1037                    .is_in(channel.ancestors_including_self())
1038                    .and(channel_member::Column::UserId.eq(user_id))
1039                    .and(channel_member::Column::Accepted.eq(true)),
1040            )
1041            .select_only()
1042            .column(channel_member::Column::ChannelId)
1043            .column(channel_member::Column::Role)
1044            .column(channel::Column::Visibility)
1045            .into_values::<_, QueryChannelMembership>()
1046            .stream(&*tx)
1047            .await?;
1048
1049        let mut user_role: Option<ChannelRole> = None;
1050
1051        let mut is_participant = false;
1052        let mut current_channel_visibility = None;
1053
1054        // note these channels are not iterated in any particular order,
1055        // our current logic takes the highest permission available.
1056        while let Some(row) = rows.next().await {
1057            let (membership_channel, role, visibility): (
1058                ChannelId,
1059                ChannelRole,
1060                ChannelVisibility,
1061            ) = row?;
1062
1063            match role {
1064                ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
1065                    if let Some(users_role) = user_role {
1066                        user_role = Some(users_role.max(role));
1067                    } else {
1068                        user_role = Some(role)
1069                    }
1070                }
1071                ChannelRole::Guest if visibility == ChannelVisibility::Public => {
1072                    is_participant = true
1073                }
1074                ChannelRole::Guest => {}
1075            }
1076            if channel.id == membership_channel {
1077                current_channel_visibility = Some(visibility);
1078            }
1079        }
1080        // free up database connection
1081        drop(rows);
1082
1083        if is_participant && user_role.is_none() {
1084            if current_channel_visibility.is_none() {
1085                current_channel_visibility = channel::Entity::find()
1086                    .filter(channel::Column::Id.eq(channel.id))
1087                    .one(&*tx)
1088                    .await?
1089                    .map(|channel| channel.visibility);
1090            }
1091            if current_channel_visibility == Some(ChannelVisibility::Public) {
1092                user_role = Some(ChannelRole::Guest);
1093            }
1094        }
1095
1096        Ok(user_role)
1097    }
1098
1099    // Get the descendants of the given set if channels, ordered by their
1100    // path.
1101    async fn get_channel_descendants_including_self(
1102        &self,
1103        channel_ids: impl IntoIterator<Item = ChannelId>,
1104        tx: &DatabaseTransaction,
1105    ) -> Result<Vec<channel::Model>> {
1106        let mut values = String::new();
1107        for id in channel_ids {
1108            if !values.is_empty() {
1109                values.push_str(", ");
1110            }
1111            write!(&mut values, "({})", id).unwrap();
1112        }
1113
1114        if values.is_empty() {
1115            return Ok(vec![]);
1116        }
1117
1118        let sql = format!(
1119            r#"
1120            SELECT DISTINCT
1121                descendant_channels.*,
1122                descendant_channels.parent_path || descendant_channels.id as full_path
1123            FROM
1124                channels parent_channels, channels descendant_channels
1125            WHERE
1126                descendant_channels.id IN ({values}) OR
1127                (
1128                    parent_channels.id IN ({values}) AND
1129                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1130                )
1131            ORDER BY
1132                full_path ASC
1133            "#
1134        );
1135
1136        Ok(channel::Entity::find()
1137            .from_raw_sql(Statement::from_string(
1138                self.pool.get_database_backend(),
1139                sql,
1140            ))
1141            .all(tx)
1142            .await?)
1143    }
1144
1145    /// Returns the channel with the given ID
1146    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1147        self.transaction(|tx| async move {
1148            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1149            let role = self
1150                .check_user_is_channel_participant(&channel, user_id, &*tx)
1151                .await?;
1152
1153            Ok(Channel::from_model(channel, role))
1154        })
1155        .await
1156    }
1157
1158    pub async fn get_channel_internal(
1159        &self,
1160        channel_id: ChannelId,
1161        tx: &DatabaseTransaction,
1162    ) -> Result<channel::Model> {
1163        Ok(channel::Entity::find_by_id(channel_id)
1164            .one(&*tx)
1165            .await?
1166            .ok_or_else(|| anyhow!("no such channel"))?)
1167    }
1168
1169    pub(crate) async fn get_or_create_channel_room(
1170        &self,
1171        channel_id: ChannelId,
1172        live_kit_room: &str,
1173        environment: &str,
1174        tx: &DatabaseTransaction,
1175    ) -> Result<RoomId> {
1176        let room = room::Entity::find()
1177            .filter(room::Column::ChannelId.eq(channel_id))
1178            .one(&*tx)
1179            .await?;
1180
1181        let room_id = if let Some(room) = room {
1182            if let Some(env) = room.enviroment {
1183                if &env != environment {
1184                    Err(anyhow!("must join using the {} release", env))?;
1185                }
1186            }
1187            room.id
1188        } else {
1189            let result = room::Entity::insert(room::ActiveModel {
1190                channel_id: ActiveValue::Set(Some(channel_id)),
1191                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1192                enviroment: ActiveValue::Set(Some(environment.to_string())),
1193                ..Default::default()
1194            })
1195            .exec(&*tx)
1196            .await?;
1197
1198            result.last_insert_id
1199        };
1200
1201        Ok(room_id)
1202    }
1203
1204    /// Move a channel from one parent to another
1205    pub async fn move_channel(
1206        &self,
1207        channel_id: ChannelId,
1208        new_parent_id: Option<ChannelId>,
1209        admin_id: UserId,
1210    ) -> Result<Option<MoveChannelResult>> {
1211        self.transaction(|tx| async move {
1212            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1213            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1214                .await?;
1215
1216            let new_parent_path;
1217            let new_parent_channel;
1218            if let Some(new_parent_id) = new_parent_id {
1219                let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1220                self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1221                    .await?;
1222
1223                if new_parent
1224                    .ancestors_including_self()
1225                    .any(|id| id == channel.id)
1226                {
1227                    Err(anyhow!("cannot move a channel into one of its descendants"))?;
1228                }
1229
1230                new_parent_path = new_parent.path();
1231                new_parent_channel = Some(new_parent);
1232            } else {
1233                new_parent_path = String::new();
1234                new_parent_channel = None;
1235            };
1236
1237            let previous_participants = self
1238                .get_channel_participant_details_internal(&channel, &*tx)
1239                .await?;
1240
1241            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1242            let new_path = format!("{}{}/", new_parent_path, channel.id);
1243
1244            if old_path == new_path {
1245                return Ok(None);
1246            }
1247
1248            let mut model = channel.into_active_model();
1249            model.parent_path = ActiveValue::Set(new_parent_path);
1250            let channel = model.update(&*tx).await?;
1251
1252            if new_parent_channel.is_none() {
1253                channel_member::ActiveModel {
1254                    id: ActiveValue::NotSet,
1255                    channel_id: ActiveValue::Set(channel_id),
1256                    user_id: ActiveValue::Set(admin_id),
1257                    accepted: ActiveValue::Set(true),
1258                    role: ActiveValue::Set(ChannelRole::Admin),
1259                }
1260                .insert(&*tx)
1261                .await?;
1262            }
1263
1264            let descendent_ids =
1265                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1266                    self.pool.get_database_backend(),
1267                    "
1268                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1269                    WHERE parent_path LIKE $3 || '%'
1270                    RETURNING id
1271                ",
1272                    [old_path.clone().into(), new_path.into(), old_path.into()],
1273                ))
1274                .all(&*tx)
1275                .await?;
1276
1277            let participants_to_update: HashMap<_, _> = self
1278                .participants_to_notify_for_channel_change(
1279                    new_parent_channel.as_ref().unwrap_or(&channel),
1280                    &*tx,
1281                )
1282                .await?
1283                .into_iter()
1284                .collect();
1285
1286            let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1287            for id in descendent_ids {
1288                moved_channels.insert(id);
1289            }
1290            moved_channels.insert(channel_id);
1291
1292            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1293            for participant in previous_participants {
1294                if participant.kind == proto::channel_member::Kind::AncestorMember {
1295                    if !participants_to_update.contains_key(&participant.user_id) {
1296                        participants_to_remove.insert(participant.user_id);
1297                    }
1298                }
1299            }
1300
1301            Ok(Some(MoveChannelResult {
1302                participants_to_remove,
1303                participants_to_update,
1304                moved_channels,
1305            }))
1306        })
1307        .await
1308    }
1309}
1310
1311#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1312enum QueryIds {
1313    Id,
1314}
1315
1316#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1317enum QueryUserIds {
1318    UserId,
1319}