channels.rs

   1use super::*;
   2use rpc::proto::channel_member::Kind;
   3use sea_orm::TryGetableMany;
   4
   5impl Database {
   6    #[cfg(test)]
   7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
   8        self.transaction(move |tx| async move {
   9            let mut channels = Vec::new();
  10            let mut rows = channel::Entity::find().stream(&*tx).await?;
  11            while let Some(row) = rows.next().await {
  12                let row = row?;
  13                channels.push((row.id, row.name));
  14            }
  15            Ok(channels)
  16        })
  17        .await
  18    }
  19
  20    #[cfg(test)]
  21    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
  22        Ok(self
  23            .create_channel(name, None, creator_id)
  24            .await?
  25            .channel
  26            .id)
  27    }
  28
  29    #[cfg(test)]
  30    pub async fn create_sub_channel(
  31        &self,
  32        name: &str,
  33        parent: ChannelId,
  34        creator_id: UserId,
  35    ) -> Result<ChannelId> {
  36        Ok(self
  37            .create_channel(name, Some(parent), creator_id)
  38            .await?
  39            .channel
  40            .id)
  41    }
  42
  43    pub async fn create_channel(
  44        &self,
  45        name: &str,
  46        parent_channel_id: Option<ChannelId>,
  47        admin_id: UserId,
  48    ) -> Result<CreateChannelResult> {
  49        let name = Self::sanitize_channel_name(name)?;
  50        self.transaction(move |tx| async move {
  51            let mut parent = None;
  52
  53            if let Some(parent_channel_id) = parent_channel_id {
  54                let parent_channel = self.get_channel_internal(parent_channel_id, &*tx).await?;
  55                self.check_user_is_channel_admin(&parent_channel, admin_id, &*tx)
  56                    .await?;
  57                parent = Some(parent_channel);
  58            }
  59
  60            let channel = channel::ActiveModel {
  61                id: ActiveValue::NotSet,
  62                name: ActiveValue::Set(name.to_string()),
  63                visibility: ActiveValue::Set(ChannelVisibility::Members),
  64                parent_path: ActiveValue::Set(
  65                    parent
  66                        .as_ref()
  67                        .map_or(String::new(), |parent| parent.path()),
  68                ),
  69            }
  70            .insert(&*tx)
  71            .await?;
  72
  73            let participants_to_update;
  74            if let Some(parent) = &parent {
  75                participants_to_update = self
  76                    .participants_to_notify_for_channel_change(parent, &*tx)
  77                    .await?;
  78            } else {
  79                participants_to_update = vec![];
  80
  81                channel_member::ActiveModel {
  82                    id: ActiveValue::NotSet,
  83                    channel_id: ActiveValue::Set(channel.id),
  84                    user_id: ActiveValue::Set(admin_id),
  85                    accepted: ActiveValue::Set(true),
  86                    role: ActiveValue::Set(ChannelRole::Admin),
  87                }
  88                .insert(&*tx)
  89                .await?;
  90            };
  91
  92            Ok(CreateChannelResult {
  93                channel: Channel::from_model(channel, ChannelRole::Admin),
  94                participants_to_update,
  95            })
  96        })
  97        .await
  98    }
  99
 100    pub async fn join_channel(
 101        &self,
 102        channel_id: ChannelId,
 103        user_id: UserId,
 104        connection: ConnectionId,
 105        environment: &str,
 106    ) -> Result<(JoinRoom, Option<MembershipUpdated>, ChannelRole)> {
 107        self.transaction(move |tx| async move {
 108            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 109            let mut role = self.channel_role_for_user(&channel, user_id, &*tx).await?;
 110
 111            let mut accept_invite_result = None;
 112
 113            if role.is_none() {
 114                if let Some(invitation) = self
 115                    .pending_invite_for_channel(&channel, user_id, &*tx)
 116                    .await?
 117                {
 118                    // note, this may be a parent channel
 119                    role = Some(invitation.role);
 120                    channel_member::Entity::update(channel_member::ActiveModel {
 121                        accepted: ActiveValue::Set(true),
 122                        ..invitation.into_active_model()
 123                    })
 124                    .exec(&*tx)
 125                    .await?;
 126
 127                    accept_invite_result = Some(
 128                        self.calculate_membership_updated(&channel, user_id, &*tx)
 129                            .await?,
 130                    );
 131
 132                    debug_assert!(
 133                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 134                    );
 135                } else if channel.visibility == ChannelVisibility::Public {
 136                    role = Some(ChannelRole::Guest);
 137                    let channel_to_join = self
 138                        .public_ancestors_including_self(&channel, &*tx)
 139                        .await?
 140                        .first()
 141                        .cloned()
 142                        .unwrap_or(channel.clone());
 143
 144                    channel_member::Entity::insert(channel_member::ActiveModel {
 145                        id: ActiveValue::NotSet,
 146                        channel_id: ActiveValue::Set(channel_to_join.id),
 147                        user_id: ActiveValue::Set(user_id),
 148                        accepted: ActiveValue::Set(true),
 149                        role: ActiveValue::Set(ChannelRole::Guest),
 150                    })
 151                    .exec(&*tx)
 152                    .await?;
 153
 154                    accept_invite_result = Some(
 155                        self.calculate_membership_updated(&channel_to_join, user_id, &*tx)
 156                            .await?,
 157                    );
 158
 159                    debug_assert!(
 160                        self.channel_role_for_user(&channel, user_id, &*tx).await? == role
 161                    );
 162                }
 163            }
 164
 165            if role.is_none() || role == Some(ChannelRole::Banned) {
 166                Err(anyhow!("not allowed"))?
 167            }
 168            let role = role.unwrap();
 169
 170            let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
 171            let room_id = self
 172                .get_or_create_channel_room(channel_id, &live_kit_room, environment, &*tx)
 173                .await?;
 174
 175            self.join_channel_room_internal(room_id, user_id, connection, role, &*tx)
 176                .await
 177                .map(|jr| (jr, accept_invite_result, role))
 178        })
 179        .await
 180    }
 181
 182    pub async fn set_channel_visibility(
 183        &self,
 184        channel_id: ChannelId,
 185        visibility: ChannelVisibility,
 186        admin_id: UserId,
 187    ) -> Result<SetChannelVisibilityResult> {
 188        self.transaction(move |tx| async move {
 189            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 190
 191            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 192                .await?;
 193
 194            let previous_members = self
 195                .get_channel_participant_details_internal(&channel, &*tx)
 196                .await?;
 197
 198            let mut model = channel.into_active_model();
 199            model.visibility = ActiveValue::Set(visibility);
 200            let channel = model.update(&*tx).await?;
 201
 202            let mut participants_to_update: HashMap<UserId, ChannelsForUser> = self
 203                .participants_to_notify_for_channel_change(&channel, &*tx)
 204                .await?
 205                .into_iter()
 206                .collect();
 207
 208            let mut channels_to_remove: Vec<ChannelId> = vec![];
 209            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
 210            match visibility {
 211                ChannelVisibility::Members => {
 212                    let all_descendents: Vec<ChannelId> = self
 213                        .get_channel_descendants_including_self(vec![channel_id], &*tx)
 214                        .await?
 215                        .into_iter()
 216                        .map(|channel| channel.id)
 217                        .collect();
 218
 219                    channels_to_remove = channel::Entity::find()
 220                        .filter(
 221                            channel::Column::Id
 222                                .is_in(all_descendents)
 223                                .and(channel::Column::Visibility.eq(ChannelVisibility::Public)),
 224                        )
 225                        .all(&*tx)
 226                        .await?
 227                        .into_iter()
 228                        .map(|channel| channel.id)
 229                        .collect();
 230
 231                    channels_to_remove.push(channel_id);
 232
 233                    for member in previous_members {
 234                        if member.role.can_only_see_public_descendants() {
 235                            participants_to_remove.insert(member.user_id);
 236                        }
 237                    }
 238                }
 239                ChannelVisibility::Public => {
 240                    if let Some(public_parent) = self.public_parent_channel(&channel, &*tx).await? {
 241                        let parent_updates = self
 242                            .participants_to_notify_for_channel_change(&public_parent, &*tx)
 243                            .await?;
 244
 245                        for (user_id, channels) in parent_updates {
 246                            participants_to_update.insert(user_id, channels);
 247                        }
 248                    }
 249                }
 250            }
 251
 252            Ok(SetChannelVisibilityResult {
 253                participants_to_update,
 254                participants_to_remove,
 255                channels_to_remove,
 256            })
 257        })
 258        .await
 259    }
 260
 261    pub async fn delete_channel(
 262        &self,
 263        channel_id: ChannelId,
 264        user_id: UserId,
 265    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
 266        self.transaction(move |tx| async move {
 267            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 268            self.check_user_is_channel_admin(&channel, user_id, &*tx)
 269                .await?;
 270
 271            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
 272                .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 273                .select_only()
 274                .column(channel_member::Column::UserId)
 275                .distinct()
 276                .into_values::<_, QueryUserIds>()
 277                .all(&*tx)
 278                .await?;
 279
 280            let channels_to_remove = self
 281                .get_channel_descendants_including_self(vec![channel.id], &*tx)
 282                .await?
 283                .into_iter()
 284                .map(|channel| channel.id)
 285                .collect::<Vec<_>>();
 286
 287            channel::Entity::delete_many()
 288                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
 289                .exec(&*tx)
 290                .await?;
 291
 292            Ok((channels_to_remove, members_to_notify))
 293        })
 294        .await
 295    }
 296
 297    pub async fn invite_channel_member(
 298        &self,
 299        channel_id: ChannelId,
 300        invitee_id: UserId,
 301        inviter_id: UserId,
 302        role: ChannelRole,
 303    ) -> Result<InviteMemberResult> {
 304        self.transaction(move |tx| async move {
 305            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 306            self.check_user_is_channel_admin(&channel, inviter_id, &*tx)
 307                .await?;
 308
 309            channel_member::ActiveModel {
 310                id: ActiveValue::NotSet,
 311                channel_id: ActiveValue::Set(channel_id),
 312                user_id: ActiveValue::Set(invitee_id),
 313                accepted: ActiveValue::Set(false),
 314                role: ActiveValue::Set(role),
 315            }
 316            .insert(&*tx)
 317            .await?;
 318
 319            let channel = Channel::from_model(channel, role);
 320
 321            let notifications = self
 322                .create_notification(
 323                    invitee_id,
 324                    rpc::Notification::ChannelInvitation {
 325                        channel_id: channel_id.to_proto(),
 326                        channel_name: channel.name.clone(),
 327                        inviter_id: inviter_id.to_proto(),
 328                    },
 329                    true,
 330                    &*tx,
 331                )
 332                .await?
 333                .into_iter()
 334                .collect();
 335
 336            Ok(InviteMemberResult {
 337                channel,
 338                notifications,
 339            })
 340        })
 341        .await
 342    }
 343
 344    fn sanitize_channel_name(name: &str) -> Result<&str> {
 345        let new_name = name.trim().trim_start_matches('#');
 346        if new_name == "" {
 347            Err(anyhow!("channel name can't be blank"))?;
 348        }
 349        Ok(new_name)
 350    }
 351
 352    pub async fn rename_channel(
 353        &self,
 354        channel_id: ChannelId,
 355        admin_id: UserId,
 356        new_name: &str,
 357    ) -> Result<RenameChannelResult> {
 358        self.transaction(move |tx| async move {
 359            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
 360
 361            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 362            let role = self
 363                .check_user_is_channel_admin(&channel, admin_id, &*tx)
 364                .await?;
 365
 366            let mut model = channel.into_active_model();
 367            model.name = ActiveValue::Set(new_name.clone());
 368            let channel = model.update(&*tx).await?;
 369
 370            let participants = self
 371                .get_channel_participant_details_internal(&channel, &*tx)
 372                .await?;
 373
 374            Ok(RenameChannelResult {
 375                channel: Channel::from_model(channel.clone(), role),
 376                participants_to_update: participants
 377                    .iter()
 378                    .map(|participant| {
 379                        (
 380                            participant.user_id,
 381                            Channel::from_model(channel.clone(), participant.role),
 382                        )
 383                    })
 384                    .collect(),
 385            })
 386        })
 387        .await
 388    }
 389
 390    pub async fn respond_to_channel_invite(
 391        &self,
 392        channel_id: ChannelId,
 393        user_id: UserId,
 394        accept: bool,
 395    ) -> Result<RespondToChannelInvite> {
 396        self.transaction(move |tx| async move {
 397            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 398
 399            let membership_update = if accept {
 400                let rows_affected = channel_member::Entity::update_many()
 401                    .set(channel_member::ActiveModel {
 402                        accepted: ActiveValue::Set(accept),
 403                        ..Default::default()
 404                    })
 405                    .filter(
 406                        channel_member::Column::ChannelId
 407                            .eq(channel_id)
 408                            .and(channel_member::Column::UserId.eq(user_id))
 409                            .and(channel_member::Column::Accepted.eq(false)),
 410                    )
 411                    .exec(&*tx)
 412                    .await?
 413                    .rows_affected;
 414
 415                if rows_affected == 0 {
 416                    Err(anyhow!("no such invitation"))?;
 417                }
 418
 419                Some(
 420                    self.calculate_membership_updated(&channel, user_id, &*tx)
 421                        .await?,
 422                )
 423            } else {
 424                let rows_affected = channel_member::Entity::delete_many()
 425                    .filter(
 426                        channel_member::Column::ChannelId
 427                            .eq(channel_id)
 428                            .and(channel_member::Column::UserId.eq(user_id))
 429                            .and(channel_member::Column::Accepted.eq(false)),
 430                    )
 431                    .exec(&*tx)
 432                    .await?
 433                    .rows_affected;
 434                if rows_affected == 0 {
 435                    Err(anyhow!("no such invitation"))?;
 436                }
 437
 438                None
 439            };
 440
 441            Ok(RespondToChannelInvite {
 442                membership_update,
 443                notifications: self
 444                    .mark_notification_as_read_with_response(
 445                        user_id,
 446                        &rpc::Notification::ChannelInvitation {
 447                            channel_id: channel_id.to_proto(),
 448                            channel_name: Default::default(),
 449                            inviter_id: Default::default(),
 450                        },
 451                        accept,
 452                        &*tx,
 453                    )
 454                    .await?
 455                    .into_iter()
 456                    .collect(),
 457            })
 458        })
 459        .await
 460    }
 461
 462    async fn calculate_membership_updated(
 463        &self,
 464        channel: &channel::Model,
 465        user_id: UserId,
 466        tx: &DatabaseTransaction,
 467    ) -> Result<MembershipUpdated> {
 468        let new_channels = self.get_user_channels(user_id, Some(channel), &*tx).await?;
 469        let removed_channels = self
 470            .get_channel_descendants_including_self(vec![channel.id], &*tx)
 471            .await?
 472            .into_iter()
 473            .filter_map(|channel| {
 474                if !new_channels.channels.iter().any(|c| c.id == channel.id) {
 475                    Some(channel.id)
 476                } else {
 477                    None
 478                }
 479            })
 480            .collect::<Vec<_>>();
 481
 482        Ok(MembershipUpdated {
 483            channel_id: channel.id,
 484            new_channels,
 485            removed_channels,
 486        })
 487    }
 488
 489    pub async fn remove_channel_member(
 490        &self,
 491        channel_id: ChannelId,
 492        member_id: UserId,
 493        admin_id: UserId,
 494    ) -> Result<RemoveChannelMemberResult> {
 495        self.transaction(|tx| async move {
 496            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 497            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 498                .await?;
 499
 500            let result = channel_member::Entity::delete_many()
 501                .filter(
 502                    channel_member::Column::ChannelId
 503                        .eq(channel_id)
 504                        .and(channel_member::Column::UserId.eq(member_id)),
 505                )
 506                .exec(&*tx)
 507                .await?;
 508
 509            if result.rows_affected == 0 {
 510                Err(anyhow!("no such member"))?;
 511            }
 512
 513            Ok(RemoveChannelMemberResult {
 514                membership_update: self
 515                    .calculate_membership_updated(&channel, member_id, &*tx)
 516                    .await?,
 517                notification_id: self
 518                    .remove_notification(
 519                        member_id,
 520                        rpc::Notification::ChannelInvitation {
 521                            channel_id: channel_id.to_proto(),
 522                            channel_name: Default::default(),
 523                            inviter_id: Default::default(),
 524                        },
 525                        &*tx,
 526                    )
 527                    .await?,
 528            })
 529        })
 530        .await
 531    }
 532
 533    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
 534        self.transaction(|tx| async move {
 535            let mut role_for_channel: HashMap<ChannelId, ChannelRole> = HashMap::default();
 536
 537            let channel_invites = channel_member::Entity::find()
 538                .filter(
 539                    channel_member::Column::UserId
 540                        .eq(user_id)
 541                        .and(channel_member::Column::Accepted.eq(false)),
 542                )
 543                .all(&*tx)
 544                .await?;
 545
 546            for invite in channel_invites {
 547                role_for_channel.insert(invite.channel_id, invite.role);
 548            }
 549
 550            let channels = channel::Entity::find()
 551                .filter(channel::Column::Id.is_in(role_for_channel.keys().copied()))
 552                .all(&*tx)
 553                .await?;
 554
 555            let channels = channels
 556                .into_iter()
 557                .filter_map(|channel| {
 558                    let role = *role_for_channel.get(&channel.id)?;
 559                    Some(Channel::from_model(channel, role))
 560                })
 561                .collect();
 562
 563            Ok(channels)
 564        })
 565        .await
 566    }
 567
 568    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
 569        self.transaction(|tx| async move {
 570            let tx = tx;
 571
 572            self.get_user_channels(user_id, None, &tx).await
 573        })
 574        .await
 575    }
 576
    /// Collects the channels `user_id` can see, along with per-channel extras.
    ///
    /// Starting from the user's accepted memberships, every descendant of those channels
    /// is considered. A channel is kept when the user's effective role can see all
    /// descendants, or can see only public descendants and the channel is `Public`.
    /// When `ancestor_channel` is given, only channels whose ancestors (including
    /// themselves) contain it are returned.
    ///
    /// The result also carries the users currently in each returned channel's room,
    /// plus the unseen buffer changes and unseen messages for those channels.
    pub async fn get_user_channels(
        &self,
        user_id: UserId,
        ancestor_channel: Option<&channel::Model>,
        tx: &DatabaseTransaction,
    ) -> Result<ChannelsForUser> {
        // Only accepted memberships count; pending invitations are ignored here.
        let channel_memberships = channel_member::Entity::find()
            .filter(
                channel_member::Column::UserId
                    .eq(user_id)
                    .and(channel_member::Column::Accepted.eq(true)),
            )
            .all(&*tx)
            .await?;

        let descendants = self
            .get_channel_descendants_including_self(
                channel_memberships.iter().map(|m| m.channel_id),
                &*tx,
            )
            .await?;

        // Seeded with the roles of direct memberships; inherited roles are added below
        // as the descendants are walked.
        let mut roles_by_channel_id: HashMap<ChannelId, ChannelRole> = HashMap::default();
        for membership in channel_memberships.iter() {
            roles_by_channel_id.insert(membership.channel_id, membership.role);
        }

        let mut visible_channel_ids: HashSet<ChannelId> = HashSet::default();

        let channels: Vec<Channel> = descendants
            .into_iter()
            .filter_map(|channel| {
                // NOTE(review): inheritance only works if a parent is visited before
                // its children — presumably `descendants` is ordered that way; confirm
                // against `get_channel_descendants_including_self`.
                let parent_role = channel
                    .parent_id()
                    .and_then(|parent_id| roles_by_channel_id.get(&parent_id));

                let role = if let Some(parent_role) = parent_role {
                    // With a role known for the parent, the effective role is the greater
                    // of the direct role (if any) and the parent's role.
                    let role = if let Some(existing_role) = roles_by_channel_id.get(&channel.id) {
                        existing_role.max(*parent_role)
                    } else {
                        *parent_role
                    };
                    roles_by_channel_id.insert(channel.id, role);
                    role
                } else {
                    // No role known for the parent: fall back to the direct role, and
                    // drop the channel if there is none.
                    *roles_by_channel_id.get(&channel.id)?
                };

                // `&&` binds tighter than `||`: either the role sees everything, or it
                // sees public descendants only and this channel is public.
                let can_see_parent_paths = role.can_see_all_descendants()
                    || role.can_only_see_public_descendants()
                        && channel.visibility == ChannelVisibility::Public;
                if !can_see_parent_paths {
                    return None;
                }

                // Recorded before the ancestor filter, so channels outside the requested
                // subtree still count as visible when trimming parent paths below.
                visible_channel_ids.insert(channel.id);

                if let Some(ancestor) = ancestor_channel {
                    if !channel
                        .ancestors_including_self()
                        .any(|id| id == ancestor.id)
                    {
                        return None;
                    }
                }

                // Strip ids the user cannot see from the reported parent path.
                let mut channel = Channel::from_model(channel, role);
                channel
                    .parent_path
                    .retain(|id| visible_channel_ids.contains(&id));

                Some(channel)
            })
            .collect();

        // Column selector for the `(channel_id, user_id)` rows queried below.
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryUserIdsAndChannelIds {
            ChannelId,
            UserId,
        }

        // Users currently in the room of each returned channel.
        let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
        {
            let mut rows = room_participant::Entity::find()
                .inner_join(room::Entity)
                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
                .select_only()
                .column(room::Column::ChannelId)
                .column(room_participant::Column::UserId)
                .into_values::<_, QueryUserIdsAndChannelIds>()
                .stream(&*tx)
                .await?;
            while let Some(row) = rows.next().await {
                let row: (ChannelId, UserId) = row?;
                channel_participants.entry(row.0).or_default().push(row.1)
            }
        }

        let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
        let channel_buffer_changes = self
            .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
            .await?;

        let unseen_messages = self
            .unseen_channel_messages(user_id, &channel_ids, &*tx)
            .await?;

        Ok(ChannelsForUser {
            channels,
            channel_participants,
            unseen_buffer_changes: channel_buffer_changes,
            channel_messages: unseen_messages,
        })
    }
 691
 692    async fn participants_to_notify_for_channel_change(
 693        &self,
 694        new_parent: &channel::Model,
 695        tx: &DatabaseTransaction,
 696    ) -> Result<Vec<(UserId, ChannelsForUser)>> {
 697        let mut results: Vec<(UserId, ChannelsForUser)> = Vec::new();
 698
 699        let members = self
 700            .get_channel_participant_details_internal(new_parent, &*tx)
 701            .await?;
 702
 703        for member in members.iter() {
 704            if !member.role.can_see_all_descendants() {
 705                continue;
 706            }
 707            results.push((
 708                member.user_id,
 709                self.get_user_channels(member.user_id, Some(new_parent), &*tx)
 710                    .await?,
 711            ))
 712        }
 713
 714        let public_parents = self
 715            .public_ancestors_including_self(new_parent, &*tx)
 716            .await?;
 717        let public_parent = public_parents.last();
 718
 719        let Some(public_parent) = public_parent else {
 720            return Ok(results);
 721        };
 722
 723        // could save some time in the common case by skipping this if the
 724        // new channel is not public and has no public descendants.
 725        let public_members = if public_parent == new_parent {
 726            members
 727        } else {
 728            self.get_channel_participant_details_internal(public_parent, &*tx)
 729                .await?
 730        };
 731
 732        for member in public_members {
 733            if !member.role.can_only_see_public_descendants() {
 734                continue;
 735            };
 736            results.push((
 737                member.user_id,
 738                self.get_user_channels(member.user_id, Some(public_parent), &*tx)
 739                    .await?,
 740            ))
 741        }
 742
 743        Ok(results)
 744    }
 745
 746    pub async fn set_channel_member_role(
 747        &self,
 748        channel_id: ChannelId,
 749        admin_id: UserId,
 750        for_user: UserId,
 751        role: ChannelRole,
 752    ) -> Result<SetMemberRoleResult> {
 753        self.transaction(|tx| async move {
 754            let channel = self.get_channel_internal(channel_id, &*tx).await?;
 755            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
 756                .await?;
 757
 758            let membership = channel_member::Entity::find()
 759                .filter(
 760                    channel_member::Column::ChannelId
 761                        .eq(channel_id)
 762                        .and(channel_member::Column::UserId.eq(for_user)),
 763                )
 764                .one(&*tx)
 765                .await?;
 766
 767            let Some(membership) = membership else {
 768                Err(anyhow!("no such member"))?
 769            };
 770
 771            let mut update = membership.into_active_model();
 772            update.role = ActiveValue::Set(role);
 773            let updated = channel_member::Entity::update(update).exec(&*tx).await?;
 774
 775            if updated.accepted {
 776                Ok(SetMemberRoleResult::MembershipUpdated(
 777                    self.calculate_membership_updated(&channel, for_user, &*tx)
 778                        .await?,
 779                ))
 780            } else {
 781                Ok(SetMemberRoleResult::InviteUpdated(Channel::from_model(
 782                    channel, role,
 783                )))
 784            }
 785        })
 786        .await
 787    }
 788
 789    pub async fn get_channel_participant_details(
 790        &self,
 791        channel_id: ChannelId,
 792        user_id: UserId,
 793    ) -> Result<Vec<proto::ChannelMember>> {
 794        let (role, members) = self
 795            .transaction(move |tx| async move {
 796                let channel = self.get_channel_internal(channel_id, &*tx).await?;
 797                let role = self
 798                    .check_user_is_channel_participant(&channel, user_id, &*tx)
 799                    .await?;
 800                Ok((
 801                    role,
 802                    self.get_channel_participant_details_internal(&channel, &*tx)
 803                        .await?,
 804                ))
 805            })
 806            .await?;
 807
 808        if role == ChannelRole::Admin {
 809            Ok(members
 810                .into_iter()
 811                .map(|channel_member| channel_member.to_proto())
 812                .collect())
 813        } else {
 814            return Ok(members
 815                .into_iter()
 816                .filter_map(|member| {
 817                    if member.kind == proto::channel_member::Kind::Invitee {
 818                        return None;
 819                    }
 820                    Some(ChannelMember {
 821                        role: member.role,
 822                        user_id: member.user_id,
 823                        kind: proto::channel_member::Kind::Member,
 824                    })
 825                })
 826                .map(|channel_member| channel_member.to_proto())
 827                .collect());
 828        }
 829    }
 830
 831    async fn get_channel_participant_details_internal(
 832        &self,
 833        channel: &channel::Model,
 834        tx: &DatabaseTransaction,
 835    ) -> Result<Vec<ChannelMember>> {
 836        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 837        enum QueryMemberDetails {
 838            UserId,
 839            Role,
 840            IsDirectMember,
 841            Accepted,
 842            Visibility,
 843        }
 844
 845        let mut stream = channel_member::Entity::find()
 846            .left_join(channel::Entity)
 847            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 848            .select_only()
 849            .column(channel_member::Column::UserId)
 850            .column(channel_member::Column::Role)
 851            .column_as(
 852                channel_member::Column::ChannelId.eq(channel.id),
 853                QueryMemberDetails::IsDirectMember,
 854            )
 855            .column(channel_member::Column::Accepted)
 856            .column(channel::Column::Visibility)
 857            .into_values::<_, QueryMemberDetails>()
 858            .stream(&*tx)
 859            .await?;
 860
 861        let mut user_details: HashMap<UserId, ChannelMember> = HashMap::default();
 862
 863        while let Some(user_membership) = stream.next().await {
 864            let (user_id, channel_role, is_direct_member, is_invite_accepted, visibility): (
 865                UserId,
 866                ChannelRole,
 867                bool,
 868                bool,
 869                ChannelVisibility,
 870            ) = user_membership?;
 871            let kind = match (is_direct_member, is_invite_accepted) {
 872                (true, true) => proto::channel_member::Kind::Member,
 873                (true, false) => proto::channel_member::Kind::Invitee,
 874                (false, true) => proto::channel_member::Kind::AncestorMember,
 875                (false, false) => continue,
 876            };
 877
 878            if channel_role == ChannelRole::Guest
 879                && visibility != ChannelVisibility::Public
 880                && channel.visibility != ChannelVisibility::Public
 881            {
 882                continue;
 883            }
 884
 885            if let Some(details_mut) = user_details.get_mut(&user_id) {
 886                if channel_role.should_override(details_mut.role) {
 887                    details_mut.role = channel_role;
 888                }
 889                if kind == Kind::Member {
 890                    details_mut.kind = kind;
 891                // the UI is going to be a bit confusing if you already have permissions
 892                // that are greater than or equal to the ones you're being invited to.
 893                } else if kind == Kind::Invitee && details_mut.kind == Kind::AncestorMember {
 894                    details_mut.kind = kind;
 895                }
 896            } else {
 897                user_details.insert(
 898                    user_id,
 899                    ChannelMember {
 900                        user_id,
 901                        kind,
 902                        role: channel_role,
 903                    },
 904                );
 905            }
 906        }
 907
 908        Ok(user_details
 909            .into_iter()
 910            .map(|(_, details)| details)
 911            .collect())
 912    }
 913
 914    pub async fn get_channel_participants(
 915        &self,
 916        channel: &channel::Model,
 917        tx: &DatabaseTransaction,
 918    ) -> Result<Vec<UserId>> {
 919        let participants = self
 920            .get_channel_participant_details_internal(channel, &*tx)
 921            .await?;
 922        Ok(participants
 923            .into_iter()
 924            .map(|member| member.user_id)
 925            .collect())
 926    }
 927
 928    pub async fn check_user_is_channel_admin(
 929        &self,
 930        channel: &channel::Model,
 931        user_id: UserId,
 932        tx: &DatabaseTransaction,
 933    ) -> Result<ChannelRole> {
 934        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 935        match role {
 936            Some(ChannelRole::Admin) => Ok(role.unwrap()),
 937            Some(ChannelRole::Member)
 938            | Some(ChannelRole::Banned)
 939            | Some(ChannelRole::Guest)
 940            | None => Err(anyhow!(
 941                "user is not a channel admin or channel does not exist"
 942            ))?,
 943        }
 944    }
 945
 946    pub async fn check_user_is_channel_member(
 947        &self,
 948        channel: &channel::Model,
 949        user_id: UserId,
 950        tx: &DatabaseTransaction,
 951    ) -> Result<ChannelRole> {
 952        let channel_role = self.channel_role_for_user(channel, user_id, tx).await?;
 953        match channel_role {
 954            Some(ChannelRole::Admin) | Some(ChannelRole::Member) => Ok(channel_role.unwrap()),
 955            Some(ChannelRole::Banned) | Some(ChannelRole::Guest) | None => Err(anyhow!(
 956                "user is not a channel member or channel does not exist"
 957            ))?,
 958        }
 959    }
 960
 961    pub async fn check_user_is_channel_participant(
 962        &self,
 963        channel: &channel::Model,
 964        user_id: UserId,
 965        tx: &DatabaseTransaction,
 966    ) -> Result<ChannelRole> {
 967        let role = self.channel_role_for_user(channel, user_id, tx).await?;
 968        match role {
 969            Some(ChannelRole::Admin) | Some(ChannelRole::Member) | Some(ChannelRole::Guest) => {
 970                Ok(role.unwrap())
 971            }
 972            Some(ChannelRole::Banned) | None => Err(anyhow!(
 973                "user is not a channel participant or channel does not exist"
 974            ))?,
 975        }
 976    }
 977
 978    pub async fn pending_invite_for_channel(
 979        &self,
 980        channel: &channel::Model,
 981        user_id: UserId,
 982        tx: &DatabaseTransaction,
 983    ) -> Result<Option<channel_member::Model>> {
 984        let row = channel_member::Entity::find()
 985            .filter(channel_member::Column::ChannelId.is_in(channel.ancestors_including_self()))
 986            .filter(channel_member::Column::UserId.eq(user_id))
 987            .filter(channel_member::Column::Accepted.eq(false))
 988            .one(&*tx)
 989            .await?;
 990
 991        Ok(row)
 992    }
 993
 994    pub async fn public_parent_channel(
 995        &self,
 996        channel: &channel::Model,
 997        tx: &DatabaseTransaction,
 998    ) -> Result<Option<channel::Model>> {
 999        let mut path = self.public_ancestors_including_self(channel, &*tx).await?;
1000        if path.last().unwrap().id == channel.id {
1001            path.pop();
1002        }
1003        Ok(path.pop())
1004    }
1005
1006    pub async fn public_ancestors_including_self(
1007        &self,
1008        channel: &channel::Model,
1009        tx: &DatabaseTransaction,
1010    ) -> Result<Vec<channel::Model>> {
1011        let visible_channels = channel::Entity::find()
1012            .filter(channel::Column::Id.is_in(channel.ancestors_including_self()))
1013            .filter(channel::Column::Visibility.eq(ChannelVisibility::Public))
1014            .order_by_asc(channel::Column::ParentPath)
1015            .all(&*tx)
1016            .await?;
1017
1018        Ok(visible_channels)
1019    }
1020
1021    pub async fn channel_role_for_user(
1022        &self,
1023        channel: &channel::Model,
1024        user_id: UserId,
1025        tx: &DatabaseTransaction,
1026    ) -> Result<Option<ChannelRole>> {
1027        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1028        enum QueryChannelMembership {
1029            ChannelId,
1030            Role,
1031            Visibility,
1032        }
1033
1034        let mut rows = channel_member::Entity::find()
1035            .left_join(channel::Entity)
1036            .filter(
1037                channel_member::Column::ChannelId
1038                    .is_in(channel.ancestors_including_self())
1039                    .and(channel_member::Column::UserId.eq(user_id))
1040                    .and(channel_member::Column::Accepted.eq(true)),
1041            )
1042            .select_only()
1043            .column(channel_member::Column::ChannelId)
1044            .column(channel_member::Column::Role)
1045            .column(channel::Column::Visibility)
1046            .into_values::<_, QueryChannelMembership>()
1047            .stream(&*tx)
1048            .await?;
1049
1050        let mut user_role: Option<ChannelRole> = None;
1051
1052        let mut is_participant = false;
1053        let mut current_channel_visibility = None;
1054
1055        // note these channels are not iterated in any particular order,
1056        // our current logic takes the highest permission available.
1057        while let Some(row) = rows.next().await {
1058            let (membership_channel, role, visibility): (
1059                ChannelId,
1060                ChannelRole,
1061                ChannelVisibility,
1062            ) = row?;
1063
1064            match role {
1065                ChannelRole::Admin | ChannelRole::Member | ChannelRole::Banned => {
1066                    if let Some(users_role) = user_role {
1067                        user_role = Some(users_role.max(role));
1068                    } else {
1069                        user_role = Some(role)
1070                    }
1071                }
1072                ChannelRole::Guest if visibility == ChannelVisibility::Public => {
1073                    is_participant = true
1074                }
1075                ChannelRole::Guest => {}
1076            }
1077            if channel.id == membership_channel {
1078                current_channel_visibility = Some(visibility);
1079            }
1080        }
1081        // free up database connection
1082        drop(rows);
1083
1084        if is_participant && user_role.is_none() {
1085            if current_channel_visibility.is_none() {
1086                current_channel_visibility = channel::Entity::find()
1087                    .filter(channel::Column::Id.eq(channel.id))
1088                    .one(&*tx)
1089                    .await?
1090                    .map(|channel| channel.visibility);
1091            }
1092            if current_channel_visibility == Some(ChannelVisibility::Public) {
1093                user_role = Some(ChannelRole::Guest);
1094            }
1095        }
1096
1097        Ok(user_role)
1098    }
1099
1100    // Get the descendants of the given set if channels, ordered by their
1101    // path.
1102    async fn get_channel_descendants_including_self(
1103        &self,
1104        channel_ids: impl IntoIterator<Item = ChannelId>,
1105        tx: &DatabaseTransaction,
1106    ) -> Result<Vec<channel::Model>> {
1107        let mut values = String::new();
1108        for id in channel_ids {
1109            if !values.is_empty() {
1110                values.push_str(", ");
1111            }
1112            write!(&mut values, "({})", id).unwrap();
1113        }
1114
1115        if values.is_empty() {
1116            return Ok(vec![]);
1117        }
1118
1119        let sql = format!(
1120            r#"
1121            SELECT DISTINCT
1122                descendant_channels.*,
1123                descendant_channels.parent_path || descendant_channels.id as full_path
1124            FROM
1125                channels parent_channels, channels descendant_channels
1126            WHERE
1127                descendant_channels.id IN ({values}) OR
1128                (
1129                    parent_channels.id IN ({values}) AND
1130                    descendant_channels.parent_path LIKE (parent_channels.parent_path || parent_channels.id || '/%')
1131                )
1132            ORDER BY
1133                full_path ASC
1134            "#
1135        );
1136
1137        Ok(channel::Entity::find()
1138            .from_raw_sql(Statement::from_string(
1139                self.pool.get_database_backend(),
1140                sql,
1141            ))
1142            .all(tx)
1143            .await?)
1144    }
1145
1146    /// Returns the channel with the given ID
1147    pub async fn get_channel(&self, channel_id: ChannelId, user_id: UserId) -> Result<Channel> {
1148        self.transaction(|tx| async move {
1149            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1150            let role = self
1151                .check_user_is_channel_participant(&channel, user_id, &*tx)
1152                .await?;
1153
1154            Ok(Channel::from_model(channel, role))
1155        })
1156        .await
1157    }
1158
1159    pub async fn get_channel_internal(
1160        &self,
1161        channel_id: ChannelId,
1162        tx: &DatabaseTransaction,
1163    ) -> Result<channel::Model> {
1164        Ok(channel::Entity::find_by_id(channel_id)
1165            .one(&*tx)
1166            .await?
1167            .ok_or_else(|| anyhow!("no such channel"))?)
1168    }
1169
1170    pub(crate) async fn get_or_create_channel_room(
1171        &self,
1172        channel_id: ChannelId,
1173        live_kit_room: &str,
1174        environment: &str,
1175        tx: &DatabaseTransaction,
1176    ) -> Result<RoomId> {
1177        let room = room::Entity::find()
1178            .filter(room::Column::ChannelId.eq(channel_id))
1179            .one(&*tx)
1180            .await?;
1181
1182        let room_id = if let Some(room) = room {
1183            if let Some(env) = room.enviroment {
1184                if &env != environment {
1185                    Err(anyhow!("must join using the {} release", env))?;
1186                }
1187            }
1188            room.id
1189        } else {
1190            let result = room::Entity::insert(room::ActiveModel {
1191                channel_id: ActiveValue::Set(Some(channel_id)),
1192                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
1193                enviroment: ActiveValue::Set(Some(environment.to_string())),
1194                ..Default::default()
1195            })
1196            .exec(&*tx)
1197            .await?;
1198
1199            result.last_insert_id
1200        };
1201
1202        Ok(room_id)
1203    }
1204
1205    /// Move a channel from one parent to another
1206    pub async fn move_channel(
1207        &self,
1208        channel_id: ChannelId,
1209        new_parent_id: Option<ChannelId>,
1210        admin_id: UserId,
1211    ) -> Result<Option<MoveChannelResult>> {
1212        self.transaction(|tx| async move {
1213            let channel = self.get_channel_internal(channel_id, &*tx).await?;
1214            self.check_user_is_channel_admin(&channel, admin_id, &*tx)
1215                .await?;
1216
1217            let new_parent_path;
1218            let new_parent_channel;
1219            if let Some(new_parent_id) = new_parent_id {
1220                let new_parent = self.get_channel_internal(new_parent_id, &*tx).await?;
1221                self.check_user_is_channel_admin(&new_parent, admin_id, &*tx)
1222                    .await?;
1223
1224                if new_parent
1225                    .ancestors_including_self()
1226                    .any(|id| id == channel.id)
1227                {
1228                    Err(anyhow!("cannot move a channel into one of its descendants"))?;
1229                }
1230
1231                new_parent_path = new_parent.path();
1232                new_parent_channel = Some(new_parent);
1233            } else {
1234                new_parent_path = String::new();
1235                new_parent_channel = None;
1236            };
1237
1238            let previous_participants = self
1239                .get_channel_participant_details_internal(&channel, &*tx)
1240                .await?;
1241
1242            let old_path = format!("{}{}/", channel.parent_path, channel.id);
1243            let new_path = format!("{}{}/", new_parent_path, channel.id);
1244
1245            if old_path == new_path {
1246                return Ok(None);
1247            }
1248
1249            let mut model = channel.into_active_model();
1250            model.parent_path = ActiveValue::Set(new_parent_path);
1251            let channel = model.update(&*tx).await?;
1252
1253            if new_parent_channel.is_none() {
1254                channel_member::ActiveModel {
1255                    id: ActiveValue::NotSet,
1256                    channel_id: ActiveValue::Set(channel_id),
1257                    user_id: ActiveValue::Set(admin_id),
1258                    accepted: ActiveValue::Set(true),
1259                    role: ActiveValue::Set(ChannelRole::Admin),
1260                }
1261                .insert(&*tx)
1262                .await?;
1263            }
1264
1265            let descendent_ids =
1266                ChannelId::find_by_statement::<QueryIds>(Statement::from_sql_and_values(
1267                    self.pool.get_database_backend(),
1268                    "
1269                    UPDATE channels SET parent_path = REPLACE(parent_path, $1, $2)
1270                    WHERE parent_path LIKE $3 || '%'
1271                    RETURNING id
1272                ",
1273                    [old_path.clone().into(), new_path.into(), old_path.into()],
1274                ))
1275                .all(&*tx)
1276                .await?;
1277
1278            let participants_to_update: HashMap<_, _> = self
1279                .participants_to_notify_for_channel_change(
1280                    new_parent_channel.as_ref().unwrap_or(&channel),
1281                    &*tx,
1282                )
1283                .await?
1284                .into_iter()
1285                .collect();
1286
1287            let mut moved_channels: HashSet<ChannelId> = HashSet::default();
1288            for id in descendent_ids {
1289                moved_channels.insert(id);
1290            }
1291            moved_channels.insert(channel_id);
1292
1293            let mut participants_to_remove: HashSet<UserId> = HashSet::default();
1294            for participant in previous_participants {
1295                if participant.kind == proto::channel_member::Kind::AncestorMember {
1296                    if !participants_to_update.contains_key(&participant.user_id) {
1297                        participants_to_remove.insert(participant.user_id);
1298                    }
1299                }
1300            }
1301
1302            Ok(Some(MoveChannelResult {
1303                participants_to_remove,
1304                participants_to_update,
1305                moved_channels,
1306            }))
1307        })
1308        .await
1309    }
1310}
1311
// Single-column (`Id`) result shape, used with
// `ChannelId::find_by_statement::<QueryIds>` in `move_channel` to read the
// ids returned by a raw SQL statement.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
    Id,
}
1316
// Single-column (`UserId`) result shape.
// NOTE(review): no use is visible in this part of the file; presumably it
// mirrors `QueryIds` for queries that return user ids -- confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}