projects.rs

   1use anyhow::Context as _;
   2use util::ResultExt;
   3
   4use super::*;
   5
   6impl Database {
   7    /// Returns the count of all projects, excluding ones marked as admin.
   8    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   9        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  10        enum QueryAs {
  11            Count,
  12        }
  13
  14        self.transaction(|tx| async move {
  15            Ok(project::Entity::find()
  16                .select_only()
  17                .column_as(project::Column::Id.count(), QueryAs::Count)
  18                .inner_join(user::Entity)
  19                .filter(user::Column::Admin.eq(false))
  20                .into_values::<_, QueryAs>()
  21                .one(&*tx)
  22                .await?
  23                .unwrap_or(0i64) as usize)
  24        })
  25        .await
  26    }
  27
  28    /// Shares a project with the given room.
  29    pub async fn share_project(
  30        &self,
  31        room_id: RoomId,
  32        connection: ConnectionId,
  33        worktrees: &[proto::WorktreeMetadata],
  34        is_ssh_project: bool,
  35        dev_server_project_id: Option<DevServerProjectId>,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .ok_or_else(|| anyhow!("could not find participant"))?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            if let Some(dev_server_project_id) = dev_server_project_id {
  65                let project = project::Entity::find()
  66                    .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
  67                    .one(&*tx)
  68                    .await?
  69                    .ok_or_else(|| anyhow!("no remote project"))?;
  70
  71                let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
  72                    .find_also_related(dev_server::Entity)
  73                    .one(&*tx)
  74                    .await?
  75                    .ok_or_else(|| anyhow!("no dev_server_project"))?;
  76
  77                if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
  78                    return Err(anyhow!("not your dev server"))?;
  79                }
  80
  81                if project.room_id.is_some() {
  82                    return Err(anyhow!("project already shared"))?;
  83                };
  84
  85                let project = project::Entity::update(project::ActiveModel {
  86                    room_id: ActiveValue::Set(Some(room_id)),
  87                    ..project.into_active_model()
  88                })
  89                .exec(&*tx)
  90                .await?;
  91
  92                let room = self.get_room(room_id, &tx).await?;
  93                return Ok((project.id, room));
  94            }
  95
  96            let project = project::ActiveModel {
  97                room_id: ActiveValue::set(Some(participant.room_id)),
  98                host_user_id: ActiveValue::set(Some(participant.user_id)),
  99                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 100                host_connection_server_id: ActiveValue::set(Some(ServerId(
 101                    connection.owner_id as i32,
 102                ))),
 103                id: ActiveValue::NotSet,
 104                hosted_project_id: ActiveValue::Set(None),
 105                dev_server_project_id: ActiveValue::Set(None),
 106            }
 107            .insert(&*tx)
 108            .await?;
 109
 110            if !worktrees.is_empty() {
 111                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
 112                    worktree::ActiveModel {
 113                        id: ActiveValue::set(worktree.id as i64),
 114                        project_id: ActiveValue::set(project.id),
 115                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
 116                        root_name: ActiveValue::set(worktree.root_name.clone()),
 117                        visible: ActiveValue::set(worktree.visible),
 118                        scan_id: ActiveValue::set(0),
 119                        completed_scan_id: ActiveValue::set(0),
 120                    }
 121                }))
 122                .exec(&*tx)
 123                .await?;
 124            }
 125
 126            let replica_id = if is_ssh_project { 1 } else { 0 };
 127
 128            project_collaborator::ActiveModel {
 129                project_id: ActiveValue::set(project.id),
 130                connection_id: ActiveValue::set(connection.id as i32),
 131                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 132                user_id: ActiveValue::set(participant.user_id),
 133                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 134                is_host: ActiveValue::set(true),
 135                ..Default::default()
 136            }
 137            .insert(&*tx)
 138            .await?;
 139
 140            let room = self.get_room(room_id, &tx).await?;
 141            Ok((project.id, room))
 142        })
 143        .await
 144    }
 145
 146    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 147        self.weak_transaction(|tx| async move {
 148            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 149            Ok(())
 150        })
 151        .await
 152    }
 153
 154    /// Unshares the given project.
 155    pub async fn unshare_project(
 156        &self,
 157        project_id: ProjectId,
 158        connection: ConnectionId,
 159        user_id: Option<UserId>,
 160    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 161        self.project_transaction(project_id, |tx| async move {
 162            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 163            let project = project::Entity::find_by_id(project_id)
 164                .one(&*tx)
 165                .await?
 166                .ok_or_else(|| anyhow!("project not found"))?;
 167            let room = if let Some(room_id) = project.room_id {
 168                Some(self.get_room(room_id, &tx).await?)
 169            } else {
 170                None
 171            };
 172            if project.host_connection()? == connection {
 173                return Ok((true, room, guest_connection_ids));
 174            }
 175            if let Some(dev_server_project_id) = project.dev_server_project_id {
 176                if let Some(user_id) = user_id {
 177                    if user_id
 178                        != self
 179                            .owner_for_dev_server_project(dev_server_project_id, &tx)
 180                            .await?
 181                    {
 182                        Err(anyhow!("cannot unshare a project hosted by another user"))?
 183                    }
 184                    project::Entity::update(project::ActiveModel {
 185                        room_id: ActiveValue::Set(None),
 186                        ..project.into_active_model()
 187                    })
 188                    .exec(&*tx)
 189                    .await?;
 190                    return Ok((false, room, guest_connection_ids));
 191                }
 192            }
 193
 194            Err(anyhow!("cannot unshare a project hosted by another user"))?
 195        })
 196        .await
 197    }
 198
 199    /// Updates the worktrees associated with the given project.
 200    pub async fn update_project(
 201        &self,
 202        project_id: ProjectId,
 203        connection: ConnectionId,
 204        worktrees: &[proto::WorktreeMetadata],
 205    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 206        self.project_transaction(project_id, |tx| async move {
 207            let project = project::Entity::find_by_id(project_id)
 208                .filter(
 209                    Condition::all()
 210                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 211                        .add(
 212                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 213                        ),
 214                )
 215                .one(&*tx)
 216                .await?
 217                .ok_or_else(|| anyhow!("no such project"))?;
 218
 219            self.update_project_worktrees(project.id, worktrees, &tx)
 220                .await?;
 221
 222            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 223
 224            let room = if let Some(room_id) = project.room_id {
 225                Some(self.get_room(room_id, &tx).await?)
 226            } else {
 227                None
 228            };
 229
 230            Ok((room, guest_connection_ids))
 231        })
 232        .await
 233    }
 234
 235    pub(in crate::db) async fn update_project_worktrees(
 236        &self,
 237        project_id: ProjectId,
 238        worktrees: &[proto::WorktreeMetadata],
 239        tx: &DatabaseTransaction,
 240    ) -> Result<()> {
 241        if !worktrees.is_empty() {
 242            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 243                id: ActiveValue::set(worktree.id as i64),
 244                project_id: ActiveValue::set(project_id),
 245                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 246                root_name: ActiveValue::set(worktree.root_name.clone()),
 247                visible: ActiveValue::set(worktree.visible),
 248                scan_id: ActiveValue::set(0),
 249                completed_scan_id: ActiveValue::set(0),
 250            }))
 251            .on_conflict(
 252                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 253                    .update_column(worktree::Column::RootName)
 254                    .to_owned(),
 255            )
 256            .exec(tx)
 257            .await?;
 258        }
 259
 260        worktree::Entity::delete_many()
 261            .filter(worktree::Column::ProjectId.eq(project_id).and(
 262                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 263            ))
 264            .exec(tx)
 265            .await?;
 266
 267        Ok(())
 268    }
 269
 270    pub async fn update_worktree(
 271        &self,
 272        update: &proto::UpdateWorktree,
 273        connection: ConnectionId,
 274    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 275        let project_id = ProjectId::from_proto(update.project_id);
 276        let worktree_id = update.worktree_id as i64;
 277        self.project_transaction(project_id, |tx| async move {
 278            // Ensure the update comes from the host.
 279            let _project = project::Entity::find_by_id(project_id)
 280                .filter(
 281                    Condition::all()
 282                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 283                        .add(
 284                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 285                        ),
 286                )
 287                .one(&*tx)
 288                .await?
 289                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
 290
 291            // Update metadata.
 292            worktree::Entity::update(worktree::ActiveModel {
 293                id: ActiveValue::set(worktree_id),
 294                project_id: ActiveValue::set(project_id),
 295                root_name: ActiveValue::set(update.root_name.clone()),
 296                scan_id: ActiveValue::set(update.scan_id as i64),
 297                completed_scan_id: if update.is_last_update {
 298                    ActiveValue::set(update.scan_id as i64)
 299                } else {
 300                    ActiveValue::default()
 301                },
 302                abs_path: ActiveValue::set(update.abs_path.clone()),
 303                ..Default::default()
 304            })
 305            .exec(&*tx)
 306            .await?;
 307
 308            if !update.updated_entries.is_empty() {
 309                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 310                    let mtime = entry.mtime.clone().unwrap_or_default();
 311                    worktree_entry::ActiveModel {
 312                        project_id: ActiveValue::set(project_id),
 313                        worktree_id: ActiveValue::set(worktree_id),
 314                        id: ActiveValue::set(entry.id as i64),
 315                        is_dir: ActiveValue::set(entry.is_dir),
 316                        path: ActiveValue::set(entry.path.clone()),
 317                        inode: ActiveValue::set(entry.inode as i64),
 318                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 319                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 320                        is_symlink: ActiveValue::set(entry.is_symlink),
 321                        is_ignored: ActiveValue::set(entry.is_ignored),
 322                        is_external: ActiveValue::set(entry.is_external),
 323                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 324                        is_deleted: ActiveValue::set(false),
 325                        scan_id: ActiveValue::set(update.scan_id as i64),
 326                        is_fifo: ActiveValue::set(entry.is_fifo),
 327                    }
 328                }))
 329                .on_conflict(
 330                    OnConflict::columns([
 331                        worktree_entry::Column::ProjectId,
 332                        worktree_entry::Column::WorktreeId,
 333                        worktree_entry::Column::Id,
 334                    ])
 335                    .update_columns([
 336                        worktree_entry::Column::IsDir,
 337                        worktree_entry::Column::Path,
 338                        worktree_entry::Column::Inode,
 339                        worktree_entry::Column::MtimeSeconds,
 340                        worktree_entry::Column::MtimeNanos,
 341                        worktree_entry::Column::IsSymlink,
 342                        worktree_entry::Column::IsIgnored,
 343                        worktree_entry::Column::GitStatus,
 344                        worktree_entry::Column::ScanId,
 345                    ])
 346                    .to_owned(),
 347                )
 348                .exec(&*tx)
 349                .await?;
 350            }
 351
 352            if !update.removed_entries.is_empty() {
 353                worktree_entry::Entity::update_many()
 354                    .filter(
 355                        worktree_entry::Column::ProjectId
 356                            .eq(project_id)
 357                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 358                            .and(
 359                                worktree_entry::Column::Id
 360                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 361                            ),
 362                    )
 363                    .set(worktree_entry::ActiveModel {
 364                        is_deleted: ActiveValue::Set(true),
 365                        scan_id: ActiveValue::Set(update.scan_id as i64),
 366                        ..Default::default()
 367                    })
 368                    .exec(&*tx)
 369                    .await?;
 370            }
 371
 372            if !update.updated_repositories.is_empty() {
 373                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 374                    |repository| worktree_repository::ActiveModel {
 375                        project_id: ActiveValue::set(project_id),
 376                        worktree_id: ActiveValue::set(worktree_id),
 377                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 378                        scan_id: ActiveValue::set(update.scan_id as i64),
 379                        branch: ActiveValue::set(repository.branch.clone()),
 380                        is_deleted: ActiveValue::set(false),
 381                    },
 382                ))
 383                .on_conflict(
 384                    OnConflict::columns([
 385                        worktree_repository::Column::ProjectId,
 386                        worktree_repository::Column::WorktreeId,
 387                        worktree_repository::Column::WorkDirectoryId,
 388                    ])
 389                    .update_columns([
 390                        worktree_repository::Column::ScanId,
 391                        worktree_repository::Column::Branch,
 392                    ])
 393                    .to_owned(),
 394                )
 395                .exec(&*tx)
 396                .await?;
 397            }
 398
 399            if !update.removed_repositories.is_empty() {
 400                worktree_repository::Entity::update_many()
 401                    .filter(
 402                        worktree_repository::Column::ProjectId
 403                            .eq(project_id)
 404                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 405                            .and(
 406                                worktree_repository::Column::WorkDirectoryId
 407                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 408                            ),
 409                    )
 410                    .set(worktree_repository::ActiveModel {
 411                        is_deleted: ActiveValue::Set(true),
 412                        scan_id: ActiveValue::Set(update.scan_id as i64),
 413                        ..Default::default()
 414                    })
 415                    .exec(&*tx)
 416                    .await?;
 417            }
 418
 419            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 420            Ok(connection_ids)
 421        })
 422        .await
 423    }
 424
 425    /// Updates the diagnostic summary for the given connection.
 426    pub async fn update_diagnostic_summary(
 427        &self,
 428        update: &proto::UpdateDiagnosticSummary,
 429        connection: ConnectionId,
 430    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 431        let project_id = ProjectId::from_proto(update.project_id);
 432        let worktree_id = update.worktree_id as i64;
 433        self.project_transaction(project_id, |tx| async move {
 434            let summary = update
 435                .summary
 436                .as_ref()
 437                .ok_or_else(|| anyhow!("invalid summary"))?;
 438
 439            // Ensure the update comes from the host.
 440            let project = project::Entity::find_by_id(project_id)
 441                .one(&*tx)
 442                .await?
 443                .ok_or_else(|| anyhow!("no such project"))?;
 444            if project.host_connection()? != connection {
 445                return Err(anyhow!("can't update a project hosted by someone else"))?;
 446            }
 447
 448            // Update summary.
 449            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 450                project_id: ActiveValue::set(project_id),
 451                worktree_id: ActiveValue::set(worktree_id),
 452                path: ActiveValue::set(summary.path.clone()),
 453                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 454                error_count: ActiveValue::set(summary.error_count as i32),
 455                warning_count: ActiveValue::set(summary.warning_count as i32),
 456            })
 457            .on_conflict(
 458                OnConflict::columns([
 459                    worktree_diagnostic_summary::Column::ProjectId,
 460                    worktree_diagnostic_summary::Column::WorktreeId,
 461                    worktree_diagnostic_summary::Column::Path,
 462                ])
 463                .update_columns([
 464                    worktree_diagnostic_summary::Column::LanguageServerId,
 465                    worktree_diagnostic_summary::Column::ErrorCount,
 466                    worktree_diagnostic_summary::Column::WarningCount,
 467                ])
 468                .to_owned(),
 469            )
 470            .exec(&*tx)
 471            .await?;
 472
 473            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 474            Ok(connection_ids)
 475        })
 476        .await
 477    }
 478
 479    /// Starts the language server for the given connection.
 480    pub async fn start_language_server(
 481        &self,
 482        update: &proto::StartLanguageServer,
 483        connection: ConnectionId,
 484    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 485        let project_id = ProjectId::from_proto(update.project_id);
 486        self.project_transaction(project_id, |tx| async move {
 487            let server = update
 488                .server
 489                .as_ref()
 490                .ok_or_else(|| anyhow!("invalid language server"))?;
 491
 492            // Ensure the update comes from the host.
 493            let project = project::Entity::find_by_id(project_id)
 494                .one(&*tx)
 495                .await?
 496                .ok_or_else(|| anyhow!("no such project"))?;
 497            if project.host_connection()? != connection {
 498                return Err(anyhow!("can't update a project hosted by someone else"))?;
 499            }
 500
 501            // Add the newly-started language server.
 502            language_server::Entity::insert(language_server::ActiveModel {
 503                project_id: ActiveValue::set(project_id),
 504                id: ActiveValue::set(server.id as i64),
 505                name: ActiveValue::set(server.name.clone()),
 506            })
 507            .on_conflict(
 508                OnConflict::columns([
 509                    language_server::Column::ProjectId,
 510                    language_server::Column::Id,
 511                ])
 512                .update_column(language_server::Column::Name)
 513                .to_owned(),
 514            )
 515            .exec(&*tx)
 516            .await?;
 517
 518            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 519            Ok(connection_ids)
 520        })
 521        .await
 522    }
 523
 524    /// Updates the worktree settings for the given connection.
 525    pub async fn update_worktree_settings(
 526        &self,
 527        update: &proto::UpdateWorktreeSettings,
 528        connection: ConnectionId,
 529    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 530        let project_id = ProjectId::from_proto(update.project_id);
 531        let kind = match update.kind {
 532            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 533                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 534            None => proto::LocalSettingsKind::Settings,
 535        };
 536        let kind = LocalSettingsKind::from_proto(kind);
 537        self.project_transaction(project_id, |tx| async move {
 538            // Ensure the update comes from the host.
 539            let project = project::Entity::find_by_id(project_id)
 540                .one(&*tx)
 541                .await?
 542                .ok_or_else(|| anyhow!("no such project"))?;
 543            if project.host_connection()? != connection {
 544                return Err(anyhow!("can't update a project hosted by someone else"))?;
 545            }
 546
 547            if let Some(content) = &update.content {
 548                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 549                    project_id: ActiveValue::Set(project_id),
 550                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 551                    path: ActiveValue::Set(update.path.clone()),
 552                    content: ActiveValue::Set(content.clone()),
 553                    kind: ActiveValue::Set(kind),
 554                })
 555                .on_conflict(
 556                    OnConflict::columns([
 557                        worktree_settings_file::Column::ProjectId,
 558                        worktree_settings_file::Column::WorktreeId,
 559                        worktree_settings_file::Column::Path,
 560                    ])
 561                    .update_column(worktree_settings_file::Column::Content)
 562                    .to_owned(),
 563                )
 564                .exec(&*tx)
 565                .await?;
 566            } else {
 567                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 568                    project_id: ActiveValue::Set(project_id),
 569                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 570                    path: ActiveValue::Set(update.path.clone()),
 571                    ..Default::default()
 572                })
 573                .exec(&*tx)
 574                .await?;
 575            }
 576
 577            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 578            Ok(connection_ids)
 579        })
 580        .await
 581    }
 582
 583    /// Adds the given connection to the specified hosted project
 584    pub async fn join_hosted_project(
 585        &self,
 586        id: ProjectId,
 587        user_id: UserId,
 588        connection: ConnectionId,
 589    ) -> Result<(Project, ReplicaId)> {
 590        self.transaction(|tx| async move {
 591            let (project, hosted_project) = project::Entity::find_by_id(id)
 592                .find_also_related(hosted_project::Entity)
 593                .one(&*tx)
 594                .await?
 595                .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
 596
 597            let Some(hosted_project) = hosted_project else {
 598                return Err(anyhow!("project is not hosted"))?;
 599            };
 600
 601            let channel = channel::Entity::find_by_id(hosted_project.channel_id)
 602                .one(&*tx)
 603                .await?
 604                .ok_or_else(|| anyhow!("no such channel"))?;
 605
 606            let role = self
 607                .check_user_is_channel_participant(&channel, user_id, &tx)
 608                .await?;
 609
 610            self.join_project_internal(project, user_id, connection, role, &tx)
 611                .await
 612        })
 613        .await
 614    }
 615
 616    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 617        self.transaction(|tx| async move {
 618            Ok(project::Entity::find_by_id(id)
 619                .one(&*tx)
 620                .await?
 621                .ok_or_else(|| anyhow!("no such project"))?)
 622        })
 623        .await
 624    }
 625
 626    pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
 627        self.transaction(|tx| async move {
 628            Ok(project::Entity::find()
 629                .filter(project::Column::DevServerProjectId.eq(id))
 630                .one(&*tx)
 631                .await?
 632                .ok_or_else(|| anyhow!("no such project"))?)
 633        })
 634        .await
 635    }
 636
 637    /// Adds the given connection to the specified project
 638    /// in the current room.
 639    pub async fn join_project(
 640        &self,
 641        project_id: ProjectId,
 642        connection: ConnectionId,
 643        user_id: UserId,
 644    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 645        self.project_transaction(project_id, |tx| async move {
 646            let (project, role) = self
 647                .access_project(
 648                    project_id,
 649                    connection,
 650                    PrincipalId::UserId(user_id),
 651                    Capability::ReadOnly,
 652                    &tx,
 653                )
 654                .await?;
 655            self.join_project_internal(project, user_id, connection, role, &tx)
 656                .await
 657        })
 658        .await
 659    }
 660
 661    async fn join_project_internal(
 662        &self,
 663        project: project::Model,
 664        user_id: UserId,
 665        connection: ConnectionId,
 666        role: ChannelRole,
 667        tx: &DatabaseTransaction,
 668    ) -> Result<(Project, ReplicaId)> {
 669        let mut collaborators = project
 670            .find_related(project_collaborator::Entity)
 671            .all(tx)
 672            .await?;
 673        let replica_ids = collaborators
 674            .iter()
 675            .map(|c| c.replica_id)
 676            .collect::<HashSet<_>>();
 677        let mut replica_id = ReplicaId(1);
 678        while replica_ids.contains(&replica_id) {
 679            replica_id.0 += 1;
 680        }
 681        let new_collaborator = project_collaborator::ActiveModel {
 682            project_id: ActiveValue::set(project.id),
 683            connection_id: ActiveValue::set(connection.id as i32),
 684            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 685            user_id: ActiveValue::set(user_id),
 686            replica_id: ActiveValue::set(replica_id),
 687            is_host: ActiveValue::set(false),
 688            ..Default::default()
 689        }
 690        .insert(tx)
 691        .await?;
 692        collaborators.push(new_collaborator);
 693
 694        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 695        let mut worktrees = db_worktrees
 696            .into_iter()
 697            .map(|db_worktree| {
 698                (
 699                    db_worktree.id as u64,
 700                    Worktree {
 701                        id: db_worktree.id as u64,
 702                        abs_path: db_worktree.abs_path,
 703                        root_name: db_worktree.root_name,
 704                        visible: db_worktree.visible,
 705                        entries: Default::default(),
 706                        repository_entries: Default::default(),
 707                        diagnostic_summaries: Default::default(),
 708                        settings_files: Default::default(),
 709                        scan_id: db_worktree.scan_id as u64,
 710                        completed_scan_id: db_worktree.completed_scan_id as u64,
 711                    },
 712                )
 713            })
 714            .collect::<BTreeMap<_, _>>();
 715
 716        // Populate worktree entries.
 717        {
 718            let mut db_entries = worktree_entry::Entity::find()
 719                .filter(
 720                    Condition::all()
 721                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 722                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 723                )
 724                .stream(tx)
 725                .await?;
 726            while let Some(db_entry) = db_entries.next().await {
 727                let db_entry = db_entry?;
 728                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 729                    worktree.entries.push(proto::Entry {
 730                        id: db_entry.id as u64,
 731                        is_dir: db_entry.is_dir,
 732                        path: db_entry.path,
 733                        inode: db_entry.inode as u64,
 734                        mtime: Some(proto::Timestamp {
 735                            seconds: db_entry.mtime_seconds as u64,
 736                            nanos: db_entry.mtime_nanos as u32,
 737                        }),
 738                        is_symlink: db_entry.is_symlink,
 739                        is_ignored: db_entry.is_ignored,
 740                        is_external: db_entry.is_external,
 741                        git_status: db_entry.git_status.map(|status| status as i32),
 742                        // This is only used in the summarization backlog, so if it's None,
 743                        // that just means we won't be able to detect when to resummarize
 744                        // based on total number of backlogged bytes - instead, we'd go
 745                        // on number of files only. That shouldn't be a huge deal in practice.
 746                        size: None,
 747                        is_fifo: db_entry.is_fifo,
 748                    });
 749                }
 750            }
 751        }
 752
 753        // Populate repository entries.
 754        {
 755            let mut db_repository_entries = worktree_repository::Entity::find()
 756                .filter(
 757                    Condition::all()
 758                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 759                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 760                )
 761                .stream(tx)
 762                .await?;
 763            while let Some(db_repository_entry) = db_repository_entries.next().await {
 764                let db_repository_entry = db_repository_entry?;
 765                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 766                {
 767                    worktree.repository_entries.insert(
 768                        db_repository_entry.work_directory_id as u64,
 769                        proto::RepositoryEntry {
 770                            work_directory_id: db_repository_entry.work_directory_id as u64,
 771                            branch: db_repository_entry.branch,
 772                        },
 773                    );
 774                }
 775            }
 776        }
 777
 778        // Populate worktree diagnostic summaries.
 779        {
 780            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 781                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 782                .stream(tx)
 783                .await?;
 784            while let Some(db_summary) = db_summaries.next().await {
 785                let db_summary = db_summary?;
 786                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 787                    worktree
 788                        .diagnostic_summaries
 789                        .push(proto::DiagnosticSummary {
 790                            path: db_summary.path,
 791                            language_server_id: db_summary.language_server_id as u64,
 792                            error_count: db_summary.error_count as u32,
 793                            warning_count: db_summary.warning_count as u32,
 794                        });
 795                }
 796            }
 797        }
 798
 799        // Populate worktree settings files
 800        {
 801            let mut db_settings_files = worktree_settings_file::Entity::find()
 802                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 803                .stream(tx)
 804                .await?;
 805            while let Some(db_settings_file) = db_settings_files.next().await {
 806                let db_settings_file = db_settings_file?;
 807                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 808                    worktree.settings_files.push(WorktreeSettingsFile {
 809                        path: db_settings_file.path,
 810                        content: db_settings_file.content,
 811                        kind: db_settings_file.kind,
 812                    });
 813                }
 814            }
 815        }
 816
 817        // Populate language servers.
 818        let language_servers = project
 819            .find_related(language_server::Entity)
 820            .all(tx)
 821            .await?;
 822
 823        let project = Project {
 824            id: project.id,
 825            role,
 826            collaborators: collaborators
 827                .into_iter()
 828                .map(|collaborator| ProjectCollaborator {
 829                    connection_id: collaborator.connection(),
 830                    user_id: collaborator.user_id,
 831                    replica_id: collaborator.replica_id,
 832                    is_host: collaborator.is_host,
 833                })
 834                .collect(),
 835            worktrees,
 836            language_servers: language_servers
 837                .into_iter()
 838                .map(|language_server| proto::LanguageServer {
 839                    id: language_server.id as u64,
 840                    name: language_server.name,
 841                })
 842                .collect(),
 843            dev_server_project_id: project.dev_server_project_id,
 844        };
 845        Ok((project, replica_id as ReplicaId))
 846    }
 847
 848    pub async fn leave_hosted_project(
 849        &self,
 850        project_id: ProjectId,
 851        connection: ConnectionId,
 852    ) -> Result<LeftProject> {
 853        self.transaction(|tx| async move {
 854            let result = project_collaborator::Entity::delete_many()
 855                .filter(
 856                    Condition::all()
 857                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 858                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 859                        .add(
 860                            project_collaborator::Column::ConnectionServerId
 861                                .eq(connection.owner_id as i32),
 862                        ),
 863                )
 864                .exec(&*tx)
 865                .await?;
 866            if result.rows_affected == 0 {
 867                return Err(anyhow!("not in the project"))?;
 868            }
 869
 870            let project = project::Entity::find_by_id(project_id)
 871                .one(&*tx)
 872                .await?
 873                .ok_or_else(|| anyhow!("no such project"))?;
 874            let collaborators = project
 875                .find_related(project_collaborator::Entity)
 876                .all(&*tx)
 877                .await?;
 878            let connection_ids = collaborators
 879                .into_iter()
 880                .map(|collaborator| collaborator.connection())
 881                .collect();
 882            Ok(LeftProject {
 883                id: project.id,
 884                connection_ids,
 885                should_unshare: false,
 886            })
 887        })
 888        .await
 889    }
 890
 891    /// Removes the given connection from the specified project.
 892    pub async fn leave_project(
 893        &self,
 894        project_id: ProjectId,
 895        connection: ConnectionId,
 896    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 897        self.project_transaction(project_id, |tx| async move {
 898            let result = project_collaborator::Entity::delete_many()
 899                .filter(
 900                    Condition::all()
 901                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 902                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 903                        .add(
 904                            project_collaborator::Column::ConnectionServerId
 905                                .eq(connection.owner_id as i32),
 906                        ),
 907                )
 908                .exec(&*tx)
 909                .await?;
 910            if result.rows_affected == 0 {
 911                Err(anyhow!("not a collaborator on this project"))?;
 912            }
 913
 914            let project = project::Entity::find_by_id(project_id)
 915                .one(&*tx)
 916                .await?
 917                .ok_or_else(|| anyhow!("no such project"))?;
 918            let collaborators = project
 919                .find_related(project_collaborator::Entity)
 920                .all(&*tx)
 921                .await?;
 922            let connection_ids: Vec<ConnectionId> = collaborators
 923                .into_iter()
 924                .map(|collaborator| collaborator.connection())
 925                .collect();
 926
 927            follower::Entity::delete_many()
 928                .filter(
 929                    Condition::any()
 930                        .add(
 931                            Condition::all()
 932                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 933                                .add(
 934                                    follower::Column::LeaderConnectionServerId
 935                                        .eq(connection.owner_id),
 936                                )
 937                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 938                        )
 939                        .add(
 940                            Condition::all()
 941                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 942                                .add(
 943                                    follower::Column::FollowerConnectionServerId
 944                                        .eq(connection.owner_id),
 945                                )
 946                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 947                        ),
 948                )
 949                .exec(&*tx)
 950                .await?;
 951
 952            let room = if let Some(room_id) = project.room_id {
 953                Some(self.get_room(room_id, &tx).await?)
 954            } else {
 955                None
 956            };
 957
 958            let left_project = LeftProject {
 959                id: project_id,
 960                should_unshare: connection == project.host_connection()?,
 961                connection_ids,
 962            };
 963            Ok((room, left_project))
 964        })
 965        .await
 966    }
 967
 968    pub async fn check_user_is_project_host(
 969        &self,
 970        project_id: ProjectId,
 971        connection_id: ConnectionId,
 972    ) -> Result<()> {
 973        self.project_transaction(project_id, |tx| async move {
 974            project::Entity::find()
 975                .filter(
 976                    Condition::all()
 977                        .add(project::Column::Id.eq(project_id))
 978                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 979                        .add(
 980                            project::Column::HostConnectionServerId
 981                                .eq(Some(connection_id.owner_id as i32)),
 982                        ),
 983                )
 984                .one(&*tx)
 985                .await?
 986                .ok_or_else(|| anyhow!("failed to read project host"))?;
 987
 988            Ok(())
 989        })
 990        .await
 991        .map(|guard| guard.into_inner())
 992    }
 993
 994    /// Returns the current project if the given user is authorized to access it with the specified capability.
 995    pub async fn access_project(
 996        &self,
 997        project_id: ProjectId,
 998        connection_id: ConnectionId,
 999        principal_id: PrincipalId,
1000        capability: Capability,
1001        tx: &DatabaseTransaction,
1002    ) -> Result<(project::Model, ChannelRole)> {
1003        let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
1004            .find_also_related(dev_server_project::Entity)
1005            .one(tx)
1006            .await?
1007            .ok_or_else(|| anyhow!("no such project"))?;
1008
1009        let user_id = match principal_id {
1010            PrincipalId::DevServerId(_) => {
1011                if project
1012                    .host_connection()
1013                    .is_ok_and(|connection| connection == connection_id)
1014                {
1015                    return Ok((project, ChannelRole::Admin));
1016                }
1017                return Err(anyhow!("not the project host"))?;
1018            }
1019            PrincipalId::UserId(user_id) => user_id,
1020        };
1021
1022        let role_from_room = if let Some(room_id) = project.room_id {
1023            room_participant::Entity::find()
1024                .filter(room_participant::Column::RoomId.eq(room_id))
1025                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1026                .one(tx)
1027                .await?
1028                .and_then(|participant| participant.role)
1029        } else {
1030            None
1031        };
1032        let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1033            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1034                .one(tx)
1035                .await?
1036                .ok_or_else(|| anyhow!("no such channel"))?;
1037            if user_id == dev_server.user_id {
1038                // If the user left the room "uncleanly" they may rejoin the
1039                // remote project before leave_room runs. IN that case kick
1040                // the project out of the room pre-emptively.
1041                if role_from_room.is_none() {
1042                    project = project::Entity::update(project::ActiveModel {
1043                        room_id: ActiveValue::Set(None),
1044                        ..project.into_active_model()
1045                    })
1046                    .exec(tx)
1047                    .await?;
1048                }
1049                Some(ChannelRole::Admin)
1050            } else {
1051                None
1052            }
1053        } else {
1054            None
1055        };
1056
1057        let role = role_from_dev_server
1058            .or(role_from_room)
1059            .unwrap_or(ChannelRole::Banned);
1060
1061        match capability {
1062            Capability::ReadWrite => {
1063                if !role.can_edit_projects() {
1064                    return Err(anyhow!("not authorized to edit projects"))?;
1065                }
1066            }
1067            Capability::ReadOnly => {
1068                if !role.can_read_projects() {
1069                    return Err(anyhow!("not authorized to read projects"))?;
1070                }
1071            }
1072        }
1073
1074        Ok((project, role))
1075    }
1076
1077    /// Returns the host connection for a read-only request to join a shared project.
1078    pub async fn host_for_read_only_project_request(
1079        &self,
1080        project_id: ProjectId,
1081        connection_id: ConnectionId,
1082        user_id: UserId,
1083    ) -> Result<ConnectionId> {
1084        self.project_transaction(project_id, |tx| async move {
1085            let (project, _) = self
1086                .access_project(
1087                    project_id,
1088                    connection_id,
1089                    PrincipalId::UserId(user_id),
1090                    Capability::ReadOnly,
1091                    &tx,
1092                )
1093                .await?;
1094            project.host_connection()
1095        })
1096        .await
1097        .map(|guard| guard.into_inner())
1098    }
1099
1100    /// Returns the host connection for a request to join a shared project.
1101    pub async fn host_for_mutating_project_request(
1102        &self,
1103        project_id: ProjectId,
1104        connection_id: ConnectionId,
1105        user_id: UserId,
1106    ) -> Result<ConnectionId> {
1107        self.project_transaction(project_id, |tx| async move {
1108            let (project, _) = self
1109                .access_project(
1110                    project_id,
1111                    connection_id,
1112                    PrincipalId::UserId(user_id),
1113                    Capability::ReadWrite,
1114                    &tx,
1115                )
1116                .await?;
1117            project.host_connection()
1118        })
1119        .await
1120        .map(|guard| guard.into_inner())
1121    }
1122
1123    /// Returns the host connection for a request to join a shared project.
1124    pub async fn host_for_owner_project_request(
1125        &self,
1126        project_id: ProjectId,
1127        _connection_id: ConnectionId,
1128        user_id: UserId,
1129    ) -> Result<ConnectionId> {
1130        self.project_transaction(project_id, |tx| async move {
1131            let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1132                .find_also_related(dev_server_project::Entity)
1133                .one(&*tx)
1134                .await?
1135                .ok_or_else(|| anyhow!("no such project"))?;
1136
1137            let Some(dev_server_project) = dev_server_project else {
1138                return Err(anyhow!("not a dev server project"))?;
1139            };
1140            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1141                .one(&*tx)
1142                .await?
1143                .ok_or_else(|| anyhow!("no such dev server"))?;
1144            if dev_server.user_id != user_id {
1145                return Err(anyhow!("not your project"))?;
1146            }
1147            project.host_connection()
1148        })
1149        .await
1150        .map(|guard| guard.into_inner())
1151    }
1152
1153    pub async fn connections_for_buffer_update(
1154        &self,
1155        project_id: ProjectId,
1156        principal_id: PrincipalId,
1157        connection_id: ConnectionId,
1158        capability: Capability,
1159    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1160        self.project_transaction(project_id, |tx| async move {
1161            // Authorize
1162            let (project, _) = self
1163                .access_project(project_id, connection_id, principal_id, capability, &tx)
1164                .await?;
1165
1166            let host_connection_id = project.host_connection()?;
1167
1168            let collaborators = project_collaborator::Entity::find()
1169                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1170                .all(&*tx)
1171                .await?;
1172
1173            let guest_connection_ids = collaborators
1174                .into_iter()
1175                .filter_map(|collaborator| {
1176                    if collaborator.is_host {
1177                        None
1178                    } else {
1179                        Some(collaborator.connection())
1180                    }
1181                })
1182                .collect();
1183
1184            Ok((host_connection_id, guest_connection_ids))
1185        })
1186        .await
1187    }
1188
1189    /// Returns the connection IDs in the given project.
1190    ///
1191    /// The provided `connection_id` must also be a collaborator in the project,
1192    /// otherwise an error will be returned.
1193    pub async fn project_connection_ids(
1194        &self,
1195        project_id: ProjectId,
1196        connection_id: ConnectionId,
1197        exclude_dev_server: bool,
1198    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1199        self.project_transaction(project_id, |tx| async move {
1200            let project = project::Entity::find_by_id(project_id)
1201                .one(&*tx)
1202                .await?
1203                .ok_or_else(|| anyhow!("no such project"))?;
1204
1205            let mut collaborators = project_collaborator::Entity::find()
1206                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1207                .stream(&*tx)
1208                .await?;
1209
1210            let mut connection_ids = HashSet::default();
1211            if let Some(host_connection) = project.host_connection().log_err() {
1212                if !exclude_dev_server {
1213                    connection_ids.insert(host_connection);
1214                }
1215            }
1216
1217            while let Some(collaborator) = collaborators.next().await {
1218                let collaborator = collaborator?;
1219                connection_ids.insert(collaborator.connection());
1220            }
1221
1222            if connection_ids.contains(&connection_id)
1223                || Some(connection_id) == project.host_connection().ok()
1224            {
1225                Ok(connection_ids)
1226            } else {
1227                Err(anyhow!(
1228                    "can only send project updates to a project you're in"
1229                ))?
1230            }
1231        })
1232        .await
1233    }
1234
1235    async fn project_guest_connection_ids(
1236        &self,
1237        project_id: ProjectId,
1238        tx: &DatabaseTransaction,
1239    ) -> Result<Vec<ConnectionId>> {
1240        let mut collaborators = project_collaborator::Entity::find()
1241            .filter(
1242                project_collaborator::Column::ProjectId
1243                    .eq(project_id)
1244                    .and(project_collaborator::Column::IsHost.eq(false)),
1245            )
1246            .stream(tx)
1247            .await?;
1248
1249        let mut guest_connection_ids = Vec::new();
1250        while let Some(collaborator) = collaborators.next().await {
1251            let collaborator = collaborator?;
1252            guest_connection_ids.push(collaborator.connection());
1253        }
1254        Ok(guest_connection_ids)
1255    }
1256
1257    /// Returns the [`RoomId`] for the given project.
1258    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1259        self.transaction(|tx| async move {
1260            Ok(project::Entity::find_by_id(project_id)
1261                .one(&*tx)
1262                .await?
1263                .and_then(|project| project.room_id))
1264        })
1265        .await
1266    }
1267
1268    pub async fn check_room_participants(
1269        &self,
1270        room_id: RoomId,
1271        leader_id: ConnectionId,
1272        follower_id: ConnectionId,
1273    ) -> Result<()> {
1274        self.transaction(|tx| async move {
1275            use room_participant::Column;
1276
1277            let count = room_participant::Entity::find()
1278                .filter(
1279                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1280                        Condition::any()
1281                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1282                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1283                            ))
1284                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1285                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1286                            )),
1287                    ),
1288                )
1289                .count(&*tx)
1290                .await?;
1291
1292            if count < 2 {
1293                Err(anyhow!("not room participants"))?;
1294            }
1295
1296            Ok(())
1297        })
1298        .await
1299    }
1300
1301    /// Adds the given follower connection as a follower of the given leader connection.
1302    pub async fn follow(
1303        &self,
1304        room_id: RoomId,
1305        project_id: ProjectId,
1306        leader_connection: ConnectionId,
1307        follower_connection: ConnectionId,
1308    ) -> Result<TransactionGuard<proto::Room>> {
1309        self.room_transaction(room_id, |tx| async move {
1310            follower::ActiveModel {
1311                room_id: ActiveValue::set(room_id),
1312                project_id: ActiveValue::set(project_id),
1313                leader_connection_server_id: ActiveValue::set(ServerId(
1314                    leader_connection.owner_id as i32,
1315                )),
1316                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1317                follower_connection_server_id: ActiveValue::set(ServerId(
1318                    follower_connection.owner_id as i32,
1319                )),
1320                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1321                ..Default::default()
1322            }
1323            .insert(&*tx)
1324            .await?;
1325
1326            let room = self.get_room(room_id, &tx).await?;
1327            Ok(room)
1328        })
1329        .await
1330    }
1331
1332    /// Removes the given follower connection as a follower of the given leader connection.
1333    pub async fn unfollow(
1334        &self,
1335        room_id: RoomId,
1336        project_id: ProjectId,
1337        leader_connection: ConnectionId,
1338        follower_connection: ConnectionId,
1339    ) -> Result<TransactionGuard<proto::Room>> {
1340        self.room_transaction(room_id, |tx| async move {
1341            follower::Entity::delete_many()
1342                .filter(
1343                    Condition::all()
1344                        .add(follower::Column::RoomId.eq(room_id))
1345                        .add(follower::Column::ProjectId.eq(project_id))
1346                        .add(
1347                            follower::Column::LeaderConnectionServerId
1348                                .eq(leader_connection.owner_id),
1349                        )
1350                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1351                        .add(
1352                            follower::Column::FollowerConnectionServerId
1353                                .eq(follower_connection.owner_id),
1354                        )
1355                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1356                )
1357                .exec(&*tx)
1358                .await?;
1359
1360            let room = self.get_room(room_id, &tx).await?;
1361            Ok(room)
1362        })
1363        .await
1364    }
1365}