projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37        features: &[String],
  38    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  39        self.room_transaction(room_id, |tx| async move {
  40            let participant = room_participant::Entity::find()
  41                .filter(
  42                    Condition::all()
  43                        .add(
  44                            room_participant::Column::AnsweringConnectionId
  45                                .eq(connection.id as i32),
  46                        )
  47                        .add(
  48                            room_participant::Column::AnsweringConnectionServerId
  49                                .eq(connection.owner_id as i32),
  50                        ),
  51                )
  52                .one(&*tx)
  53                .await?
  54                .context("could not find participant")?;
  55            if participant.room_id != room_id {
  56                return Err(anyhow!("shared project on unexpected room"))?;
  57            }
  58            if !participant
  59                .role
  60                .unwrap_or(ChannelRole::Member)
  61                .can_edit_projects()
  62            {
  63                return Err(anyhow!("guests cannot share projects"))?;
  64            }
  65
  66            let project = project::ActiveModel {
  67                room_id: ActiveValue::set(Some(participant.room_id)),
  68                host_user_id: ActiveValue::set(Some(participant.user_id)),
  69                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  70                host_connection_server_id: ActiveValue::set(Some(ServerId(
  71                    connection.owner_id as i32,
  72                ))),
  73                id: ActiveValue::NotSet,
  74                windows_paths: ActiveValue::set(windows_paths),
  75                features: ActiveValue::set(serde_json::to_string(features).unwrap()),
  76            }
  77            .insert(&*tx)
  78            .await?;
  79
  80            if !worktrees.is_empty() {
  81                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  82                    worktree::ActiveModel {
  83                        id: ActiveValue::set(worktree.id as i64),
  84                        project_id: ActiveValue::set(project.id),
  85                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  86                        root_name: ActiveValue::set(worktree.root_name.clone()),
  87                        visible: ActiveValue::set(worktree.visible),
  88                        scan_id: ActiveValue::set(0),
  89                        completed_scan_id: ActiveValue::set(0),
  90                    }
  91                }))
  92                .exec(&*tx)
  93                .await?;
  94            }
  95
  96            let replica_id = if is_ssh_project {
  97                clock::ReplicaId::REMOTE_SERVER
  98            } else {
  99                clock::ReplicaId::LOCAL
 100            };
 101
 102            project_collaborator::ActiveModel {
 103                project_id: ActiveValue::set(project.id),
 104                connection_id: ActiveValue::set(connection.id as i32),
 105                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 106                user_id: ActiveValue::set(participant.user_id),
 107                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 108                is_host: ActiveValue::set(true),
 109                id: ActiveValue::NotSet,
 110                committer_name: ActiveValue::Set(None),
 111                committer_email: ActiveValue::Set(None),
 112            }
 113            .insert(&*tx)
 114            .await?;
 115
 116            let room = self.get_room(room_id, &tx).await?;
 117            Ok((project.id, room))
 118        })
 119        .await
 120    }
 121
 122    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 123        self.transaction(|tx| async move {
 124            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 125            Ok(())
 126        })
 127        .await
 128    }
 129
 130    /// Unshares the given project.
 131    pub async fn unshare_project(
 132        &self,
 133        project_id: ProjectId,
 134        connection: ConnectionId,
 135    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 136        self.project_transaction(project_id, |tx| async move {
 137            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 138            let project = project::Entity::find_by_id(project_id)
 139                .one(&*tx)
 140                .await?
 141                .context("project not found")?;
 142            let room = if let Some(room_id) = project.room_id {
 143                Some(self.get_room(room_id, &tx).await?)
 144            } else {
 145                None
 146            };
 147            if project.host_connection()? == connection {
 148                return Ok((true, room, guest_connection_ids));
 149            }
 150            Err(anyhow!("cannot unshare a project hosted by another user"))?
 151        })
 152        .await
 153    }
 154
 155    /// Updates the worktrees associated with the given project.
 156    pub async fn update_project(
 157        &self,
 158        project_id: ProjectId,
 159        connection: ConnectionId,
 160        worktrees: &[proto::WorktreeMetadata],
 161    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 162        self.project_transaction(project_id, |tx| async move {
 163            let project = project::Entity::find_by_id(project_id)
 164                .filter(
 165                    Condition::all()
 166                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 167                        .add(
 168                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 169                        ),
 170                )
 171                .one(&*tx)
 172                .await?
 173                .context("no such project")?;
 174
 175            self.update_project_worktrees(project.id, worktrees, &tx)
 176                .await?;
 177
 178            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 179
 180            let room = if let Some(room_id) = project.room_id {
 181                Some(self.get_room(room_id, &tx).await?)
 182            } else {
 183                None
 184            };
 185
 186            Ok((room, guest_connection_ids))
 187        })
 188        .await
 189    }
 190
 191    pub(in crate::db) async fn update_project_worktrees(
 192        &self,
 193        project_id: ProjectId,
 194        worktrees: &[proto::WorktreeMetadata],
 195        tx: &DatabaseTransaction,
 196    ) -> Result<()> {
 197        if !worktrees.is_empty() {
 198            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 199                id: ActiveValue::set(worktree.id as i64),
 200                project_id: ActiveValue::set(project_id),
 201                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 202                root_name: ActiveValue::set(worktree.root_name.clone()),
 203                visible: ActiveValue::set(worktree.visible),
 204                scan_id: ActiveValue::set(0),
 205                completed_scan_id: ActiveValue::set(0),
 206            }))
 207            .on_conflict(
 208                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 209                    .update_column(worktree::Column::RootName)
 210                    .to_owned(),
 211            )
 212            .exec(tx)
 213            .await?;
 214        }
 215
 216        worktree::Entity::delete_many()
 217            .filter(worktree::Column::ProjectId.eq(project_id).and(
 218                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 219            ))
 220            .exec(tx)
 221            .await?;
 222
 223        Ok(())
 224    }
 225
 226    pub async fn update_worktree(
 227        &self,
 228        update: &proto::UpdateWorktree,
 229        connection: ConnectionId,
 230    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 231        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 232            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 233        {
 234            return Err(anyhow!(
 235                "invalid worktree update. removed entries: {}, updated entries: {}",
 236                update.removed_entries.len(),
 237                update.updated_entries.len()
 238            ))?;
 239        }
 240
 241        let project_id = ProjectId::from_proto(update.project_id);
 242        let worktree_id = update.worktree_id as i64;
 243        self.project_transaction(project_id, |tx| async move {
 244            // Ensure the update comes from the host.
 245            let _project = project::Entity::find_by_id(project_id)
 246                .filter(
 247                    Condition::all()
 248                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 249                        .add(
 250                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 251                        ),
 252                )
 253                .one(&*tx)
 254                .await?
 255                .with_context(|| format!("no such project: {project_id}"))?;
 256
 257            // Update metadata.
 258            worktree::Entity::update(worktree::ActiveModel {
 259                id: ActiveValue::set(worktree_id),
 260                project_id: ActiveValue::set(project_id),
 261                root_name: ActiveValue::set(update.root_name.clone()),
 262                scan_id: ActiveValue::set(update.scan_id as i64),
 263                completed_scan_id: if update.is_last_update {
 264                    ActiveValue::set(update.scan_id as i64)
 265                } else {
 266                    ActiveValue::default()
 267                },
 268                abs_path: ActiveValue::set(update.abs_path.clone()),
 269                ..Default::default()
 270            })
 271            .exec(&*tx)
 272            .await?;
 273
 274            if !update.updated_entries.is_empty() {
 275                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 276                    let mtime = entry.mtime.clone().unwrap_or_default();
 277                    worktree_entry::ActiveModel {
 278                        project_id: ActiveValue::set(project_id),
 279                        worktree_id: ActiveValue::set(worktree_id),
 280                        id: ActiveValue::set(entry.id as i64),
 281                        is_dir: ActiveValue::set(entry.is_dir),
 282                        path: ActiveValue::set(entry.path.clone()),
 283                        inode: ActiveValue::set(entry.inode as i64),
 284                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 285                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 286                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 287                        is_ignored: ActiveValue::set(entry.is_ignored),
 288                        git_status: ActiveValue::set(None),
 289                        is_external: ActiveValue::set(entry.is_external),
 290                        is_deleted: ActiveValue::set(false),
 291                        is_hidden: ActiveValue::set(entry.is_hidden),
 292                        scan_id: ActiveValue::set(update.scan_id as i64),
 293                        is_fifo: ActiveValue::set(entry.is_fifo),
 294                    }
 295                }))
 296                .on_conflict(
 297                    OnConflict::columns([
 298                        worktree_entry::Column::ProjectId,
 299                        worktree_entry::Column::WorktreeId,
 300                        worktree_entry::Column::Id,
 301                    ])
 302                    .update_columns([
 303                        worktree_entry::Column::IsDir,
 304                        worktree_entry::Column::Path,
 305                        worktree_entry::Column::Inode,
 306                        worktree_entry::Column::MtimeSeconds,
 307                        worktree_entry::Column::MtimeNanos,
 308                        worktree_entry::Column::CanonicalPath,
 309                        worktree_entry::Column::IsIgnored,
 310                        worktree_entry::Column::IsHidden,
 311                        worktree_entry::Column::ScanId,
 312                    ])
 313                    .to_owned(),
 314                )
 315                .exec(&*tx)
 316                .await?;
 317            }
 318
 319            if !update.removed_entries.is_empty() {
 320                worktree_entry::Entity::update_many()
 321                    .filter(
 322                        worktree_entry::Column::ProjectId
 323                            .eq(project_id)
 324                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 325                            .and(
 326                                worktree_entry::Column::Id
 327                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 328                            ),
 329                    )
 330                    .set(worktree_entry::ActiveModel {
 331                        is_deleted: ActiveValue::Set(true),
 332                        scan_id: ActiveValue::Set(update.scan_id as i64),
 333                        ..Default::default()
 334                    })
 335                    .exec(&*tx)
 336                    .await?;
 337            }
 338
 339            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 340            Ok(connection_ids)
 341        })
 342        .await
 343    }
 344
 345    pub async fn update_repository(
 346        &self,
 347        update: &proto::UpdateRepository,
 348        _connection: ConnectionId,
 349    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 350        let project_id = ProjectId::from_proto(update.project_id);
 351        let repository_id = update.id as i64;
 352        self.project_transaction(project_id, |tx| async move {
 353            project_repository::Entity::insert(project_repository::ActiveModel {
 354                project_id: ActiveValue::set(project_id),
 355                id: ActiveValue::set(repository_id),
 356                legacy_worktree_id: ActiveValue::set(None),
 357                abs_path: ActiveValue::set(update.abs_path.clone()),
 358                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 359                scan_id: ActiveValue::set(update.scan_id as i64),
 360                is_deleted: ActiveValue::set(false),
 361                branch_summary: ActiveValue::Set(
 362                    update
 363                        .branch_summary
 364                        .as_ref()
 365                        .map(|summary| serde_json::to_string(summary).unwrap()),
 366                ),
 367                head_commit_details: ActiveValue::Set(
 368                    update
 369                        .head_commit_details
 370                        .as_ref()
 371                        .map(|details| serde_json::to_string(details).unwrap()),
 372                ),
 373                current_merge_conflicts: ActiveValue::Set(Some(
 374                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 375                )),
 376                merge_message: ActiveValue::set(update.merge_message.clone()),
 377                remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
 378                remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
 379                linked_worktrees: ActiveValue::Set(Some(
 380                    serde_json::to_string(&update.linked_worktrees).unwrap(),
 381                )),
 382            })
 383            .on_conflict(
 384                OnConflict::columns([
 385                    project_repository::Column::ProjectId,
 386                    project_repository::Column::Id,
 387                ])
 388                .update_columns([
 389                    project_repository::Column::ScanId,
 390                    project_repository::Column::BranchSummary,
 391                    project_repository::Column::EntryIds,
 392                    project_repository::Column::AbsPath,
 393                    project_repository::Column::CurrentMergeConflicts,
 394                    project_repository::Column::HeadCommitDetails,
 395                    project_repository::Column::MergeMessage,
 396                    project_repository::Column::LinkedWorktrees,
 397                ])
 398                .to_owned(),
 399            )
 400            .exec(&*tx)
 401            .await?;
 402
 403            let has_any_statuses = !update.updated_statuses.is_empty();
 404
 405            if has_any_statuses {
 406                project_repository_statuses::Entity::insert_many(
 407                    update.updated_statuses.iter().map(|status_entry| {
 408                        let (repo_path, status_kind, first_status, second_status) =
 409                            proto_status_to_db(status_entry.clone());
 410                        project_repository_statuses::ActiveModel {
 411                            project_id: ActiveValue::set(project_id),
 412                            repository_id: ActiveValue::set(repository_id),
 413                            scan_id: ActiveValue::set(update.scan_id as i64),
 414                            is_deleted: ActiveValue::set(false),
 415                            repo_path: ActiveValue::set(repo_path),
 416                            status: ActiveValue::set(0),
 417                            status_kind: ActiveValue::set(status_kind),
 418                            first_status: ActiveValue::set(first_status),
 419                            second_status: ActiveValue::set(second_status),
 420                            lines_added: ActiveValue::set(
 421                                status_entry.diff_stat_added.map(|v| v as i32),
 422                            ),
 423                            lines_deleted: ActiveValue::set(
 424                                status_entry.diff_stat_deleted.map(|v| v as i32),
 425                            ),
 426                        }
 427                    }),
 428                )
 429                .on_conflict(
 430                    OnConflict::columns([
 431                        project_repository_statuses::Column::ProjectId,
 432                        project_repository_statuses::Column::RepositoryId,
 433                        project_repository_statuses::Column::RepoPath,
 434                    ])
 435                    .update_columns([
 436                        project_repository_statuses::Column::ScanId,
 437                        project_repository_statuses::Column::StatusKind,
 438                        project_repository_statuses::Column::FirstStatus,
 439                        project_repository_statuses::Column::SecondStatus,
 440                        project_repository_statuses::Column::LinesAdded,
 441                        project_repository_statuses::Column::LinesDeleted,
 442                    ])
 443                    .to_owned(),
 444                )
 445                .exec(&*tx)
 446                .await?;
 447            }
 448
 449            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 450
 451            if has_any_removed_statuses {
 452                project_repository_statuses::Entity::update_many()
 453                    .filter(
 454                        project_repository_statuses::Column::ProjectId
 455                            .eq(project_id)
 456                            .and(
 457                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 458                            )
 459                            .and(
 460                                project_repository_statuses::Column::RepoPath
 461                                    .is_in(update.removed_statuses.iter()),
 462                            ),
 463                    )
 464                    .set(project_repository_statuses::ActiveModel {
 465                        is_deleted: ActiveValue::Set(true),
 466                        scan_id: ActiveValue::Set(update.scan_id as i64),
 467                        ..Default::default()
 468                    })
 469                    .exec(&*tx)
 470                    .await?;
 471            }
 472
 473            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 474            Ok(connection_ids)
 475        })
 476        .await
 477    }
 478
 479    pub async fn remove_repository(
 480        &self,
 481        remove: &proto::RemoveRepository,
 482        _connection: ConnectionId,
 483    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 484        let project_id = ProjectId::from_proto(remove.project_id);
 485        let repository_id = remove.id as i64;
 486        self.project_transaction(project_id, |tx| async move {
 487            project_repository::Entity::update_many()
 488                .filter(
 489                    project_repository::Column::ProjectId
 490                        .eq(project_id)
 491                        .and(project_repository::Column::Id.eq(repository_id)),
 492                )
 493                .set(project_repository::ActiveModel {
 494                    is_deleted: ActiveValue::Set(true),
 495                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 496                    ..Default::default()
 497                })
 498                .exec(&*tx)
 499                .await?;
 500
 501            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 502            Ok(connection_ids)
 503        })
 504        .await
 505    }
 506
 507    /// Updates the diagnostic summary for the given connection.
 508    pub async fn update_diagnostic_summary(
 509        &self,
 510        update: &proto::UpdateDiagnosticSummary,
 511        connection: ConnectionId,
 512    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 513        let project_id = ProjectId::from_proto(update.project_id);
 514        let worktree_id = update.worktree_id as i64;
 515        self.project_transaction(project_id, |tx| async move {
 516            let summary = update.summary.as_ref().context("invalid summary")?;
 517
 518            // Ensure the update comes from the host.
 519            let project = project::Entity::find_by_id(project_id)
 520                .one(&*tx)
 521                .await?
 522                .context("no such project")?;
 523            if project.host_connection()? != connection {
 524                return Err(anyhow!("can't update a project hosted by someone else"))?;
 525            }
 526
 527            // Update summary.
 528            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 529                project_id: ActiveValue::set(project_id),
 530                worktree_id: ActiveValue::set(worktree_id),
 531                path: ActiveValue::set(summary.path.clone()),
 532                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 533                error_count: ActiveValue::set(summary.error_count as i32),
 534                warning_count: ActiveValue::set(summary.warning_count as i32),
 535            })
 536            .on_conflict(
 537                OnConflict::columns([
 538                    worktree_diagnostic_summary::Column::ProjectId,
 539                    worktree_diagnostic_summary::Column::WorktreeId,
 540                    worktree_diagnostic_summary::Column::Path,
 541                ])
 542                .update_columns([
 543                    worktree_diagnostic_summary::Column::LanguageServerId,
 544                    worktree_diagnostic_summary::Column::ErrorCount,
 545                    worktree_diagnostic_summary::Column::WarningCount,
 546                ])
 547                .to_owned(),
 548            )
 549            .exec(&*tx)
 550            .await?;
 551
 552            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 553            Ok(connection_ids)
 554        })
 555        .await
 556    }
 557
 558    /// Starts the language server for the given connection.
 559    pub async fn start_language_server(
 560        &self,
 561        update: &proto::StartLanguageServer,
 562        connection: ConnectionId,
 563    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 564        let project_id = ProjectId::from_proto(update.project_id);
 565        self.project_transaction(project_id, |tx| async move {
 566            let server = update.server.as_ref().context("invalid language server")?;
 567
 568            // Ensure the update comes from the host.
 569            let project = project::Entity::find_by_id(project_id)
 570                .one(&*tx)
 571                .await?
 572                .context("no such project")?;
 573            if project.host_connection()? != connection {
 574                return Err(anyhow!("can't update a project hosted by someone else"))?;
 575            }
 576
 577            // Add the newly-started language server.
 578            language_server::Entity::insert(language_server::ActiveModel {
 579                project_id: ActiveValue::set(project_id),
 580                id: ActiveValue::set(server.id as i64),
 581                name: ActiveValue::set(server.name.clone()),
 582                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 583                capabilities: ActiveValue::set(update.capabilities.clone()),
 584            })
 585            .on_conflict(
 586                OnConflict::columns([
 587                    language_server::Column::ProjectId,
 588                    language_server::Column::Id,
 589                ])
 590                .update_columns([
 591                    language_server::Column::Name,
 592                    language_server::Column::Capabilities,
 593                    language_server::Column::WorktreeId,
 594                ])
 595                .to_owned(),
 596            )
 597            .exec(&*tx)
 598            .await?;
 599
 600            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 601            Ok(connection_ids)
 602        })
 603        .await
 604    }
 605
 606    /// Updates the worktree settings for the given connection.
 607    pub async fn update_worktree_settings(
 608        &self,
 609        update: &proto::UpdateWorktreeSettings,
 610        connection: ConnectionId,
 611    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 612        let project_id = ProjectId::from_proto(update.project_id);
 613        let kind = match update.kind {
 614            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 615                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 616            None => proto::LocalSettingsKind::Settings,
 617        };
 618        let kind = LocalSettingsKind::from_proto(kind);
 619        self.project_transaction(project_id, |tx| async move {
 620            // Ensure the update comes from the host.
 621            let project = project::Entity::find_by_id(project_id)
 622                .one(&*tx)
 623                .await?
 624                .context("no such project")?;
 625            if project.host_connection()? != connection {
 626                return Err(anyhow!("can't update a project hosted by someone else"))?;
 627            }
 628
 629            if let Some(content) = &update.content {
 630                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 631                    project_id: ActiveValue::Set(project_id),
 632                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 633                    path: ActiveValue::Set(update.path.clone()),
 634                    content: ActiveValue::Set(content.clone()),
 635                    kind: ActiveValue::Set(kind),
 636                    outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
 637                })
 638                .on_conflict(
 639                    OnConflict::columns([
 640                        worktree_settings_file::Column::ProjectId,
 641                        worktree_settings_file::Column::WorktreeId,
 642                        worktree_settings_file::Column::Path,
 643                    ])
 644                    .update_columns([
 645                        worktree_settings_file::Column::Content,
 646                        worktree_settings_file::Column::OutsideWorktree,
 647                    ])
 648                    .to_owned(),
 649                )
 650                .exec(&*tx)
 651                .await?;
 652            } else {
 653                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 654                    project_id: ActiveValue::Set(project_id),
 655                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 656                    path: ActiveValue::Set(update.path.clone()),
 657                    ..Default::default()
 658                })
 659                .exec(&*tx)
 660                .await?;
 661            }
 662
 663            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 664            Ok(connection_ids)
 665        })
 666        .await
 667    }
 668
 669    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 670        self.transaction(|tx| async move {
 671            Ok(project::Entity::find_by_id(id)
 672                .one(&*tx)
 673                .await?
 674                .context("no such project")?)
 675        })
 676        .await
 677    }
 678
 679    /// Adds the given connection to the specified project
 680    /// in the current room.
 681    pub async fn join_project(
 682        &self,
 683        project_id: ProjectId,
 684        connection: ConnectionId,
 685        user_id: UserId,
 686        committer_name: Option<String>,
 687        committer_email: Option<String>,
 688    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 689        self.project_transaction(project_id, move |tx| {
 690            let committer_name = committer_name.clone();
 691            let committer_email = committer_email.clone();
 692            async move {
 693                let (project, role) = self
 694                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 695                    .await?;
 696                self.join_project_internal(
 697                    project,
 698                    user_id,
 699                    committer_name,
 700                    committer_email,
 701                    connection,
 702                    role,
 703                    &tx,
 704                )
 705                .await
 706            }
 707        })
 708        .await
 709    }
 710
    /// Adds `connection` as a non-host collaborator of `project` and assembles the
    /// full project state to hand back to the joining client.
    ///
    /// Within the supplied transaction this:
    /// 1. picks the lowest unused replica id (starting at `FIRST_COLLAB_ID`) and
    ///    inserts a new `project_collaborator` row for the connection,
    /// 2. loads the project's worktrees together with their non-deleted entries,
    ///    diagnostic summaries and settings files,
    /// 3. loads the project's non-deleted repositories and their statuses,
    /// 4. loads the project's language servers.
    ///
    /// Returns the assembled [`Project`] (with `role` as passed in) and the replica
    /// id assigned to the new collaborator.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        committer_name: Option<String>,
        committer_email: Option<String>,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        // Choose the smallest replica id that no existing collaborator is using.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            id: ActiveValue::NotSet,
            committer_name: ActiveValue::set(committer_name),
            committer_email: ActiveValue::set(committer_email),
        }
        .insert(tx)
        .await?;
        // Include the freshly inserted collaborator in the list returned to the caller.
        collaborators.push(new_collaborator);

        // Build a map of worktrees keyed by id; the per-worktree collections start
        // empty and are filled in by the sections below.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                        legacy_repository_entries: Default::default(),
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        // NOTE(review): the stream lives in its own block, presumably so it is
        // dropped before the next query is issued on `tx` — confirm before
        // restructuring.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                // Entries whose worktree is not in the map are skipped.
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        canonical_path: db_entry.canonical_path,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        is_hidden: db_entry.is_hidden,
                        // This is only used in the summarization backlog, so if it's None,
                        // that just means we won't be able to detect when to resummarize
                        // based on total number of backlogged bytes - instead, we'd go
                        // on number of files only. That shouldn't be a huge deal in practice.
                        size: None,
                        is_fifo: db_entry.is_fifo,
                    });
                }
            }
        }

        // Populate repository entries.
        let mut repositories = Vec::new();
        {
            let db_repository_entries = project_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(project_repository::Column::ProjectId.eq(project.id))
                        .add(project_repository::Column::IsDeleted.eq(false)),
                )
                .all(tx)
                .await?;
            for db_repository_entry in db_repository_entries {
                // Collect the non-deleted statuses recorded for this repository.
                let mut repository_statuses = project_repository_statuses::Entity::find()
                    .filter(
                        Condition::all()
                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
                            .add(
                                project_repository_statuses::Column::RepositoryId
                                    .eq(db_repository_entry.id),
                            )
                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
                    )
                    .stream(tx)
                    .await?;
                let mut updated_statuses = Vec::new();
                while let Some(status_entry) = repository_statuses.next().await {
                    let status_entry = status_entry?;
                    updated_statuses.push(db_status_to_proto(status_entry)?);
                }

                // The following optional columns hold JSON text: a missing value
                // becomes the type's default, while malformed JSON fails the join.
                let current_merge_conflicts = db_repository_entry
                    .current_merge_conflicts
                    .as_ref()
                    .map(|conflicts| serde_json::from_str(conflicts))
                    .transpose()?
                    .unwrap_or_default();

                let branch_summary = db_repository_entry
                    .branch_summary
                    .as_ref()
                    .map(|branch_summary| serde_json::from_str(branch_summary))
                    .transpose()?
                    .unwrap_or_default();

                let head_commit_details = db_repository_entry
                    .head_commit_details
                    .as_ref()
                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
                    .transpose()?
                    .unwrap_or_default();

                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
                    .context("failed to deserialize repository's entry ids")?;

                // Repositories that carry a legacy worktree id are attached to that
                // worktree (and dropped if the worktree is not in the map); all
                // others are reported as project-level repositories.
                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
                        worktree.legacy_repository_entries.insert(
                            db_repository_entry.id as u64,
                            proto::RepositoryEntry {
                                repository_id: db_repository_entry.id as u64,
                                updated_statuses,
                                removed_statuses: Vec::new(),
                                current_merge_conflicts,
                                branch_summary,
                            },
                        );
                    }
                } else {
                    repositories.push(proto::UpdateRepository {
                        project_id: db_repository_entry.project_id.0 as u64,
                        id: db_repository_entry.id as u64,
                        abs_path: db_repository_entry.abs_path.clone(),
                        entry_ids,
                        updated_statuses,
                        removed_statuses: Vec::new(),
                        current_merge_conflicts,
                        branch_summary,
                        head_commit_details,
                        scan_id: db_repository_entry.scan_id as u64,
                        is_last_update: true,
                        merge_message: db_repository_entry.merge_message,
                        stash_entries: Vec::new(),
                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
                        original_repo_abs_path: Some(db_repository_entry.abs_path),
                        // Unlike the JSON columns above, a parse failure here is
                        // ignored and yields the default value.
                        linked_worktrees: db_repository_entry
                            .linked_worktrees
                            .as_deref()
                            .and_then(|s| serde_json::from_str(s).ok())
                            .unwrap_or_default(),
                    });
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                        kind: db_settings_file.kind,
                        outside_worktree: db_settings_file.outside_worktree,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        let path_style = if project.windows_paths {
            PathStyle::Windows
        } else {
            PathStyle::Posix
        };
        // The feature list is stored as JSON text; unparsable text yields an empty list.
        let features: Vec<String> = serde_json::from_str(&project.features).unwrap_or_default();

        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                    committer_name: collaborator.committer_name,
                    committer_email: collaborator.committer_email,
                })
                .collect(),
            worktrees,
            repositories,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| LanguageServer {
                    server: proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                        worktree_id: language_server.worktree_id.map(|id| id as u64),
                    },
                    capabilities: language_server.capabilities,
                })
                .collect(),
            path_style,
            features,
        };
        Ok((project, replica_id as ReplicaId))
    }
 987
 988    /// Removes the given connection from the specified project.
 989    pub async fn leave_project(
 990        &self,
 991        project_id: ProjectId,
 992        connection: ConnectionId,
 993    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 994        self.project_transaction(project_id, |tx| async move {
 995            let result = project_collaborator::Entity::delete_many()
 996                .filter(
 997                    Condition::all()
 998                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 999                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1000                        .add(
1001                            project_collaborator::Column::ConnectionServerId
1002                                .eq(connection.owner_id as i32),
1003                        ),
1004                )
1005                .exec(&*tx)
1006                .await?;
1007            if result.rows_affected == 0 {
1008                Err(anyhow!("not a collaborator on this project"))?;
1009            }
1010
1011            let project = project::Entity::find_by_id(project_id)
1012                .one(&*tx)
1013                .await?
1014                .context("no such project")?;
1015            let collaborators = project
1016                .find_related(project_collaborator::Entity)
1017                .all(&*tx)
1018                .await?;
1019            let connection_ids: Vec<ConnectionId> = collaborators
1020                .into_iter()
1021                .map(|collaborator| collaborator.connection())
1022                .collect();
1023
1024            follower::Entity::delete_many()
1025                .filter(
1026                    Condition::any()
1027                        .add(
1028                            Condition::all()
1029                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1030                                .add(
1031                                    follower::Column::LeaderConnectionServerId
1032                                        .eq(connection.owner_id),
1033                                )
1034                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1035                        )
1036                        .add(
1037                            Condition::all()
1038                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1039                                .add(
1040                                    follower::Column::FollowerConnectionServerId
1041                                        .eq(connection.owner_id),
1042                                )
1043                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1044                        ),
1045                )
1046                .exec(&*tx)
1047                .await?;
1048
1049            let room = if let Some(room_id) = project.room_id {
1050                Some(self.get_room(room_id, &tx).await?)
1051            } else {
1052                None
1053            };
1054
1055            let left_project = LeftProject {
1056                id: project_id,
1057                should_unshare: connection == project.host_connection()?,
1058                connection_ids,
1059            };
1060            Ok((room, left_project))
1061        })
1062        .await
1063    }
1064
1065    pub async fn check_user_is_project_host(
1066        &self,
1067        project_id: ProjectId,
1068        connection_id: ConnectionId,
1069    ) -> Result<()> {
1070        self.project_transaction(project_id, |tx| async move {
1071            project::Entity::find()
1072                .filter(
1073                    Condition::all()
1074                        .add(project::Column::Id.eq(project_id))
1075                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1076                        .add(
1077                            project::Column::HostConnectionServerId
1078                                .eq(Some(connection_id.owner_id as i32)),
1079                        ),
1080                )
1081                .one(&*tx)
1082                .await?
1083                .context("failed to read project host")?;
1084
1085            Ok(())
1086        })
1087        .await
1088        .map(|guard| guard.into_inner())
1089    }
1090
1091    /// Returns the current project if the given user is authorized to access it with the specified capability.
1092    pub async fn access_project(
1093        &self,
1094        project_id: ProjectId,
1095        connection_id: ConnectionId,
1096        capability: Capability,
1097        tx: &DatabaseTransaction,
1098    ) -> Result<(project::Model, ChannelRole)> {
1099        let project = project::Entity::find_by_id(project_id)
1100            .one(tx)
1101            .await?
1102            .context("no such project")?;
1103
1104        let role_from_room = if let Some(room_id) = project.room_id {
1105            room_participant::Entity::find()
1106                .filter(room_participant::Column::RoomId.eq(room_id))
1107                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1108                .one(tx)
1109                .await?
1110                .and_then(|participant| participant.role)
1111        } else {
1112            None
1113        };
1114
1115        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1116
1117        match capability {
1118            Capability::ReadWrite => {
1119                if !role.can_edit_projects() {
1120                    return Err(anyhow!("not authorized to edit projects"))?;
1121                }
1122            }
1123            Capability::ReadOnly => {
1124                if !role.can_read_projects() {
1125                    return Err(anyhow!("not authorized to read projects"))?;
1126                }
1127            }
1128        }
1129
1130        Ok((project, role))
1131    }
1132
1133    /// Returns the host connection for a read-only request to join a shared project.
1134    pub async fn host_for_read_only_project_request(
1135        &self,
1136        project_id: ProjectId,
1137        connection_id: ConnectionId,
1138    ) -> Result<ConnectionId> {
1139        self.project_transaction(project_id, |tx| async move {
1140            let (project, _) = self
1141                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1142                .await?;
1143            project.host_connection()
1144        })
1145        .await
1146        .map(|guard| guard.into_inner())
1147    }
1148
1149    /// Returns the host connection for a request to join a shared project.
1150    pub async fn host_for_mutating_project_request(
1151        &self,
1152        project_id: ProjectId,
1153        connection_id: ConnectionId,
1154    ) -> Result<ConnectionId> {
1155        self.project_transaction(project_id, |tx| async move {
1156            let (project, _) = self
1157                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1158                .await?;
1159            project.host_connection()
1160        })
1161        .await
1162        .map(|guard| guard.into_inner())
1163    }
1164
1165    pub async fn connections_for_buffer_update(
1166        &self,
1167        project_id: ProjectId,
1168        connection_id: ConnectionId,
1169        capability: Capability,
1170    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1171        self.project_transaction(project_id, |tx| async move {
1172            // Authorize
1173            let (project, _) = self
1174                .access_project(project_id, connection_id, capability, &tx)
1175                .await?;
1176
1177            let host_connection_id = project.host_connection()?;
1178
1179            let collaborators = project_collaborator::Entity::find()
1180                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1181                .all(&*tx)
1182                .await?;
1183
1184            let guest_connection_ids = collaborators
1185                .into_iter()
1186                .filter_map(|collaborator| {
1187                    if collaborator.is_host {
1188                        None
1189                    } else {
1190                        Some(collaborator.connection())
1191                    }
1192                })
1193                .collect();
1194
1195            Ok((host_connection_id, guest_connection_ids))
1196        })
1197        .await
1198    }
1199
1200    /// Returns the connection IDs in the given project.
1201    ///
1202    /// The provided `connection_id` must also be a collaborator in the project,
1203    /// otherwise an error will be returned.
1204    pub async fn project_connection_ids(
1205        &self,
1206        project_id: ProjectId,
1207        connection_id: ConnectionId,
1208        exclude_dev_server: bool,
1209    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1210        self.project_transaction(project_id, |tx| async move {
1211            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1212                .await
1213        })
1214        .await
1215    }
1216
1217    async fn internal_project_connection_ids(
1218        &self,
1219        project_id: ProjectId,
1220        connection_id: ConnectionId,
1221        exclude_dev_server: bool,
1222        tx: &DatabaseTransaction,
1223    ) -> Result<HashSet<ConnectionId>> {
1224        let project = project::Entity::find_by_id(project_id)
1225            .one(tx)
1226            .await?
1227            .context("no such project")?;
1228
1229        let mut collaborators = project_collaborator::Entity::find()
1230            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1231            .stream(tx)
1232            .await?;
1233
1234        let mut connection_ids = HashSet::default();
1235        if let Some(host_connection) = project.host_connection().log_err()
1236            && !exclude_dev_server
1237        {
1238            connection_ids.insert(host_connection);
1239        }
1240
1241        while let Some(collaborator) = collaborators.next().await {
1242            let collaborator = collaborator?;
1243            connection_ids.insert(collaborator.connection());
1244        }
1245
1246        if connection_ids.contains(&connection_id)
1247            || Some(connection_id) == project.host_connection().ok()
1248        {
1249            Ok(connection_ids)
1250        } else {
1251            Err(anyhow!(
1252                "can only send project updates to a project you're in"
1253            ))?
1254        }
1255    }
1256
1257    async fn project_guest_connection_ids(
1258        &self,
1259        project_id: ProjectId,
1260        tx: &DatabaseTransaction,
1261    ) -> Result<Vec<ConnectionId>> {
1262        let mut collaborators = project_collaborator::Entity::find()
1263            .filter(
1264                project_collaborator::Column::ProjectId
1265                    .eq(project_id)
1266                    .and(project_collaborator::Column::IsHost.eq(false)),
1267            )
1268            .stream(tx)
1269            .await?;
1270
1271        let mut guest_connection_ids = Vec::new();
1272        while let Some(collaborator) = collaborators.next().await {
1273            let collaborator = collaborator?;
1274            guest_connection_ids.push(collaborator.connection());
1275        }
1276        Ok(guest_connection_ids)
1277    }
1278
1279    /// Returns the [`RoomId`] for the given project.
1280    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1281        self.transaction(|tx| async move {
1282            Ok(project::Entity::find_by_id(project_id)
1283                .one(&*tx)
1284                .await?
1285                .and_then(|project| project.room_id))
1286        })
1287        .await
1288    }
1289
1290    pub async fn check_room_participants(
1291        &self,
1292        room_id: RoomId,
1293        leader_id: ConnectionId,
1294        follower_id: ConnectionId,
1295    ) -> Result<()> {
1296        self.transaction(|tx| async move {
1297            use room_participant::Column;
1298
1299            let count = room_participant::Entity::find()
1300                .filter(
1301                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1302                        Condition::any()
1303                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1304                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1305                            ))
1306                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1307                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1308                            )),
1309                    ),
1310                )
1311                .count(&*tx)
1312                .await?;
1313
1314            if count < 2 {
1315                Err(anyhow!("not room participants"))?;
1316            }
1317
1318            Ok(())
1319        })
1320        .await
1321    }
1322
1323    /// Adds the given follower connection as a follower of the given leader connection.
1324    pub async fn follow(
1325        &self,
1326        room_id: RoomId,
1327        project_id: ProjectId,
1328        leader_connection: ConnectionId,
1329        follower_connection: ConnectionId,
1330    ) -> Result<TransactionGuard<proto::Room>> {
1331        self.room_transaction(room_id, |tx| async move {
1332            follower::ActiveModel {
1333                room_id: ActiveValue::set(room_id),
1334                project_id: ActiveValue::set(project_id),
1335                leader_connection_server_id: ActiveValue::set(ServerId(
1336                    leader_connection.owner_id as i32,
1337                )),
1338                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1339                follower_connection_server_id: ActiveValue::set(ServerId(
1340                    follower_connection.owner_id as i32,
1341                )),
1342                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1343                ..Default::default()
1344            }
1345            .insert(&*tx)
1346            .await?;
1347
1348            let room = self.get_room(room_id, &tx).await?;
1349            Ok(room)
1350        })
1351        .await
1352    }
1353
1354    /// Removes the given follower connection as a follower of the given leader connection.
1355    pub async fn unfollow(
1356        &self,
1357        room_id: RoomId,
1358        project_id: ProjectId,
1359        leader_connection: ConnectionId,
1360        follower_connection: ConnectionId,
1361    ) -> Result<TransactionGuard<proto::Room>> {
1362        self.room_transaction(room_id, |tx| async move {
1363            follower::Entity::delete_many()
1364                .filter(
1365                    Condition::all()
1366                        .add(follower::Column::RoomId.eq(room_id))
1367                        .add(follower::Column::ProjectId.eq(project_id))
1368                        .add(
1369                            follower::Column::LeaderConnectionServerId
1370                                .eq(leader_connection.owner_id),
1371                        )
1372                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1373                        .add(
1374                            follower::Column::FollowerConnectionServerId
1375                                .eq(follower_connection.owner_id),
1376                        )
1377                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1378                )
1379                .exec(&*tx)
1380                .await?;
1381
1382            let room = self.get_room(room_id, &tx).await?;
1383            Ok(room)
1384        })
1385        .await
1386    }
1387}