projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project {
  95                clock::ReplicaId::REMOTE_SERVER
  96            } else {
  97                clock::ReplicaId::LOCAL
  98            };
  99
 100            project_collaborator::ActiveModel {
 101                project_id: ActiveValue::set(project.id),
 102                connection_id: ActiveValue::set(connection.id as i32),
 103                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 104                user_id: ActiveValue::set(participant.user_id),
 105                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 106                is_host: ActiveValue::set(true),
 107                id: ActiveValue::NotSet,
 108                committer_name: ActiveValue::Set(None),
 109                committer_email: ActiveValue::Set(None),
 110            }
 111            .insert(&*tx)
 112            .await?;
 113
 114            let room = self.get_room(room_id, &tx).await?;
 115            Ok((project.id, room))
 116        })
 117        .await
 118    }
 119
 120    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 121        self.transaction(|tx| async move {
 122            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 123            Ok(())
 124        })
 125        .await
 126    }
 127
 128    /// Unshares the given project.
 129    pub async fn unshare_project(
 130        &self,
 131        project_id: ProjectId,
 132        connection: ConnectionId,
 133    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 134        self.project_transaction(project_id, |tx| async move {
 135            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 136            let project = project::Entity::find_by_id(project_id)
 137                .one(&*tx)
 138                .await?
 139                .context("project not found")?;
 140            let room = if let Some(room_id) = project.room_id {
 141                Some(self.get_room(room_id, &tx).await?)
 142            } else {
 143                None
 144            };
 145            if project.host_connection()? == connection {
 146                return Ok((true, room, guest_connection_ids));
 147            }
 148            Err(anyhow!("cannot unshare a project hosted by another user"))?
 149        })
 150        .await
 151    }
 152
 153    /// Updates the worktrees associated with the given project.
 154    pub async fn update_project(
 155        &self,
 156        project_id: ProjectId,
 157        connection: ConnectionId,
 158        worktrees: &[proto::WorktreeMetadata],
 159    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 160        self.project_transaction(project_id, |tx| async move {
 161            let project = project::Entity::find_by_id(project_id)
 162                .filter(
 163                    Condition::all()
 164                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 165                        .add(
 166                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 167                        ),
 168                )
 169                .one(&*tx)
 170                .await?
 171                .context("no such project")?;
 172
 173            self.update_project_worktrees(project.id, worktrees, &tx)
 174                .await?;
 175
 176            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 177
 178            let room = if let Some(room_id) = project.room_id {
 179                Some(self.get_room(room_id, &tx).await?)
 180            } else {
 181                None
 182            };
 183
 184            Ok((room, guest_connection_ids))
 185        })
 186        .await
 187    }
 188
 189    pub(in crate::db) async fn update_project_worktrees(
 190        &self,
 191        project_id: ProjectId,
 192        worktrees: &[proto::WorktreeMetadata],
 193        tx: &DatabaseTransaction,
 194    ) -> Result<()> {
 195        if !worktrees.is_empty() {
 196            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 197                id: ActiveValue::set(worktree.id as i64),
 198                project_id: ActiveValue::set(project_id),
 199                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 200                root_name: ActiveValue::set(worktree.root_name.clone()),
 201                visible: ActiveValue::set(worktree.visible),
 202                scan_id: ActiveValue::set(0),
 203                completed_scan_id: ActiveValue::set(0),
 204            }))
 205            .on_conflict(
 206                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 207                    .update_column(worktree::Column::RootName)
 208                    .to_owned(),
 209            )
 210            .exec(tx)
 211            .await?;
 212        }
 213
 214        worktree::Entity::delete_many()
 215            .filter(worktree::Column::ProjectId.eq(project_id).and(
 216                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 217            ))
 218            .exec(tx)
 219            .await?;
 220
 221        Ok(())
 222    }
 223
 224    pub async fn update_worktree(
 225        &self,
 226        update: &proto::UpdateWorktree,
 227        connection: ConnectionId,
 228    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 229        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 230            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 231        {
 232            return Err(anyhow!(
 233                "invalid worktree update. removed entries: {}, updated entries: {}",
 234                update.removed_entries.len(),
 235                update.updated_entries.len()
 236            ))?;
 237        }
 238
 239        let project_id = ProjectId::from_proto(update.project_id);
 240        let worktree_id = update.worktree_id as i64;
 241        self.project_transaction(project_id, |tx| async move {
 242            // Ensure the update comes from the host.
 243            let _project = project::Entity::find_by_id(project_id)
 244                .filter(
 245                    Condition::all()
 246                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 247                        .add(
 248                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 249                        ),
 250                )
 251                .one(&*tx)
 252                .await?
 253                .with_context(|| format!("no such project: {project_id}"))?;
 254
 255            // Update metadata.
 256            worktree::Entity::update(worktree::ActiveModel {
 257                id: ActiveValue::set(worktree_id),
 258                project_id: ActiveValue::set(project_id),
 259                root_name: ActiveValue::set(update.root_name.clone()),
 260                scan_id: ActiveValue::set(update.scan_id as i64),
 261                completed_scan_id: if update.is_last_update {
 262                    ActiveValue::set(update.scan_id as i64)
 263                } else {
 264                    ActiveValue::default()
 265                },
 266                abs_path: ActiveValue::set(update.abs_path.clone()),
 267                ..Default::default()
 268            })
 269            .exec(&*tx)
 270            .await?;
 271
 272            if !update.updated_entries.is_empty() {
 273                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 274                    let mtime = entry.mtime.clone().unwrap_or_default();
 275                    worktree_entry::ActiveModel {
 276                        project_id: ActiveValue::set(project_id),
 277                        worktree_id: ActiveValue::set(worktree_id),
 278                        id: ActiveValue::set(entry.id as i64),
 279                        is_dir: ActiveValue::set(entry.is_dir),
 280                        path: ActiveValue::set(entry.path.clone()),
 281                        inode: ActiveValue::set(entry.inode as i64),
 282                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 283                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 284                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 285                        is_ignored: ActiveValue::set(entry.is_ignored),
 286                        git_status: ActiveValue::set(None),
 287                        is_external: ActiveValue::set(entry.is_external),
 288                        is_deleted: ActiveValue::set(false),
 289                        is_hidden: ActiveValue::set(entry.is_hidden),
 290                        scan_id: ActiveValue::set(update.scan_id as i64),
 291                        is_fifo: ActiveValue::set(entry.is_fifo),
 292                    }
 293                }))
 294                .on_conflict(
 295                    OnConflict::columns([
 296                        worktree_entry::Column::ProjectId,
 297                        worktree_entry::Column::WorktreeId,
 298                        worktree_entry::Column::Id,
 299                    ])
 300                    .update_columns([
 301                        worktree_entry::Column::IsDir,
 302                        worktree_entry::Column::Path,
 303                        worktree_entry::Column::Inode,
 304                        worktree_entry::Column::MtimeSeconds,
 305                        worktree_entry::Column::MtimeNanos,
 306                        worktree_entry::Column::CanonicalPath,
 307                        worktree_entry::Column::IsIgnored,
 308                        worktree_entry::Column::IsHidden,
 309                        worktree_entry::Column::ScanId,
 310                    ])
 311                    .to_owned(),
 312                )
 313                .exec(&*tx)
 314                .await?;
 315            }
 316
 317            if !update.removed_entries.is_empty() {
 318                worktree_entry::Entity::update_many()
 319                    .filter(
 320                        worktree_entry::Column::ProjectId
 321                            .eq(project_id)
 322                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 323                            .and(
 324                                worktree_entry::Column::Id
 325                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 326                            ),
 327                    )
 328                    .set(worktree_entry::ActiveModel {
 329                        is_deleted: ActiveValue::Set(true),
 330                        scan_id: ActiveValue::Set(update.scan_id as i64),
 331                        ..Default::default()
 332                    })
 333                    .exec(&*tx)
 334                    .await?;
 335            }
 336
 337            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 338            Ok(connection_ids)
 339        })
 340        .await
 341    }
 342
 343    pub async fn update_repository(
 344        &self,
 345        update: &proto::UpdateRepository,
 346        _connection: ConnectionId,
 347    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 348        let project_id = ProjectId::from_proto(update.project_id);
 349        let repository_id = update.id as i64;
 350        self.project_transaction(project_id, |tx| async move {
 351            project_repository::Entity::insert(project_repository::ActiveModel {
 352                project_id: ActiveValue::set(project_id),
 353                id: ActiveValue::set(repository_id),
 354                legacy_worktree_id: ActiveValue::set(None),
 355                abs_path: ActiveValue::set(update.abs_path.clone()),
 356                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 357                scan_id: ActiveValue::set(update.scan_id as i64),
 358                is_deleted: ActiveValue::set(false),
 359                branch_summary: ActiveValue::Set(
 360                    update
 361                        .branch_summary
 362                        .as_ref()
 363                        .map(|summary| serde_json::to_string(summary).unwrap()),
 364                ),
 365                head_commit_details: ActiveValue::Set(
 366                    update
 367                        .head_commit_details
 368                        .as_ref()
 369                        .map(|details| serde_json::to_string(details).unwrap()),
 370                ),
 371                current_merge_conflicts: ActiveValue::Set(Some(
 372                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 373                )),
 374                merge_message: ActiveValue::set(update.merge_message.clone()),
 375                remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
 376                remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
 377                linked_worktrees: ActiveValue::Set(Some(
 378                    serde_json::to_string(&update.linked_worktrees).unwrap(),
 379                )),
 380            })
 381            .on_conflict(
 382                OnConflict::columns([
 383                    project_repository::Column::ProjectId,
 384                    project_repository::Column::Id,
 385                ])
 386                .update_columns([
 387                    project_repository::Column::ScanId,
 388                    project_repository::Column::BranchSummary,
 389                    project_repository::Column::EntryIds,
 390                    project_repository::Column::AbsPath,
 391                    project_repository::Column::CurrentMergeConflicts,
 392                    project_repository::Column::HeadCommitDetails,
 393                    project_repository::Column::MergeMessage,
 394                    project_repository::Column::LinkedWorktrees,
 395                ])
 396                .to_owned(),
 397            )
 398            .exec(&*tx)
 399            .await?;
 400
 401            let has_any_statuses = !update.updated_statuses.is_empty();
 402
 403            if has_any_statuses {
 404                project_repository_statuses::Entity::insert_many(
 405                    update.updated_statuses.iter().map(|status_entry| {
 406                        let (repo_path, status_kind, first_status, second_status) =
 407                            proto_status_to_db(status_entry.clone());
 408                        project_repository_statuses::ActiveModel {
 409                            project_id: ActiveValue::set(project_id),
 410                            repository_id: ActiveValue::set(repository_id),
 411                            scan_id: ActiveValue::set(update.scan_id as i64),
 412                            is_deleted: ActiveValue::set(false),
 413                            repo_path: ActiveValue::set(repo_path),
 414                            status: ActiveValue::set(0),
 415                            status_kind: ActiveValue::set(status_kind),
 416                            first_status: ActiveValue::set(first_status),
 417                            second_status: ActiveValue::set(second_status),
 418                            lines_added: ActiveValue::set(
 419                                status_entry.diff_stat_added.map(|v| v as i32),
 420                            ),
 421                            lines_deleted: ActiveValue::set(
 422                                status_entry.diff_stat_deleted.map(|v| v as i32),
 423                            ),
 424                        }
 425                    }),
 426                )
 427                .on_conflict(
 428                    OnConflict::columns([
 429                        project_repository_statuses::Column::ProjectId,
 430                        project_repository_statuses::Column::RepositoryId,
 431                        project_repository_statuses::Column::RepoPath,
 432                    ])
 433                    .update_columns([
 434                        project_repository_statuses::Column::ScanId,
 435                        project_repository_statuses::Column::StatusKind,
 436                        project_repository_statuses::Column::FirstStatus,
 437                        project_repository_statuses::Column::SecondStatus,
 438                        project_repository_statuses::Column::LinesAdded,
 439                        project_repository_statuses::Column::LinesDeleted,
 440                    ])
 441                    .to_owned(),
 442                )
 443                .exec(&*tx)
 444                .await?;
 445            }
 446
 447            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 448
 449            if has_any_removed_statuses {
 450                project_repository_statuses::Entity::update_many()
 451                    .filter(
 452                        project_repository_statuses::Column::ProjectId
 453                            .eq(project_id)
 454                            .and(
 455                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 456                            )
 457                            .and(
 458                                project_repository_statuses::Column::RepoPath
 459                                    .is_in(update.removed_statuses.iter()),
 460                            ),
 461                    )
 462                    .set(project_repository_statuses::ActiveModel {
 463                        is_deleted: ActiveValue::Set(true),
 464                        scan_id: ActiveValue::Set(update.scan_id as i64),
 465                        ..Default::default()
 466                    })
 467                    .exec(&*tx)
 468                    .await?;
 469            }
 470
 471            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 472            Ok(connection_ids)
 473        })
 474        .await
 475    }
 476
 477    pub async fn remove_repository(
 478        &self,
 479        remove: &proto::RemoveRepository,
 480        _connection: ConnectionId,
 481    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 482        let project_id = ProjectId::from_proto(remove.project_id);
 483        let repository_id = remove.id as i64;
 484        self.project_transaction(project_id, |tx| async move {
 485            project_repository::Entity::update_many()
 486                .filter(
 487                    project_repository::Column::ProjectId
 488                        .eq(project_id)
 489                        .and(project_repository::Column::Id.eq(repository_id)),
 490                )
 491                .set(project_repository::ActiveModel {
 492                    is_deleted: ActiveValue::Set(true),
 493                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 494                    ..Default::default()
 495                })
 496                .exec(&*tx)
 497                .await?;
 498
 499            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 500            Ok(connection_ids)
 501        })
 502        .await
 503    }
 504
 505    /// Updates the diagnostic summary for the given connection.
 506    pub async fn update_diagnostic_summary(
 507        &self,
 508        update: &proto::UpdateDiagnosticSummary,
 509        connection: ConnectionId,
 510    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 511        let project_id = ProjectId::from_proto(update.project_id);
 512        let worktree_id = update.worktree_id as i64;
 513        self.project_transaction(project_id, |tx| async move {
 514            let summary = update.summary.as_ref().context("invalid summary")?;
 515
 516            // Ensure the update comes from the host.
 517            let project = project::Entity::find_by_id(project_id)
 518                .one(&*tx)
 519                .await?
 520                .context("no such project")?;
 521            if project.host_connection()? != connection {
 522                return Err(anyhow!("can't update a project hosted by someone else"))?;
 523            }
 524
 525            // Update summary.
 526            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 527                project_id: ActiveValue::set(project_id),
 528                worktree_id: ActiveValue::set(worktree_id),
 529                path: ActiveValue::set(summary.path.clone()),
 530                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 531                error_count: ActiveValue::set(summary.error_count as i32),
 532                warning_count: ActiveValue::set(summary.warning_count as i32),
 533            })
 534            .on_conflict(
 535                OnConflict::columns([
 536                    worktree_diagnostic_summary::Column::ProjectId,
 537                    worktree_diagnostic_summary::Column::WorktreeId,
 538                    worktree_diagnostic_summary::Column::Path,
 539                ])
 540                .update_columns([
 541                    worktree_diagnostic_summary::Column::LanguageServerId,
 542                    worktree_diagnostic_summary::Column::ErrorCount,
 543                    worktree_diagnostic_summary::Column::WarningCount,
 544                ])
 545                .to_owned(),
 546            )
 547            .exec(&*tx)
 548            .await?;
 549
 550            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 551            Ok(connection_ids)
 552        })
 553        .await
 554    }
 555
 556    /// Starts the language server for the given connection.
 557    pub async fn start_language_server(
 558        &self,
 559        update: &proto::StartLanguageServer,
 560        connection: ConnectionId,
 561    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 562        let project_id = ProjectId::from_proto(update.project_id);
 563        self.project_transaction(project_id, |tx| async move {
 564            let server = update.server.as_ref().context("invalid language server")?;
 565
 566            // Ensure the update comes from the host.
 567            let project = project::Entity::find_by_id(project_id)
 568                .one(&*tx)
 569                .await?
 570                .context("no such project")?;
 571            if project.host_connection()? != connection {
 572                return Err(anyhow!("can't update a project hosted by someone else"))?;
 573            }
 574
 575            // Add the newly-started language server.
 576            language_server::Entity::insert(language_server::ActiveModel {
 577                project_id: ActiveValue::set(project_id),
 578                id: ActiveValue::set(server.id as i64),
 579                name: ActiveValue::set(server.name.clone()),
 580                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 581                capabilities: ActiveValue::set(update.capabilities.clone()),
 582            })
 583            .on_conflict(
 584                OnConflict::columns([
 585                    language_server::Column::ProjectId,
 586                    language_server::Column::Id,
 587                ])
 588                .update_columns([
 589                    language_server::Column::Name,
 590                    language_server::Column::Capabilities,
 591                    language_server::Column::WorktreeId,
 592                ])
 593                .to_owned(),
 594            )
 595            .exec(&*tx)
 596            .await?;
 597
 598            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 599            Ok(connection_ids)
 600        })
 601        .await
 602    }
 603
 604    /// Updates the worktree settings for the given connection.
 605    pub async fn update_worktree_settings(
 606        &self,
 607        update: &proto::UpdateWorktreeSettings,
 608        connection: ConnectionId,
 609    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 610        let project_id = ProjectId::from_proto(update.project_id);
 611        let kind = match update.kind {
 612            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 613                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 614            None => proto::LocalSettingsKind::Settings,
 615        };
 616        let kind = LocalSettingsKind::from_proto(kind);
 617        self.project_transaction(project_id, |tx| async move {
 618            // Ensure the update comes from the host.
 619            let project = project::Entity::find_by_id(project_id)
 620                .one(&*tx)
 621                .await?
 622                .context("no such project")?;
 623            if project.host_connection()? != connection {
 624                return Err(anyhow!("can't update a project hosted by someone else"))?;
 625            }
 626
 627            if let Some(content) = &update.content {
 628                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 629                    project_id: ActiveValue::Set(project_id),
 630                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 631                    path: ActiveValue::Set(update.path.clone()),
 632                    content: ActiveValue::Set(content.clone()),
 633                    kind: ActiveValue::Set(kind),
 634                    outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
 635                })
 636                .on_conflict(
 637                    OnConflict::columns([
 638                        worktree_settings_file::Column::ProjectId,
 639                        worktree_settings_file::Column::WorktreeId,
 640                        worktree_settings_file::Column::Path,
 641                    ])
 642                    .update_columns([
 643                        worktree_settings_file::Column::Content,
 644                        worktree_settings_file::Column::OutsideWorktree,
 645                    ])
 646                    .to_owned(),
 647                )
 648                .exec(&*tx)
 649                .await?;
 650            } else {
 651                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 652                    project_id: ActiveValue::Set(project_id),
 653                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 654                    path: ActiveValue::Set(update.path.clone()),
 655                    ..Default::default()
 656                })
 657                .exec(&*tx)
 658                .await?;
 659            }
 660
 661            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 662            Ok(connection_ids)
 663        })
 664        .await
 665    }
 666
 667    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 668        self.transaction(|tx| async move {
 669            Ok(project::Entity::find_by_id(id)
 670                .one(&*tx)
 671                .await?
 672                .context("no such project")?)
 673        })
 674        .await
 675    }
 676
 677    /// Adds the given connection to the specified project
 678    /// in the current room.
 679    pub async fn join_project(
 680        &self,
 681        project_id: ProjectId,
 682        connection: ConnectionId,
 683        user_id: UserId,
 684        committer_name: Option<String>,
 685        committer_email: Option<String>,
 686    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 687        self.project_transaction(project_id, move |tx| {
 688            let committer_name = committer_name.clone();
 689            let committer_email = committer_email.clone();
 690            async move {
 691                let (project, role) = self
 692                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 693                    .await?;
 694                self.join_project_internal(
 695                    project,
 696                    user_id,
 697                    committer_name,
 698                    committer_email,
 699                    connection,
 700                    role,
 701                    &tx,
 702                )
 703                .await
 704            }
 705        })
 706        .await
 707    }
 708
 709    async fn join_project_internal(
 710        &self,
 711        project: project::Model,
 712        user_id: UserId,
 713        committer_name: Option<String>,
 714        committer_email: Option<String>,
 715        connection: ConnectionId,
 716        role: ChannelRole,
 717        tx: &DatabaseTransaction,
 718    ) -> Result<(Project, ReplicaId)> {
 719        let mut collaborators = project
 720            .find_related(project_collaborator::Entity)
 721            .all(tx)
 722            .await?;
 723        let replica_ids = collaborators
 724            .iter()
 725            .map(|c| c.replica_id)
 726            .collect::<HashSet<_>>();
 727        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
 728        while replica_ids.contains(&replica_id) {
 729            replica_id.0 += 1;
 730        }
 731        let new_collaborator = project_collaborator::ActiveModel {
 732            project_id: ActiveValue::set(project.id),
 733            connection_id: ActiveValue::set(connection.id as i32),
 734            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 735            user_id: ActiveValue::set(user_id),
 736            replica_id: ActiveValue::set(replica_id),
 737            is_host: ActiveValue::set(false),
 738            id: ActiveValue::NotSet,
 739            committer_name: ActiveValue::set(committer_name),
 740            committer_email: ActiveValue::set(committer_email),
 741        }
 742        .insert(tx)
 743        .await?;
 744        collaborators.push(new_collaborator);
 745
 746        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 747        let mut worktrees = db_worktrees
 748            .into_iter()
 749            .map(|db_worktree| {
 750                (
 751                    db_worktree.id as u64,
 752                    Worktree {
 753                        id: db_worktree.id as u64,
 754                        abs_path: db_worktree.abs_path,
 755                        root_name: db_worktree.root_name,
 756                        visible: db_worktree.visible,
 757                        entries: Default::default(),
 758                        diagnostic_summaries: Default::default(),
 759                        settings_files: Default::default(),
 760                        scan_id: db_worktree.scan_id as u64,
 761                        completed_scan_id: db_worktree.completed_scan_id as u64,
 762                        legacy_repository_entries: Default::default(),
 763                    },
 764                )
 765            })
 766            .collect::<BTreeMap<_, _>>();
 767
 768        // Populate worktree entries.
 769        {
 770            let mut db_entries = worktree_entry::Entity::find()
 771                .filter(
 772                    Condition::all()
 773                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 774                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 775                )
 776                .stream(tx)
 777                .await?;
 778            while let Some(db_entry) = db_entries.next().await {
 779                let db_entry = db_entry?;
 780                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 781                    worktree.entries.push(proto::Entry {
 782                        id: db_entry.id as u64,
 783                        is_dir: db_entry.is_dir,
 784                        path: db_entry.path,
 785                        inode: db_entry.inode as u64,
 786                        mtime: Some(proto::Timestamp {
 787                            seconds: db_entry.mtime_seconds as u64,
 788                            nanos: db_entry.mtime_nanos as u32,
 789                        }),
 790                        canonical_path: db_entry.canonical_path,
 791                        is_ignored: db_entry.is_ignored,
 792                        is_external: db_entry.is_external,
 793                        is_hidden: db_entry.is_hidden,
 794                        // This is only used in the summarization backlog, so if it's None,
 795                        // that just means we won't be able to detect when to resummarize
 796                        // based on total number of backlogged bytes - instead, we'd go
 797                        // on number of files only. That shouldn't be a huge deal in practice.
 798                        size: None,
 799                        is_fifo: db_entry.is_fifo,
 800                    });
 801                }
 802            }
 803        }
 804
 805        // Populate repository entries.
 806        let mut repositories = Vec::new();
 807        {
 808            let db_repository_entries = project_repository::Entity::find()
 809                .filter(
 810                    Condition::all()
 811                        .add(project_repository::Column::ProjectId.eq(project.id))
 812                        .add(project_repository::Column::IsDeleted.eq(false)),
 813                )
 814                .all(tx)
 815                .await?;
 816            for db_repository_entry in db_repository_entries {
 817                let mut repository_statuses = project_repository_statuses::Entity::find()
 818                    .filter(
 819                        Condition::all()
 820                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 821                            .add(
 822                                project_repository_statuses::Column::RepositoryId
 823                                    .eq(db_repository_entry.id),
 824                            )
 825                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 826                    )
 827                    .stream(tx)
 828                    .await?;
 829                let mut updated_statuses = Vec::new();
 830                while let Some(status_entry) = repository_statuses.next().await {
 831                    let status_entry = status_entry?;
 832                    updated_statuses.push(db_status_to_proto(status_entry)?);
 833                }
 834
 835                let current_merge_conflicts = db_repository_entry
 836                    .current_merge_conflicts
 837                    .as_ref()
 838                    .map(|conflicts| serde_json::from_str(conflicts))
 839                    .transpose()?
 840                    .unwrap_or_default();
 841
 842                let branch_summary = db_repository_entry
 843                    .branch_summary
 844                    .as_ref()
 845                    .map(|branch_summary| serde_json::from_str(branch_summary))
 846                    .transpose()?
 847                    .unwrap_or_default();
 848
 849                let head_commit_details = db_repository_entry
 850                    .head_commit_details
 851                    .as_ref()
 852                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 853                    .transpose()?
 854                    .unwrap_or_default();
 855
 856                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 857                    .context("failed to deserialize repository's entry ids")?;
 858
 859                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 860                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 861                        worktree.legacy_repository_entries.insert(
 862                            db_repository_entry.id as u64,
 863                            proto::RepositoryEntry {
 864                                repository_id: db_repository_entry.id as u64,
 865                                updated_statuses,
 866                                removed_statuses: Vec::new(),
 867                                current_merge_conflicts,
 868                                branch_summary,
 869                            },
 870                        );
 871                    }
 872                } else {
 873                    repositories.push(proto::UpdateRepository {
 874                        project_id: db_repository_entry.project_id.0 as u64,
 875                        id: db_repository_entry.id as u64,
 876                        abs_path: db_repository_entry.abs_path.clone(),
 877                        entry_ids,
 878                        updated_statuses,
 879                        removed_statuses: Vec::new(),
 880                        current_merge_conflicts,
 881                        branch_summary,
 882                        head_commit_details,
 883                        scan_id: db_repository_entry.scan_id as u64,
 884                        is_last_update: true,
 885                        merge_message: db_repository_entry.merge_message,
 886                        stash_entries: Vec::new(),
 887                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
 888                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
 889                        original_repo_abs_path: Some(db_repository_entry.abs_path),
 890                        linked_worktrees: db_repository_entry
 891                            .linked_worktrees
 892                            .as_deref()
 893                            .and_then(|s| serde_json::from_str(s).ok())
 894                            .unwrap_or_default(),
 895                    });
 896                }
 897            }
 898        }
 899
 900        // Populate worktree diagnostic summaries.
 901        {
 902            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 903                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 904                .stream(tx)
 905                .await?;
 906            while let Some(db_summary) = db_summaries.next().await {
 907                let db_summary = db_summary?;
 908                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 909                    worktree
 910                        .diagnostic_summaries
 911                        .push(proto::DiagnosticSummary {
 912                            path: db_summary.path,
 913                            language_server_id: db_summary.language_server_id as u64,
 914                            error_count: db_summary.error_count as u32,
 915                            warning_count: db_summary.warning_count as u32,
 916                        });
 917                }
 918            }
 919        }
 920
 921        // Populate worktree settings files
 922        {
 923            let mut db_settings_files = worktree_settings_file::Entity::find()
 924                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 925                .stream(tx)
 926                .await?;
 927            while let Some(db_settings_file) = db_settings_files.next().await {
 928                let db_settings_file = db_settings_file?;
 929                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 930                    worktree.settings_files.push(WorktreeSettingsFile {
 931                        path: db_settings_file.path,
 932                        content: db_settings_file.content,
 933                        kind: db_settings_file.kind,
 934                        outside_worktree: db_settings_file.outside_worktree,
 935                    });
 936                }
 937            }
 938        }
 939
 940        // Populate language servers.
 941        let language_servers = project
 942            .find_related(language_server::Entity)
 943            .all(tx)
 944            .await?;
 945
 946        let path_style = if project.windows_paths {
 947            PathStyle::Windows
 948        } else {
 949            PathStyle::Posix
 950        };
 951
 952        let project = Project {
 953            id: project.id,
 954            role,
 955            collaborators: collaborators
 956                .into_iter()
 957                .map(|collaborator| ProjectCollaborator {
 958                    connection_id: collaborator.connection(),
 959                    user_id: collaborator.user_id,
 960                    replica_id: collaborator.replica_id,
 961                    is_host: collaborator.is_host,
 962                    committer_name: collaborator.committer_name,
 963                    committer_email: collaborator.committer_email,
 964                })
 965                .collect(),
 966            worktrees,
 967            repositories,
 968            language_servers: language_servers
 969                .into_iter()
 970                .map(|language_server| LanguageServer {
 971                    server: proto::LanguageServer {
 972                        id: language_server.id as u64,
 973                        name: language_server.name,
 974                        worktree_id: language_server.worktree_id.map(|id| id as u64),
 975                    },
 976                    capabilities: language_server.capabilities,
 977                })
 978                .collect(),
 979            path_style,
 980        };
 981        Ok((project, replica_id as ReplicaId))
 982    }
 983
 984    /// Removes the given connection from the specified project.
 985    pub async fn leave_project(
 986        &self,
 987        project_id: ProjectId,
 988        connection: ConnectionId,
 989    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 990        self.project_transaction(project_id, |tx| async move {
 991            let result = project_collaborator::Entity::delete_many()
 992                .filter(
 993                    Condition::all()
 994                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 995                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 996                        .add(
 997                            project_collaborator::Column::ConnectionServerId
 998                                .eq(connection.owner_id as i32),
 999                        ),
1000                )
1001                .exec(&*tx)
1002                .await?;
1003            if result.rows_affected == 0 {
1004                Err(anyhow!("not a collaborator on this project"))?;
1005            }
1006
1007            let project = project::Entity::find_by_id(project_id)
1008                .one(&*tx)
1009                .await?
1010                .context("no such project")?;
1011            let collaborators = project
1012                .find_related(project_collaborator::Entity)
1013                .all(&*tx)
1014                .await?;
1015            let connection_ids: Vec<ConnectionId> = collaborators
1016                .into_iter()
1017                .map(|collaborator| collaborator.connection())
1018                .collect();
1019
1020            follower::Entity::delete_many()
1021                .filter(
1022                    Condition::any()
1023                        .add(
1024                            Condition::all()
1025                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1026                                .add(
1027                                    follower::Column::LeaderConnectionServerId
1028                                        .eq(connection.owner_id),
1029                                )
1030                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1031                        )
1032                        .add(
1033                            Condition::all()
1034                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1035                                .add(
1036                                    follower::Column::FollowerConnectionServerId
1037                                        .eq(connection.owner_id),
1038                                )
1039                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1040                        ),
1041                )
1042                .exec(&*tx)
1043                .await?;
1044
1045            let room = if let Some(room_id) = project.room_id {
1046                Some(self.get_room(room_id, &tx).await?)
1047            } else {
1048                None
1049            };
1050
1051            let left_project = LeftProject {
1052                id: project_id,
1053                should_unshare: connection == project.host_connection()?,
1054                connection_ids,
1055            };
1056            Ok((room, left_project))
1057        })
1058        .await
1059    }
1060
1061    pub async fn check_user_is_project_host(
1062        &self,
1063        project_id: ProjectId,
1064        connection_id: ConnectionId,
1065    ) -> Result<()> {
1066        self.project_transaction(project_id, |tx| async move {
1067            project::Entity::find()
1068                .filter(
1069                    Condition::all()
1070                        .add(project::Column::Id.eq(project_id))
1071                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1072                        .add(
1073                            project::Column::HostConnectionServerId
1074                                .eq(Some(connection_id.owner_id as i32)),
1075                        ),
1076                )
1077                .one(&*tx)
1078                .await?
1079                .context("failed to read project host")?;
1080
1081            Ok(())
1082        })
1083        .await
1084        .map(|guard| guard.into_inner())
1085    }
1086
1087    /// Returns the current project if the given user is authorized to access it with the specified capability.
1088    pub async fn access_project(
1089        &self,
1090        project_id: ProjectId,
1091        connection_id: ConnectionId,
1092        capability: Capability,
1093        tx: &DatabaseTransaction,
1094    ) -> Result<(project::Model, ChannelRole)> {
1095        let project = project::Entity::find_by_id(project_id)
1096            .one(tx)
1097            .await?
1098            .context("no such project")?;
1099
1100        let role_from_room = if let Some(room_id) = project.room_id {
1101            room_participant::Entity::find()
1102                .filter(room_participant::Column::RoomId.eq(room_id))
1103                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1104                .one(tx)
1105                .await?
1106                .and_then(|participant| participant.role)
1107        } else {
1108            None
1109        };
1110
1111        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1112
1113        match capability {
1114            Capability::ReadWrite => {
1115                if !role.can_edit_projects() {
1116                    return Err(anyhow!("not authorized to edit projects"))?;
1117                }
1118            }
1119            Capability::ReadOnly => {
1120                if !role.can_read_projects() {
1121                    return Err(anyhow!("not authorized to read projects"))?;
1122                }
1123            }
1124        }
1125
1126        Ok((project, role))
1127    }
1128
1129    /// Returns the host connection for a read-only request to join a shared project.
1130    pub async fn host_for_read_only_project_request(
1131        &self,
1132        project_id: ProjectId,
1133        connection_id: ConnectionId,
1134    ) -> Result<ConnectionId> {
1135        self.project_transaction(project_id, |tx| async move {
1136            let (project, _) = self
1137                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1138                .await?;
1139            project.host_connection()
1140        })
1141        .await
1142        .map(|guard| guard.into_inner())
1143    }
1144
1145    /// Returns the host connection for a request to join a shared project.
1146    pub async fn host_for_mutating_project_request(
1147        &self,
1148        project_id: ProjectId,
1149        connection_id: ConnectionId,
1150    ) -> Result<ConnectionId> {
1151        self.project_transaction(project_id, |tx| async move {
1152            let (project, _) = self
1153                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1154                .await?;
1155            project.host_connection()
1156        })
1157        .await
1158        .map(|guard| guard.into_inner())
1159    }
1160
1161    pub async fn connections_for_buffer_update(
1162        &self,
1163        project_id: ProjectId,
1164        connection_id: ConnectionId,
1165        capability: Capability,
1166    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1167        self.project_transaction(project_id, |tx| async move {
1168            // Authorize
1169            let (project, _) = self
1170                .access_project(project_id, connection_id, capability, &tx)
1171                .await?;
1172
1173            let host_connection_id = project.host_connection()?;
1174
1175            let collaborators = project_collaborator::Entity::find()
1176                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1177                .all(&*tx)
1178                .await?;
1179
1180            let guest_connection_ids = collaborators
1181                .into_iter()
1182                .filter_map(|collaborator| {
1183                    if collaborator.is_host {
1184                        None
1185                    } else {
1186                        Some(collaborator.connection())
1187                    }
1188                })
1189                .collect();
1190
1191            Ok((host_connection_id, guest_connection_ids))
1192        })
1193        .await
1194    }
1195
1196    /// Returns the connection IDs in the given project.
1197    ///
1198    /// The provided `connection_id` must also be a collaborator in the project,
1199    /// otherwise an error will be returned.
1200    pub async fn project_connection_ids(
1201        &self,
1202        project_id: ProjectId,
1203        connection_id: ConnectionId,
1204        exclude_dev_server: bool,
1205    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1206        self.project_transaction(project_id, |tx| async move {
1207            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1208                .await
1209        })
1210        .await
1211    }
1212
1213    async fn internal_project_connection_ids(
1214        &self,
1215        project_id: ProjectId,
1216        connection_id: ConnectionId,
1217        exclude_dev_server: bool,
1218        tx: &DatabaseTransaction,
1219    ) -> Result<HashSet<ConnectionId>> {
1220        let project = project::Entity::find_by_id(project_id)
1221            .one(tx)
1222            .await?
1223            .context("no such project")?;
1224
1225        let mut collaborators = project_collaborator::Entity::find()
1226            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1227            .stream(tx)
1228            .await?;
1229
1230        let mut connection_ids = HashSet::default();
1231        if let Some(host_connection) = project.host_connection().log_err()
1232            && !exclude_dev_server
1233        {
1234            connection_ids.insert(host_connection);
1235        }
1236
1237        while let Some(collaborator) = collaborators.next().await {
1238            let collaborator = collaborator?;
1239            connection_ids.insert(collaborator.connection());
1240        }
1241
1242        if connection_ids.contains(&connection_id)
1243            || Some(connection_id) == project.host_connection().ok()
1244        {
1245            Ok(connection_ids)
1246        } else {
1247            Err(anyhow!(
1248                "can only send project updates to a project you're in"
1249            ))?
1250        }
1251    }
1252
1253    async fn project_guest_connection_ids(
1254        &self,
1255        project_id: ProjectId,
1256        tx: &DatabaseTransaction,
1257    ) -> Result<Vec<ConnectionId>> {
1258        let mut collaborators = project_collaborator::Entity::find()
1259            .filter(
1260                project_collaborator::Column::ProjectId
1261                    .eq(project_id)
1262                    .and(project_collaborator::Column::IsHost.eq(false)),
1263            )
1264            .stream(tx)
1265            .await?;
1266
1267        let mut guest_connection_ids = Vec::new();
1268        while let Some(collaborator) = collaborators.next().await {
1269            let collaborator = collaborator?;
1270            guest_connection_ids.push(collaborator.connection());
1271        }
1272        Ok(guest_connection_ids)
1273    }
1274
1275    /// Returns the [`RoomId`] for the given project.
1276    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1277        self.transaction(|tx| async move {
1278            Ok(project::Entity::find_by_id(project_id)
1279                .one(&*tx)
1280                .await?
1281                .and_then(|project| project.room_id))
1282        })
1283        .await
1284    }
1285
1286    pub async fn check_room_participants(
1287        &self,
1288        room_id: RoomId,
1289        leader_id: ConnectionId,
1290        follower_id: ConnectionId,
1291    ) -> Result<()> {
1292        self.transaction(|tx| async move {
1293            use room_participant::Column;
1294
1295            let count = room_participant::Entity::find()
1296                .filter(
1297                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1298                        Condition::any()
1299                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1300                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1301                            ))
1302                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1303                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1304                            )),
1305                    ),
1306                )
1307                .count(&*tx)
1308                .await?;
1309
1310            if count < 2 {
1311                Err(anyhow!("not room participants"))?;
1312            }
1313
1314            Ok(())
1315        })
1316        .await
1317    }
1318
1319    /// Adds the given follower connection as a follower of the given leader connection.
1320    pub async fn follow(
1321        &self,
1322        room_id: RoomId,
1323        project_id: ProjectId,
1324        leader_connection: ConnectionId,
1325        follower_connection: ConnectionId,
1326    ) -> Result<TransactionGuard<proto::Room>> {
1327        self.room_transaction(room_id, |tx| async move {
1328            follower::ActiveModel {
1329                room_id: ActiveValue::set(room_id),
1330                project_id: ActiveValue::set(project_id),
1331                leader_connection_server_id: ActiveValue::set(ServerId(
1332                    leader_connection.owner_id as i32,
1333                )),
1334                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1335                follower_connection_server_id: ActiveValue::set(ServerId(
1336                    follower_connection.owner_id as i32,
1337                )),
1338                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1339                ..Default::default()
1340            }
1341            .insert(&*tx)
1342            .await?;
1343
1344            let room = self.get_room(room_id, &tx).await?;
1345            Ok(room)
1346        })
1347        .await
1348    }
1349
1350    /// Removes the given follower connection as a follower of the given leader connection.
1351    pub async fn unfollow(
1352        &self,
1353        room_id: RoomId,
1354        project_id: ProjectId,
1355        leader_connection: ConnectionId,
1356        follower_connection: ConnectionId,
1357    ) -> Result<TransactionGuard<proto::Room>> {
1358        self.room_transaction(room_id, |tx| async move {
1359            follower::Entity::delete_many()
1360                .filter(
1361                    Condition::all()
1362                        .add(follower::Column::RoomId.eq(room_id))
1363                        .add(follower::Column::ProjectId.eq(project_id))
1364                        .add(
1365                            follower::Column::LeaderConnectionServerId
1366                                .eq(leader_connection.owner_id),
1367                        )
1368                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1369                        .add(
1370                            follower::Column::FollowerConnectionServerId
1371                                .eq(follower_connection.owner_id),
1372                        )
1373                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1374                )
1375                .exec(&*tx)
1376                .await?;
1377
1378            let room = self.get_room(room_id, &tx).await?;
1379            Ok(room)
1380        })
1381        .await
1382    }
1383}