projects.rs

   1use anyhow::Context as _;
   2use util::ResultExt;
   3
   4use super::*;
   5
   6impl Database {
   7    /// Returns the count of all projects, excluding ones marked as admin.
   8    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   9        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  10        enum QueryAs {
  11            Count,
  12        }
  13
  14        self.transaction(|tx| async move {
  15            Ok(project::Entity::find()
  16                .select_only()
  17                .column_as(project::Column::Id.count(), QueryAs::Count)
  18                .inner_join(user::Entity)
  19                .filter(user::Column::Admin.eq(false))
  20                .into_values::<_, QueryAs>()
  21                .one(&*tx)
  22                .await?
  23                .unwrap_or(0i64) as usize)
  24        })
  25        .await
  26    }
  27
  28    /// Shares a project with the given room.
  29    pub async fn share_project(
  30        &self,
  31        room_id: RoomId,
  32        connection: ConnectionId,
  33        worktrees: &[proto::WorktreeMetadata],
  34        is_ssh_project: bool,
  35        dev_server_project_id: Option<DevServerProjectId>,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .ok_or_else(|| anyhow!("could not find participant"))?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            if let Some(dev_server_project_id) = dev_server_project_id {
  65                let project = project::Entity::find()
  66                    .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
  67                    .one(&*tx)
  68                    .await?
  69                    .ok_or_else(|| anyhow!("no remote project"))?;
  70
  71                let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
  72                    .find_also_related(dev_server::Entity)
  73                    .one(&*tx)
  74                    .await?
  75                    .ok_or_else(|| anyhow!("no dev_server_project"))?;
  76
  77                if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
  78                    return Err(anyhow!("not your dev server"))?;
  79                }
  80
  81                if project.room_id.is_some() {
  82                    return Err(anyhow!("project already shared"))?;
  83                };
  84
  85                let project = project::Entity::update(project::ActiveModel {
  86                    room_id: ActiveValue::Set(Some(room_id)),
  87                    ..project.into_active_model()
  88                })
  89                .exec(&*tx)
  90                .await?;
  91
  92                let room = self.get_room(room_id, &tx).await?;
  93                return Ok((project.id, room));
  94            }
  95
  96            let project = project::ActiveModel {
  97                room_id: ActiveValue::set(Some(participant.room_id)),
  98                host_user_id: ActiveValue::set(Some(participant.user_id)),
  99                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 100                host_connection_server_id: ActiveValue::set(Some(ServerId(
 101                    connection.owner_id as i32,
 102                ))),
 103                id: ActiveValue::NotSet,
 104                hosted_project_id: ActiveValue::Set(None),
 105                dev_server_project_id: ActiveValue::Set(None),
 106            }
 107            .insert(&*tx)
 108            .await?;
 109
 110            if !worktrees.is_empty() {
 111                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
 112                    worktree::ActiveModel {
 113                        id: ActiveValue::set(worktree.id as i64),
 114                        project_id: ActiveValue::set(project.id),
 115                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
 116                        root_name: ActiveValue::set(worktree.root_name.clone()),
 117                        visible: ActiveValue::set(worktree.visible),
 118                        scan_id: ActiveValue::set(0),
 119                        completed_scan_id: ActiveValue::set(0),
 120                    }
 121                }))
 122                .exec(&*tx)
 123                .await?;
 124            }
 125
 126            let replica_id = if is_ssh_project { 1 } else { 0 };
 127
 128            project_collaborator::ActiveModel {
 129                project_id: ActiveValue::set(project.id),
 130                connection_id: ActiveValue::set(connection.id as i32),
 131                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 132                user_id: ActiveValue::set(participant.user_id),
 133                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 134                is_host: ActiveValue::set(true),
 135                ..Default::default()
 136            }
 137            .insert(&*tx)
 138            .await?;
 139
 140            let room = self.get_room(room_id, &tx).await?;
 141            Ok((project.id, room))
 142        })
 143        .await
 144    }
 145
 146    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 147        self.weak_transaction(|tx| async move {
 148            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 149            Ok(())
 150        })
 151        .await
 152    }
 153
 154    /// Unshares the given project.
 155    pub async fn unshare_project(
 156        &self,
 157        project_id: ProjectId,
 158        connection: ConnectionId,
 159        user_id: Option<UserId>,
 160    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 161        self.project_transaction(project_id, |tx| async move {
 162            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 163            let project = project::Entity::find_by_id(project_id)
 164                .one(&*tx)
 165                .await?
 166                .ok_or_else(|| anyhow!("project not found"))?;
 167            let room = if let Some(room_id) = project.room_id {
 168                Some(self.get_room(room_id, &tx).await?)
 169            } else {
 170                None
 171            };
 172            if project.host_connection()? == connection {
 173                return Ok((true, room, guest_connection_ids));
 174            }
 175            if let Some(dev_server_project_id) = project.dev_server_project_id {
 176                if let Some(user_id) = user_id {
 177                    if user_id
 178                        != self
 179                            .owner_for_dev_server_project(dev_server_project_id, &tx)
 180                            .await?
 181                    {
 182                        Err(anyhow!("cannot unshare a project hosted by another user"))?
 183                    }
 184                    project::Entity::update(project::ActiveModel {
 185                        room_id: ActiveValue::Set(None),
 186                        ..project.into_active_model()
 187                    })
 188                    .exec(&*tx)
 189                    .await?;
 190                    return Ok((false, room, guest_connection_ids));
 191                }
 192            }
 193
 194            Err(anyhow!("cannot unshare a project hosted by another user"))?
 195        })
 196        .await
 197    }
 198
 199    /// Updates the worktrees associated with the given project.
 200    pub async fn update_project(
 201        &self,
 202        project_id: ProjectId,
 203        connection: ConnectionId,
 204        worktrees: &[proto::WorktreeMetadata],
 205    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 206        self.project_transaction(project_id, |tx| async move {
 207            let project = project::Entity::find_by_id(project_id)
 208                .filter(
 209                    Condition::all()
 210                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 211                        .add(
 212                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 213                        ),
 214                )
 215                .one(&*tx)
 216                .await?
 217                .ok_or_else(|| anyhow!("no such project"))?;
 218
 219            self.update_project_worktrees(project.id, worktrees, &tx)
 220                .await?;
 221
 222            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 223
 224            let room = if let Some(room_id) = project.room_id {
 225                Some(self.get_room(room_id, &tx).await?)
 226            } else {
 227                None
 228            };
 229
 230            Ok((room, guest_connection_ids))
 231        })
 232        .await
 233    }
 234
 235    pub(in crate::db) async fn update_project_worktrees(
 236        &self,
 237        project_id: ProjectId,
 238        worktrees: &[proto::WorktreeMetadata],
 239        tx: &DatabaseTransaction,
 240    ) -> Result<()> {
 241        if !worktrees.is_empty() {
 242            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 243                id: ActiveValue::set(worktree.id as i64),
 244                project_id: ActiveValue::set(project_id),
 245                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 246                root_name: ActiveValue::set(worktree.root_name.clone()),
 247                visible: ActiveValue::set(worktree.visible),
 248                scan_id: ActiveValue::set(0),
 249                completed_scan_id: ActiveValue::set(0),
 250            }))
 251            .on_conflict(
 252                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 253                    .update_column(worktree::Column::RootName)
 254                    .to_owned(),
 255            )
 256            .exec(tx)
 257            .await?;
 258        }
 259
 260        worktree::Entity::delete_many()
 261            .filter(worktree::Column::ProjectId.eq(project_id).and(
 262                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 263            ))
 264            .exec(tx)
 265            .await?;
 266
 267        Ok(())
 268    }
 269
 270    pub async fn update_worktree(
 271        &self,
 272        update: &proto::UpdateWorktree,
 273        connection: ConnectionId,
 274    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 275        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 276            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 277        {
 278            return Err(anyhow!(
 279                "invalid worktree update. removed entries: {}, updated entries: {}",
 280                update.removed_entries.len(),
 281                update.updated_entries.len()
 282            ))?;
 283        }
 284
 285        let project_id = ProjectId::from_proto(update.project_id);
 286        let worktree_id = update.worktree_id as i64;
 287        self.project_transaction(project_id, |tx| async move {
 288            // Ensure the update comes from the host.
 289            let _project = project::Entity::find_by_id(project_id)
 290                .filter(
 291                    Condition::all()
 292                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 293                        .add(
 294                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 295                        ),
 296                )
 297                .one(&*tx)
 298                .await?
 299                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
 300
 301            // Update metadata.
 302            worktree::Entity::update(worktree::ActiveModel {
 303                id: ActiveValue::set(worktree_id),
 304                project_id: ActiveValue::set(project_id),
 305                root_name: ActiveValue::set(update.root_name.clone()),
 306                scan_id: ActiveValue::set(update.scan_id as i64),
 307                completed_scan_id: if update.is_last_update {
 308                    ActiveValue::set(update.scan_id as i64)
 309                } else {
 310                    ActiveValue::default()
 311                },
 312                abs_path: ActiveValue::set(update.abs_path.clone()),
 313                ..Default::default()
 314            })
 315            .exec(&*tx)
 316            .await?;
 317
 318            if !update.updated_entries.is_empty() {
 319                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 320                    let mtime = entry.mtime.clone().unwrap_or_default();
 321                    worktree_entry::ActiveModel {
 322                        project_id: ActiveValue::set(project_id),
 323                        worktree_id: ActiveValue::set(worktree_id),
 324                        id: ActiveValue::set(entry.id as i64),
 325                        is_dir: ActiveValue::set(entry.is_dir),
 326                        path: ActiveValue::set(entry.path.clone()),
 327                        inode: ActiveValue::set(entry.inode as i64),
 328                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 329                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 330                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 331                        is_ignored: ActiveValue::set(entry.is_ignored),
 332                        is_external: ActiveValue::set(entry.is_external),
 333                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 334                        is_deleted: ActiveValue::set(false),
 335                        scan_id: ActiveValue::set(update.scan_id as i64),
 336                        is_fifo: ActiveValue::set(entry.is_fifo),
 337                    }
 338                }))
 339                .on_conflict(
 340                    OnConflict::columns([
 341                        worktree_entry::Column::ProjectId,
 342                        worktree_entry::Column::WorktreeId,
 343                        worktree_entry::Column::Id,
 344                    ])
 345                    .update_columns([
 346                        worktree_entry::Column::IsDir,
 347                        worktree_entry::Column::Path,
 348                        worktree_entry::Column::Inode,
 349                        worktree_entry::Column::MtimeSeconds,
 350                        worktree_entry::Column::MtimeNanos,
 351                        worktree_entry::Column::CanonicalPath,
 352                        worktree_entry::Column::IsIgnored,
 353                        worktree_entry::Column::GitStatus,
 354                        worktree_entry::Column::ScanId,
 355                    ])
 356                    .to_owned(),
 357                )
 358                .exec(&*tx)
 359                .await?;
 360            }
 361
 362            if !update.removed_entries.is_empty() {
 363                worktree_entry::Entity::update_many()
 364                    .filter(
 365                        worktree_entry::Column::ProjectId
 366                            .eq(project_id)
 367                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 368                            .and(
 369                                worktree_entry::Column::Id
 370                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 371                            ),
 372                    )
 373                    .set(worktree_entry::ActiveModel {
 374                        is_deleted: ActiveValue::Set(true),
 375                        scan_id: ActiveValue::Set(update.scan_id as i64),
 376                        ..Default::default()
 377                    })
 378                    .exec(&*tx)
 379                    .await?;
 380            }
 381
 382            if !update.updated_repositories.is_empty() {
 383                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 384                    |repository| worktree_repository::ActiveModel {
 385                        project_id: ActiveValue::set(project_id),
 386                        worktree_id: ActiveValue::set(worktree_id),
 387                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 388                        scan_id: ActiveValue::set(update.scan_id as i64),
 389                        branch: ActiveValue::set(repository.branch.clone()),
 390                        is_deleted: ActiveValue::set(false),
 391                    },
 392                ))
 393                .on_conflict(
 394                    OnConflict::columns([
 395                        worktree_repository::Column::ProjectId,
 396                        worktree_repository::Column::WorktreeId,
 397                        worktree_repository::Column::WorkDirectoryId,
 398                    ])
 399                    .update_columns([
 400                        worktree_repository::Column::ScanId,
 401                        worktree_repository::Column::Branch,
 402                    ])
 403                    .to_owned(),
 404                )
 405                .exec(&*tx)
 406                .await?;
 407            }
 408
 409            if !update.removed_repositories.is_empty() {
 410                worktree_repository::Entity::update_many()
 411                    .filter(
 412                        worktree_repository::Column::ProjectId
 413                            .eq(project_id)
 414                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 415                            .and(
 416                                worktree_repository::Column::WorkDirectoryId
 417                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 418                            ),
 419                    )
 420                    .set(worktree_repository::ActiveModel {
 421                        is_deleted: ActiveValue::Set(true),
 422                        scan_id: ActiveValue::Set(update.scan_id as i64),
 423                        ..Default::default()
 424                    })
 425                    .exec(&*tx)
 426                    .await?;
 427            }
 428
 429            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 430            Ok(connection_ids)
 431        })
 432        .await
 433    }
 434
 435    /// Updates the diagnostic summary for the given connection.
 436    pub async fn update_diagnostic_summary(
 437        &self,
 438        update: &proto::UpdateDiagnosticSummary,
 439        connection: ConnectionId,
 440    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 441        let project_id = ProjectId::from_proto(update.project_id);
 442        let worktree_id = update.worktree_id as i64;
 443        self.project_transaction(project_id, |tx| async move {
 444            let summary = update
 445                .summary
 446                .as_ref()
 447                .ok_or_else(|| anyhow!("invalid summary"))?;
 448
 449            // Ensure the update comes from the host.
 450            let project = project::Entity::find_by_id(project_id)
 451                .one(&*tx)
 452                .await?
 453                .ok_or_else(|| anyhow!("no such project"))?;
 454            if project.host_connection()? != connection {
 455                return Err(anyhow!("can't update a project hosted by someone else"))?;
 456            }
 457
 458            // Update summary.
 459            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 460                project_id: ActiveValue::set(project_id),
 461                worktree_id: ActiveValue::set(worktree_id),
 462                path: ActiveValue::set(summary.path.clone()),
 463                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 464                error_count: ActiveValue::set(summary.error_count as i32),
 465                warning_count: ActiveValue::set(summary.warning_count as i32),
 466            })
 467            .on_conflict(
 468                OnConflict::columns([
 469                    worktree_diagnostic_summary::Column::ProjectId,
 470                    worktree_diagnostic_summary::Column::WorktreeId,
 471                    worktree_diagnostic_summary::Column::Path,
 472                ])
 473                .update_columns([
 474                    worktree_diagnostic_summary::Column::LanguageServerId,
 475                    worktree_diagnostic_summary::Column::ErrorCount,
 476                    worktree_diagnostic_summary::Column::WarningCount,
 477                ])
 478                .to_owned(),
 479            )
 480            .exec(&*tx)
 481            .await?;
 482
 483            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 484            Ok(connection_ids)
 485        })
 486        .await
 487    }
 488
 489    /// Starts the language server for the given connection.
 490    pub async fn start_language_server(
 491        &self,
 492        update: &proto::StartLanguageServer,
 493        connection: ConnectionId,
 494    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 495        let project_id = ProjectId::from_proto(update.project_id);
 496        self.project_transaction(project_id, |tx| async move {
 497            let server = update
 498                .server
 499                .as_ref()
 500                .ok_or_else(|| anyhow!("invalid language server"))?;
 501
 502            // Ensure the update comes from the host.
 503            let project = project::Entity::find_by_id(project_id)
 504                .one(&*tx)
 505                .await?
 506                .ok_or_else(|| anyhow!("no such project"))?;
 507            if project.host_connection()? != connection {
 508                return Err(anyhow!("can't update a project hosted by someone else"))?;
 509            }
 510
 511            // Add the newly-started language server.
 512            language_server::Entity::insert(language_server::ActiveModel {
 513                project_id: ActiveValue::set(project_id),
 514                id: ActiveValue::set(server.id as i64),
 515                name: ActiveValue::set(server.name.clone()),
 516            })
 517            .on_conflict(
 518                OnConflict::columns([
 519                    language_server::Column::ProjectId,
 520                    language_server::Column::Id,
 521                ])
 522                .update_column(language_server::Column::Name)
 523                .to_owned(),
 524            )
 525            .exec(&*tx)
 526            .await?;
 527
 528            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 529            Ok(connection_ids)
 530        })
 531        .await
 532    }
 533
 534    /// Updates the worktree settings for the given connection.
 535    pub async fn update_worktree_settings(
 536        &self,
 537        update: &proto::UpdateWorktreeSettings,
 538        connection: ConnectionId,
 539    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 540        let project_id = ProjectId::from_proto(update.project_id);
 541        let kind = match update.kind {
 542            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 543                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 544            None => proto::LocalSettingsKind::Settings,
 545        };
 546        let kind = LocalSettingsKind::from_proto(kind);
 547        self.project_transaction(project_id, |tx| async move {
 548            // Ensure the update comes from the host.
 549            let project = project::Entity::find_by_id(project_id)
 550                .one(&*tx)
 551                .await?
 552                .ok_or_else(|| anyhow!("no such project"))?;
 553            if project.host_connection()? != connection {
 554                return Err(anyhow!("can't update a project hosted by someone else"))?;
 555            }
 556
 557            if let Some(content) = &update.content {
 558                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 559                    project_id: ActiveValue::Set(project_id),
 560                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 561                    path: ActiveValue::Set(update.path.clone()),
 562                    content: ActiveValue::Set(content.clone()),
 563                    kind: ActiveValue::Set(kind),
 564                })
 565                .on_conflict(
 566                    OnConflict::columns([
 567                        worktree_settings_file::Column::ProjectId,
 568                        worktree_settings_file::Column::WorktreeId,
 569                        worktree_settings_file::Column::Path,
 570                    ])
 571                    .update_column(worktree_settings_file::Column::Content)
 572                    .to_owned(),
 573                )
 574                .exec(&*tx)
 575                .await?;
 576            } else {
 577                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 578                    project_id: ActiveValue::Set(project_id),
 579                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 580                    path: ActiveValue::Set(update.path.clone()),
 581                    ..Default::default()
 582                })
 583                .exec(&*tx)
 584                .await?;
 585            }
 586
 587            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 588            Ok(connection_ids)
 589        })
 590        .await
 591    }
 592
 593    /// Adds the given connection to the specified hosted project
 594    pub async fn join_hosted_project(
 595        &self,
 596        id: ProjectId,
 597        user_id: UserId,
 598        connection: ConnectionId,
 599    ) -> Result<(Project, ReplicaId)> {
 600        self.transaction(|tx| async move {
 601            let (project, hosted_project) = project::Entity::find_by_id(id)
 602                .find_also_related(hosted_project::Entity)
 603                .one(&*tx)
 604                .await?
 605                .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
 606
 607            let Some(hosted_project) = hosted_project else {
 608                return Err(anyhow!("project is not hosted"))?;
 609            };
 610
 611            let channel = channel::Entity::find_by_id(hosted_project.channel_id)
 612                .one(&*tx)
 613                .await?
 614                .ok_or_else(|| anyhow!("no such channel"))?;
 615
 616            let role = self
 617                .check_user_is_channel_participant(&channel, user_id, &tx)
 618                .await?;
 619
 620            self.join_project_internal(project, user_id, connection, role, &tx)
 621                .await
 622        })
 623        .await
 624    }
 625
 626    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 627        self.transaction(|tx| async move {
 628            Ok(project::Entity::find_by_id(id)
 629                .one(&*tx)
 630                .await?
 631                .ok_or_else(|| anyhow!("no such project"))?)
 632        })
 633        .await
 634    }
 635
 636    pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
 637        self.transaction(|tx| async move {
 638            Ok(project::Entity::find()
 639                .filter(project::Column::DevServerProjectId.eq(id))
 640                .one(&*tx)
 641                .await?
 642                .ok_or_else(|| anyhow!("no such project"))?)
 643        })
 644        .await
 645    }
 646
 647    /// Adds the given connection to the specified project
 648    /// in the current room.
 649    pub async fn join_project(
 650        &self,
 651        project_id: ProjectId,
 652        connection: ConnectionId,
 653        user_id: UserId,
 654    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 655        self.project_transaction(project_id, |tx| async move {
 656            let (project, role) = self
 657                .access_project(
 658                    project_id,
 659                    connection,
 660                    PrincipalId::UserId(user_id),
 661                    Capability::ReadOnly,
 662                    &tx,
 663                )
 664                .await?;
 665            self.join_project_internal(project, user_id, connection, role, &tx)
 666                .await
 667        })
 668        .await
 669    }
 670
 671    async fn join_project_internal(
 672        &self,
 673        project: project::Model,
 674        user_id: UserId,
 675        connection: ConnectionId,
 676        role: ChannelRole,
 677        tx: &DatabaseTransaction,
 678    ) -> Result<(Project, ReplicaId)> {
 679        let mut collaborators = project
 680            .find_related(project_collaborator::Entity)
 681            .all(tx)
 682            .await?;
 683        let replica_ids = collaborators
 684            .iter()
 685            .map(|c| c.replica_id)
 686            .collect::<HashSet<_>>();
 687        let mut replica_id = ReplicaId(1);
 688        while replica_ids.contains(&replica_id) {
 689            replica_id.0 += 1;
 690        }
 691        let new_collaborator = project_collaborator::ActiveModel {
 692            project_id: ActiveValue::set(project.id),
 693            connection_id: ActiveValue::set(connection.id as i32),
 694            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 695            user_id: ActiveValue::set(user_id),
 696            replica_id: ActiveValue::set(replica_id),
 697            is_host: ActiveValue::set(false),
 698            ..Default::default()
 699        }
 700        .insert(tx)
 701        .await?;
 702        collaborators.push(new_collaborator);
 703
 704        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 705        let mut worktrees = db_worktrees
 706            .into_iter()
 707            .map(|db_worktree| {
 708                (
 709                    db_worktree.id as u64,
 710                    Worktree {
 711                        id: db_worktree.id as u64,
 712                        abs_path: db_worktree.abs_path,
 713                        root_name: db_worktree.root_name,
 714                        visible: db_worktree.visible,
 715                        entries: Default::default(),
 716                        repository_entries: Default::default(),
 717                        diagnostic_summaries: Default::default(),
 718                        settings_files: Default::default(),
 719                        scan_id: db_worktree.scan_id as u64,
 720                        completed_scan_id: db_worktree.completed_scan_id as u64,
 721                    },
 722                )
 723            })
 724            .collect::<BTreeMap<_, _>>();
 725
 726        // Populate worktree entries.
 727        {
 728            let mut db_entries = worktree_entry::Entity::find()
 729                .filter(
 730                    Condition::all()
 731                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 732                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 733                )
 734                .stream(tx)
 735                .await?;
 736            while let Some(db_entry) = db_entries.next().await {
 737                let db_entry = db_entry?;
 738                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 739                    worktree.entries.push(proto::Entry {
 740                        id: db_entry.id as u64,
 741                        is_dir: db_entry.is_dir,
 742                        path: db_entry.path,
 743                        inode: db_entry.inode as u64,
 744                        mtime: Some(proto::Timestamp {
 745                            seconds: db_entry.mtime_seconds as u64,
 746                            nanos: db_entry.mtime_nanos as u32,
 747                        }),
 748                        canonical_path: db_entry.canonical_path,
 749                        is_ignored: db_entry.is_ignored,
 750                        is_external: db_entry.is_external,
 751                        git_status: db_entry.git_status.map(|status| status as i32),
 752                        // This is only used in the summarization backlog, so if it's None,
 753                        // that just means we won't be able to detect when to resummarize
 754                        // based on total number of backlogged bytes - instead, we'd go
 755                        // on number of files only. That shouldn't be a huge deal in practice.
 756                        size: None,
 757                        is_fifo: db_entry.is_fifo,
 758                    });
 759                }
 760            }
 761        }
 762
 763        // Populate repository entries.
 764        {
 765            let mut db_repository_entries = worktree_repository::Entity::find()
 766                .filter(
 767                    Condition::all()
 768                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 769                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 770                )
 771                .stream(tx)
 772                .await?;
 773            while let Some(db_repository_entry) = db_repository_entries.next().await {
 774                let db_repository_entry = db_repository_entry?;
 775                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 776                {
 777                    worktree.repository_entries.insert(
 778                        db_repository_entry.work_directory_id as u64,
 779                        proto::RepositoryEntry {
 780                            work_directory_id: db_repository_entry.work_directory_id as u64,
 781                            branch: db_repository_entry.branch,
 782                        },
 783                    );
 784                }
 785            }
 786        }
 787
 788        // Populate worktree diagnostic summaries.
 789        {
 790            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 791                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 792                .stream(tx)
 793                .await?;
 794            while let Some(db_summary) = db_summaries.next().await {
 795                let db_summary = db_summary?;
 796                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 797                    worktree
 798                        .diagnostic_summaries
 799                        .push(proto::DiagnosticSummary {
 800                            path: db_summary.path,
 801                            language_server_id: db_summary.language_server_id as u64,
 802                            error_count: db_summary.error_count as u32,
 803                            warning_count: db_summary.warning_count as u32,
 804                        });
 805                }
 806            }
 807        }
 808
 809        // Populate worktree settings files
 810        {
 811            let mut db_settings_files = worktree_settings_file::Entity::find()
 812                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 813                .stream(tx)
 814                .await?;
 815            while let Some(db_settings_file) = db_settings_files.next().await {
 816                let db_settings_file = db_settings_file?;
 817                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 818                    worktree.settings_files.push(WorktreeSettingsFile {
 819                        path: db_settings_file.path,
 820                        content: db_settings_file.content,
 821                        kind: db_settings_file.kind,
 822                    });
 823                }
 824            }
 825        }
 826
 827        // Populate language servers.
 828        let language_servers = project
 829            .find_related(language_server::Entity)
 830            .all(tx)
 831            .await?;
 832
 833        let project = Project {
 834            id: project.id,
 835            role,
 836            collaborators: collaborators
 837                .into_iter()
 838                .map(|collaborator| ProjectCollaborator {
 839                    connection_id: collaborator.connection(),
 840                    user_id: collaborator.user_id,
 841                    replica_id: collaborator.replica_id,
 842                    is_host: collaborator.is_host,
 843                })
 844                .collect(),
 845            worktrees,
 846            language_servers: language_servers
 847                .into_iter()
 848                .map(|language_server| proto::LanguageServer {
 849                    id: language_server.id as u64,
 850                    name: language_server.name,
 851                    worktree_id: None,
 852                })
 853                .collect(),
 854            dev_server_project_id: project.dev_server_project_id,
 855        };
 856        Ok((project, replica_id as ReplicaId))
 857    }
 858
 859    pub async fn leave_hosted_project(
 860        &self,
 861        project_id: ProjectId,
 862        connection: ConnectionId,
 863    ) -> Result<LeftProject> {
 864        self.transaction(|tx| async move {
 865            let result = project_collaborator::Entity::delete_many()
 866                .filter(
 867                    Condition::all()
 868                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 869                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 870                        .add(
 871                            project_collaborator::Column::ConnectionServerId
 872                                .eq(connection.owner_id as i32),
 873                        ),
 874                )
 875                .exec(&*tx)
 876                .await?;
 877            if result.rows_affected == 0 {
 878                return Err(anyhow!("not in the project"))?;
 879            }
 880
 881            let project = project::Entity::find_by_id(project_id)
 882                .one(&*tx)
 883                .await?
 884                .ok_or_else(|| anyhow!("no such project"))?;
 885            let collaborators = project
 886                .find_related(project_collaborator::Entity)
 887                .all(&*tx)
 888                .await?;
 889            let connection_ids = collaborators
 890                .into_iter()
 891                .map(|collaborator| collaborator.connection())
 892                .collect();
 893            Ok(LeftProject {
 894                id: project.id,
 895                connection_ids,
 896                should_unshare: false,
 897            })
 898        })
 899        .await
 900    }
 901
 902    /// Removes the given connection from the specified project.
 903    pub async fn leave_project(
 904        &self,
 905        project_id: ProjectId,
 906        connection: ConnectionId,
 907    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 908        self.project_transaction(project_id, |tx| async move {
 909            let result = project_collaborator::Entity::delete_many()
 910                .filter(
 911                    Condition::all()
 912                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 913                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 914                        .add(
 915                            project_collaborator::Column::ConnectionServerId
 916                                .eq(connection.owner_id as i32),
 917                        ),
 918                )
 919                .exec(&*tx)
 920                .await?;
 921            if result.rows_affected == 0 {
 922                Err(anyhow!("not a collaborator on this project"))?;
 923            }
 924
 925            let project = project::Entity::find_by_id(project_id)
 926                .one(&*tx)
 927                .await?
 928                .ok_or_else(|| anyhow!("no such project"))?;
 929            let collaborators = project
 930                .find_related(project_collaborator::Entity)
 931                .all(&*tx)
 932                .await?;
 933            let connection_ids: Vec<ConnectionId> = collaborators
 934                .into_iter()
 935                .map(|collaborator| collaborator.connection())
 936                .collect();
 937
 938            follower::Entity::delete_many()
 939                .filter(
 940                    Condition::any()
 941                        .add(
 942                            Condition::all()
 943                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 944                                .add(
 945                                    follower::Column::LeaderConnectionServerId
 946                                        .eq(connection.owner_id),
 947                                )
 948                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 949                        )
 950                        .add(
 951                            Condition::all()
 952                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 953                                .add(
 954                                    follower::Column::FollowerConnectionServerId
 955                                        .eq(connection.owner_id),
 956                                )
 957                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 958                        ),
 959                )
 960                .exec(&*tx)
 961                .await?;
 962
 963            let room = if let Some(room_id) = project.room_id {
 964                Some(self.get_room(room_id, &tx).await?)
 965            } else {
 966                None
 967            };
 968
 969            let left_project = LeftProject {
 970                id: project_id,
 971                should_unshare: connection == project.host_connection()?,
 972                connection_ids,
 973            };
 974            Ok((room, left_project))
 975        })
 976        .await
 977    }
 978
 979    pub async fn check_user_is_project_host(
 980        &self,
 981        project_id: ProjectId,
 982        connection_id: ConnectionId,
 983    ) -> Result<()> {
 984        self.project_transaction(project_id, |tx| async move {
 985            project::Entity::find()
 986                .filter(
 987                    Condition::all()
 988                        .add(project::Column::Id.eq(project_id))
 989                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 990                        .add(
 991                            project::Column::HostConnectionServerId
 992                                .eq(Some(connection_id.owner_id as i32)),
 993                        ),
 994                )
 995                .one(&*tx)
 996                .await?
 997                .ok_or_else(|| anyhow!("failed to read project host"))?;
 998
 999            Ok(())
1000        })
1001        .await
1002        .map(|guard| guard.into_inner())
1003    }
1004
1005    /// Returns the current project if the given user is authorized to access it with the specified capability.
1006    pub async fn access_project(
1007        &self,
1008        project_id: ProjectId,
1009        connection_id: ConnectionId,
1010        principal_id: PrincipalId,
1011        capability: Capability,
1012        tx: &DatabaseTransaction,
1013    ) -> Result<(project::Model, ChannelRole)> {
1014        let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
1015            .find_also_related(dev_server_project::Entity)
1016            .one(tx)
1017            .await?
1018            .ok_or_else(|| anyhow!("no such project"))?;
1019
1020        let user_id = match principal_id {
1021            PrincipalId::DevServerId(_) => {
1022                if project
1023                    .host_connection()
1024                    .is_ok_and(|connection| connection == connection_id)
1025                {
1026                    return Ok((project, ChannelRole::Admin));
1027                }
1028                return Err(anyhow!("not the project host"))?;
1029            }
1030            PrincipalId::UserId(user_id) => user_id,
1031        };
1032
1033        let role_from_room = if let Some(room_id) = project.room_id {
1034            room_participant::Entity::find()
1035                .filter(room_participant::Column::RoomId.eq(room_id))
1036                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1037                .one(tx)
1038                .await?
1039                .and_then(|participant| participant.role)
1040        } else {
1041            None
1042        };
1043        let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1044            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1045                .one(tx)
1046                .await?
1047                .ok_or_else(|| anyhow!("no such channel"))?;
1048            if user_id == dev_server.user_id {
1049                // If the user left the room "uncleanly" they may rejoin the
1050                // remote project before leave_room runs. IN that case kick
1051                // the project out of the room pre-emptively.
1052                if role_from_room.is_none() {
1053                    project = project::Entity::update(project::ActiveModel {
1054                        room_id: ActiveValue::Set(None),
1055                        ..project.into_active_model()
1056                    })
1057                    .exec(tx)
1058                    .await?;
1059                }
1060                Some(ChannelRole::Admin)
1061            } else {
1062                None
1063            }
1064        } else {
1065            None
1066        };
1067
1068        let role = role_from_dev_server
1069            .or(role_from_room)
1070            .unwrap_or(ChannelRole::Banned);
1071
1072        match capability {
1073            Capability::ReadWrite => {
1074                if !role.can_edit_projects() {
1075                    return Err(anyhow!("not authorized to edit projects"))?;
1076                }
1077            }
1078            Capability::ReadOnly => {
1079                if !role.can_read_projects() {
1080                    return Err(anyhow!("not authorized to read projects"))?;
1081                }
1082            }
1083        }
1084
1085        Ok((project, role))
1086    }
1087
1088    /// Returns the host connection for a read-only request to join a shared project.
1089    pub async fn host_for_read_only_project_request(
1090        &self,
1091        project_id: ProjectId,
1092        connection_id: ConnectionId,
1093        user_id: UserId,
1094    ) -> Result<ConnectionId> {
1095        self.project_transaction(project_id, |tx| async move {
1096            let (project, _) = self
1097                .access_project(
1098                    project_id,
1099                    connection_id,
1100                    PrincipalId::UserId(user_id),
1101                    Capability::ReadOnly,
1102                    &tx,
1103                )
1104                .await?;
1105            project.host_connection()
1106        })
1107        .await
1108        .map(|guard| guard.into_inner())
1109    }
1110
1111    /// Returns the host connection for a request to join a shared project.
1112    pub async fn host_for_mutating_project_request(
1113        &self,
1114        project_id: ProjectId,
1115        connection_id: ConnectionId,
1116        user_id: UserId,
1117    ) -> Result<ConnectionId> {
1118        self.project_transaction(project_id, |tx| async move {
1119            let (project, _) = self
1120                .access_project(
1121                    project_id,
1122                    connection_id,
1123                    PrincipalId::UserId(user_id),
1124                    Capability::ReadWrite,
1125                    &tx,
1126                )
1127                .await?;
1128            project.host_connection()
1129        })
1130        .await
1131        .map(|guard| guard.into_inner())
1132    }
1133
1134    /// Returns the host connection for a request to join a shared project.
1135    pub async fn host_for_owner_project_request(
1136        &self,
1137        project_id: ProjectId,
1138        _connection_id: ConnectionId,
1139        user_id: UserId,
1140    ) -> Result<ConnectionId> {
1141        self.project_transaction(project_id, |tx| async move {
1142            let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1143                .find_also_related(dev_server_project::Entity)
1144                .one(&*tx)
1145                .await?
1146                .ok_or_else(|| anyhow!("no such project"))?;
1147
1148            let Some(dev_server_project) = dev_server_project else {
1149                return Err(anyhow!("not a dev server project"))?;
1150            };
1151            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1152                .one(&*tx)
1153                .await?
1154                .ok_or_else(|| anyhow!("no such dev server"))?;
1155            if dev_server.user_id != user_id {
1156                return Err(anyhow!("not your project"))?;
1157            }
1158            project.host_connection()
1159        })
1160        .await
1161        .map(|guard| guard.into_inner())
1162    }
1163
1164    pub async fn connections_for_buffer_update(
1165        &self,
1166        project_id: ProjectId,
1167        principal_id: PrincipalId,
1168        connection_id: ConnectionId,
1169        capability: Capability,
1170    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1171        self.project_transaction(project_id, |tx| async move {
1172            // Authorize
1173            let (project, _) = self
1174                .access_project(project_id, connection_id, principal_id, capability, &tx)
1175                .await?;
1176
1177            let host_connection_id = project.host_connection()?;
1178
1179            let collaborators = project_collaborator::Entity::find()
1180                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1181                .all(&*tx)
1182                .await?;
1183
1184            let guest_connection_ids = collaborators
1185                .into_iter()
1186                .filter_map(|collaborator| {
1187                    if collaborator.is_host {
1188                        None
1189                    } else {
1190                        Some(collaborator.connection())
1191                    }
1192                })
1193                .collect();
1194
1195            Ok((host_connection_id, guest_connection_ids))
1196        })
1197        .await
1198    }
1199
1200    /// Returns the connection IDs in the given project.
1201    ///
1202    /// The provided `connection_id` must also be a collaborator in the project,
1203    /// otherwise an error will be returned.
1204    pub async fn project_connection_ids(
1205        &self,
1206        project_id: ProjectId,
1207        connection_id: ConnectionId,
1208        exclude_dev_server: bool,
1209    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1210        self.project_transaction(project_id, |tx| async move {
1211            let project = project::Entity::find_by_id(project_id)
1212                .one(&*tx)
1213                .await?
1214                .ok_or_else(|| anyhow!("no such project"))?;
1215
1216            let mut collaborators = project_collaborator::Entity::find()
1217                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1218                .stream(&*tx)
1219                .await?;
1220
1221            let mut connection_ids = HashSet::default();
1222            if let Some(host_connection) = project.host_connection().log_err() {
1223                if !exclude_dev_server {
1224                    connection_ids.insert(host_connection);
1225                }
1226            }
1227
1228            while let Some(collaborator) = collaborators.next().await {
1229                let collaborator = collaborator?;
1230                connection_ids.insert(collaborator.connection());
1231            }
1232
1233            if connection_ids.contains(&connection_id)
1234                || Some(connection_id) == project.host_connection().ok()
1235            {
1236                Ok(connection_ids)
1237            } else {
1238                Err(anyhow!(
1239                    "can only send project updates to a project you're in"
1240                ))?
1241            }
1242        })
1243        .await
1244    }
1245
1246    async fn project_guest_connection_ids(
1247        &self,
1248        project_id: ProjectId,
1249        tx: &DatabaseTransaction,
1250    ) -> Result<Vec<ConnectionId>> {
1251        let mut collaborators = project_collaborator::Entity::find()
1252            .filter(
1253                project_collaborator::Column::ProjectId
1254                    .eq(project_id)
1255                    .and(project_collaborator::Column::IsHost.eq(false)),
1256            )
1257            .stream(tx)
1258            .await?;
1259
1260        let mut guest_connection_ids = Vec::new();
1261        while let Some(collaborator) = collaborators.next().await {
1262            let collaborator = collaborator?;
1263            guest_connection_ids.push(collaborator.connection());
1264        }
1265        Ok(guest_connection_ids)
1266    }
1267
1268    /// Returns the [`RoomId`] for the given project.
1269    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1270        self.transaction(|tx| async move {
1271            Ok(project::Entity::find_by_id(project_id)
1272                .one(&*tx)
1273                .await?
1274                .and_then(|project| project.room_id))
1275        })
1276        .await
1277    }
1278
1279    pub async fn check_room_participants(
1280        &self,
1281        room_id: RoomId,
1282        leader_id: ConnectionId,
1283        follower_id: ConnectionId,
1284    ) -> Result<()> {
1285        self.transaction(|tx| async move {
1286            use room_participant::Column;
1287
1288            let count = room_participant::Entity::find()
1289                .filter(
1290                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1291                        Condition::any()
1292                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1293                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1294                            ))
1295                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1296                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1297                            )),
1298                    ),
1299                )
1300                .count(&*tx)
1301                .await?;
1302
1303            if count < 2 {
1304                Err(anyhow!("not room participants"))?;
1305            }
1306
1307            Ok(())
1308        })
1309        .await
1310    }
1311
1312    /// Adds the given follower connection as a follower of the given leader connection.
1313    pub async fn follow(
1314        &self,
1315        room_id: RoomId,
1316        project_id: ProjectId,
1317        leader_connection: ConnectionId,
1318        follower_connection: ConnectionId,
1319    ) -> Result<TransactionGuard<proto::Room>> {
1320        self.room_transaction(room_id, |tx| async move {
1321            follower::ActiveModel {
1322                room_id: ActiveValue::set(room_id),
1323                project_id: ActiveValue::set(project_id),
1324                leader_connection_server_id: ActiveValue::set(ServerId(
1325                    leader_connection.owner_id as i32,
1326                )),
1327                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1328                follower_connection_server_id: ActiveValue::set(ServerId(
1329                    follower_connection.owner_id as i32,
1330                )),
1331                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1332                ..Default::default()
1333            }
1334            .insert(&*tx)
1335            .await?;
1336
1337            let room = self.get_room(room_id, &tx).await?;
1338            Ok(room)
1339        })
1340        .await
1341    }
1342
1343    /// Removes the given follower connection as a follower of the given leader connection.
1344    pub async fn unfollow(
1345        &self,
1346        room_id: RoomId,
1347        project_id: ProjectId,
1348        leader_connection: ConnectionId,
1349        follower_connection: ConnectionId,
1350    ) -> Result<TransactionGuard<proto::Room>> {
1351        self.room_transaction(room_id, |tx| async move {
1352            follower::Entity::delete_many()
1353                .filter(
1354                    Condition::all()
1355                        .add(follower::Column::RoomId.eq(room_id))
1356                        .add(follower::Column::ProjectId.eq(project_id))
1357                        .add(
1358                            follower::Column::LeaderConnectionServerId
1359                                .eq(leader_connection.owner_id),
1360                        )
1361                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1362                        .add(
1363                            follower::Column::FollowerConnectionServerId
1364                                .eq(follower_connection.owner_id),
1365                        )
1366                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1367                )
1368                .exec(&*tx)
1369                .await?;
1370
1371            let room = self.get_room(room_id, &tx).await?;
1372            Ok(room)
1373        })
1374        .await
1375    }
1376}