projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37        features: &[String],
  38    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  39        self.room_transaction(room_id, |tx| async move {
  40            let participant = room_participant::Entity::find()
  41                .filter(
  42                    Condition::all()
  43                        .add(
  44                            room_participant::Column::AnsweringConnectionId
  45                                .eq(connection.id as i32),
  46                        )
  47                        .add(
  48                            room_participant::Column::AnsweringConnectionServerId
  49                                .eq(connection.owner_id as i32),
  50                        ),
  51                )
  52                .one(&*tx)
  53                .await?
  54                .context("could not find participant")?;
  55            if participant.room_id != room_id {
  56                return Err(anyhow!("shared project on unexpected room"))?;
  57            }
  58            if !participant
  59                .role
  60                .unwrap_or(ChannelRole::Member)
  61                .can_edit_projects()
  62            {
  63                return Err(anyhow!("guests cannot share projects"))?;
  64            }
  65
  66            let project = project::ActiveModel {
  67                room_id: ActiveValue::set(Some(participant.room_id)),
  68                host_user_id: ActiveValue::set(Some(participant.user_id)),
  69                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  70                host_connection_server_id: ActiveValue::set(Some(ServerId(
  71                    connection.owner_id as i32,
  72                ))),
  73                id: ActiveValue::NotSet,
  74                windows_paths: ActiveValue::set(windows_paths),
  75                features: ActiveValue::set(serde_json::to_string(features).unwrap()),
  76            }
  77            .insert(&*tx)
  78            .await?;
  79
  80            if !worktrees.is_empty() {
  81                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  82                    worktree::ActiveModel {
  83                        id: ActiveValue::set(worktree.id as i64),
  84                        project_id: ActiveValue::set(project.id),
  85                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  86                        root_name: ActiveValue::set(worktree.root_name.clone()),
  87                        visible: ActiveValue::set(worktree.visible),
  88                        scan_id: ActiveValue::set(0),
  89                        completed_scan_id: ActiveValue::set(0),
  90                        root_repo_common_dir: ActiveValue::set(None),
  91                    }
  92                }))
  93                .exec(&*tx)
  94                .await?;
  95            }
  96
  97            let replica_id = if is_ssh_project {
  98                clock::ReplicaId::REMOTE_SERVER
  99            } else {
 100                clock::ReplicaId::LOCAL
 101            };
 102
 103            project_collaborator::ActiveModel {
 104                project_id: ActiveValue::set(project.id),
 105                connection_id: ActiveValue::set(connection.id as i32),
 106                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 107                user_id: ActiveValue::set(participant.user_id),
 108                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 109                is_host: ActiveValue::set(true),
 110                id: ActiveValue::NotSet,
 111                committer_name: ActiveValue::Set(None),
 112                committer_email: ActiveValue::Set(None),
 113            }
 114            .insert(&*tx)
 115            .await?;
 116
 117            let room = self.get_room(room_id, &tx).await?;
 118            Ok((project.id, room))
 119        })
 120        .await
 121    }
 122
 123    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 124        self.transaction(|tx| async move {
 125            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 126            Ok(())
 127        })
 128        .await
 129    }
 130
 131    /// Unshares the given project.
 132    pub async fn unshare_project(
 133        &self,
 134        project_id: ProjectId,
 135        connection: ConnectionId,
 136    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 137        self.project_transaction(project_id, |tx| async move {
 138            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 139            let project = project::Entity::find_by_id(project_id)
 140                .one(&*tx)
 141                .await?
 142                .context("project not found")?;
 143            let room = if let Some(room_id) = project.room_id {
 144                Some(self.get_room(room_id, &tx).await?)
 145            } else {
 146                None
 147            };
 148            if project.host_connection()? == connection {
 149                return Ok((true, room, guest_connection_ids));
 150            }
 151            Err(anyhow!("cannot unshare a project hosted by another user"))?
 152        })
 153        .await
 154    }
 155
 156    /// Updates the worktrees associated with the given project.
 157    pub async fn update_project(
 158        &self,
 159        project_id: ProjectId,
 160        connection: ConnectionId,
 161        worktrees: &[proto::WorktreeMetadata],
 162    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 163        self.project_transaction(project_id, |tx| async move {
 164            let project = project::Entity::find_by_id(project_id)
 165                .filter(
 166                    Condition::all()
 167                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 168                        .add(
 169                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 170                        ),
 171                )
 172                .one(&*tx)
 173                .await?
 174                .context("no such project")?;
 175
 176            self.update_project_worktrees(project.id, worktrees, &tx)
 177                .await?;
 178
 179            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 180
 181            let room = if let Some(room_id) = project.room_id {
 182                Some(self.get_room(room_id, &tx).await?)
 183            } else {
 184                None
 185            };
 186
 187            Ok((room, guest_connection_ids))
 188        })
 189        .await
 190    }
 191
 192    pub(in crate::db) async fn update_project_worktrees(
 193        &self,
 194        project_id: ProjectId,
 195        worktrees: &[proto::WorktreeMetadata],
 196        tx: &DatabaseTransaction,
 197    ) -> Result<()> {
 198        if !worktrees.is_empty() {
 199            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 200                id: ActiveValue::set(worktree.id as i64),
 201                project_id: ActiveValue::set(project_id),
 202                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 203                root_name: ActiveValue::set(worktree.root_name.clone()),
 204                visible: ActiveValue::set(worktree.visible),
 205                scan_id: ActiveValue::set(0),
 206                completed_scan_id: ActiveValue::set(0),
 207                root_repo_common_dir: ActiveValue::set(None),
 208            }))
 209            .on_conflict(
 210                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 211                    .update_column(worktree::Column::RootName)
 212                    .to_owned(),
 213            )
 214            .exec(tx)
 215            .await?;
 216        }
 217
 218        worktree::Entity::delete_many()
 219            .filter(worktree::Column::ProjectId.eq(project_id).and(
 220                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 221            ))
 222            .exec(tx)
 223            .await?;
 224
 225        Ok(())
 226    }
 227
 228    pub async fn update_worktree(
 229        &self,
 230        update: &proto::UpdateWorktree,
 231        connection: ConnectionId,
 232    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 233        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 234            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 235        {
 236            return Err(anyhow!(
 237                "invalid worktree update. removed entries: {}, updated entries: {}",
 238                update.removed_entries.len(),
 239                update.updated_entries.len()
 240            ))?;
 241        }
 242
 243        let project_id = ProjectId::from_proto(update.project_id);
 244        let worktree_id = update.worktree_id as i64;
 245        self.project_transaction(project_id, |tx| async move {
 246            // Ensure the update comes from the host.
 247            let _project = project::Entity::find_by_id(project_id)
 248                .filter(
 249                    Condition::all()
 250                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 251                        .add(
 252                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 253                        ),
 254                )
 255                .one(&*tx)
 256                .await?
 257                .with_context(|| format!("no such project: {project_id}"))?;
 258
 259            // Update metadata.
 260            worktree::Entity::update(worktree::ActiveModel {
 261                id: ActiveValue::set(worktree_id),
 262                project_id: ActiveValue::set(project_id),
 263                root_name: ActiveValue::set(update.root_name.clone()),
 264                scan_id: ActiveValue::set(update.scan_id as i64),
 265                completed_scan_id: if update.is_last_update {
 266                    ActiveValue::set(update.scan_id as i64)
 267                } else {
 268                    ActiveValue::default()
 269                },
 270                abs_path: ActiveValue::set(update.abs_path.clone()),
 271                root_repo_common_dir: ActiveValue::set(update.root_repo_common_dir.clone()),
 272                ..Default::default()
 273            })
 274            .exec(&*tx)
 275            .await?;
 276
 277            if !update.updated_entries.is_empty() {
 278                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 279                    let mtime = entry.mtime.clone().unwrap_or_default();
 280                    worktree_entry::ActiveModel {
 281                        project_id: ActiveValue::set(project_id),
 282                        worktree_id: ActiveValue::set(worktree_id),
 283                        id: ActiveValue::set(entry.id as i64),
 284                        is_dir: ActiveValue::set(entry.is_dir),
 285                        path: ActiveValue::set(entry.path.clone()),
 286                        inode: ActiveValue::set(entry.inode as i64),
 287                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 288                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 289                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 290                        is_ignored: ActiveValue::set(entry.is_ignored),
 291                        git_status: ActiveValue::set(None),
 292                        is_external: ActiveValue::set(entry.is_external),
 293                        is_deleted: ActiveValue::set(false),
 294                        is_hidden: ActiveValue::set(entry.is_hidden),
 295                        scan_id: ActiveValue::set(update.scan_id as i64),
 296                        is_fifo: ActiveValue::set(entry.is_fifo),
 297                    }
 298                }))
 299                .on_conflict(
 300                    OnConflict::columns([
 301                        worktree_entry::Column::ProjectId,
 302                        worktree_entry::Column::WorktreeId,
 303                        worktree_entry::Column::Id,
 304                    ])
 305                    .update_columns([
 306                        worktree_entry::Column::IsDir,
 307                        worktree_entry::Column::Path,
 308                        worktree_entry::Column::Inode,
 309                        worktree_entry::Column::MtimeSeconds,
 310                        worktree_entry::Column::MtimeNanos,
 311                        worktree_entry::Column::CanonicalPath,
 312                        worktree_entry::Column::IsIgnored,
 313                        worktree_entry::Column::IsHidden,
 314                        worktree_entry::Column::ScanId,
 315                    ])
 316                    .to_owned(),
 317                )
 318                .exec(&*tx)
 319                .await?;
 320            }
 321
 322            if !update.removed_entries.is_empty() {
 323                worktree_entry::Entity::update_many()
 324                    .filter(
 325                        worktree_entry::Column::ProjectId
 326                            .eq(project_id)
 327                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 328                            .and(
 329                                worktree_entry::Column::Id
 330                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 331                            ),
 332                    )
 333                    .set(worktree_entry::ActiveModel {
 334                        is_deleted: ActiveValue::Set(true),
 335                        scan_id: ActiveValue::Set(update.scan_id as i64),
 336                        ..Default::default()
 337                    })
 338                    .exec(&*tx)
 339                    .await?;
 340            }
 341
 342            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 343            Ok(connection_ids)
 344        })
 345        .await
 346    }
 347
 348    pub async fn update_repository(
 349        &self,
 350        update: &proto::UpdateRepository,
 351        _connection: ConnectionId,
 352    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 353        let project_id = ProjectId::from_proto(update.project_id);
 354        let repository_id = update.id as i64;
 355        self.project_transaction(project_id, |tx| async move {
 356            project_repository::Entity::insert(project_repository::ActiveModel {
 357                project_id: ActiveValue::set(project_id),
 358                id: ActiveValue::set(repository_id),
 359                legacy_worktree_id: ActiveValue::set(None),
 360                abs_path: ActiveValue::set(update.abs_path.clone()),
 361                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 362                scan_id: ActiveValue::set(update.scan_id as i64),
 363                is_deleted: ActiveValue::set(false),
 364                branch_summary: ActiveValue::Set(
 365                    update
 366                        .branch_summary
 367                        .as_ref()
 368                        .map(|summary| serde_json::to_string(summary).unwrap()),
 369                ),
 370                head_commit_details: ActiveValue::Set(
 371                    update
 372                        .head_commit_details
 373                        .as_ref()
 374                        .map(|details| serde_json::to_string(details).unwrap()),
 375                ),
 376                current_merge_conflicts: ActiveValue::Set(Some(
 377                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 378                )),
 379                merge_message: ActiveValue::set(update.merge_message.clone()),
 380                remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
 381                remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
 382                linked_worktrees: ActiveValue::Set(Some(
 383                    serde_json::to_string(&update.linked_worktrees).unwrap(),
 384                )),
 385            })
 386            .on_conflict(
 387                OnConflict::columns([
 388                    project_repository::Column::ProjectId,
 389                    project_repository::Column::Id,
 390                ])
 391                .update_columns([
 392                    project_repository::Column::ScanId,
 393                    project_repository::Column::BranchSummary,
 394                    project_repository::Column::EntryIds,
 395                    project_repository::Column::AbsPath,
 396                    project_repository::Column::CurrentMergeConflicts,
 397                    project_repository::Column::HeadCommitDetails,
 398                    project_repository::Column::MergeMessage,
 399                    project_repository::Column::LinkedWorktrees,
 400                ])
 401                .to_owned(),
 402            )
 403            .exec(&*tx)
 404            .await?;
 405
 406            let has_any_statuses = !update.updated_statuses.is_empty();
 407
 408            if has_any_statuses {
 409                project_repository_statuses::Entity::insert_many(
 410                    update.updated_statuses.iter().map(|status_entry| {
 411                        let (repo_path, status_kind, first_status, second_status) =
 412                            proto_status_to_db(status_entry.clone());
 413                        project_repository_statuses::ActiveModel {
 414                            project_id: ActiveValue::set(project_id),
 415                            repository_id: ActiveValue::set(repository_id),
 416                            scan_id: ActiveValue::set(update.scan_id as i64),
 417                            is_deleted: ActiveValue::set(false),
 418                            repo_path: ActiveValue::set(repo_path),
 419                            status: ActiveValue::set(0),
 420                            status_kind: ActiveValue::set(status_kind),
 421                            first_status: ActiveValue::set(first_status),
 422                            second_status: ActiveValue::set(second_status),
 423                            lines_added: ActiveValue::set(
 424                                status_entry.diff_stat_added.map(|v| v as i32),
 425                            ),
 426                            lines_deleted: ActiveValue::set(
 427                                status_entry.diff_stat_deleted.map(|v| v as i32),
 428                            ),
 429                        }
 430                    }),
 431                )
 432                .on_conflict(
 433                    OnConflict::columns([
 434                        project_repository_statuses::Column::ProjectId,
 435                        project_repository_statuses::Column::RepositoryId,
 436                        project_repository_statuses::Column::RepoPath,
 437                    ])
 438                    .update_columns([
 439                        project_repository_statuses::Column::ScanId,
 440                        project_repository_statuses::Column::StatusKind,
 441                        project_repository_statuses::Column::FirstStatus,
 442                        project_repository_statuses::Column::SecondStatus,
 443                        project_repository_statuses::Column::LinesAdded,
 444                        project_repository_statuses::Column::LinesDeleted,
 445                    ])
 446                    .to_owned(),
 447                )
 448                .exec(&*tx)
 449                .await?;
 450            }
 451
 452            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 453
 454            if has_any_removed_statuses {
 455                project_repository_statuses::Entity::update_many()
 456                    .filter(
 457                        project_repository_statuses::Column::ProjectId
 458                            .eq(project_id)
 459                            .and(
 460                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 461                            )
 462                            .and(
 463                                project_repository_statuses::Column::RepoPath
 464                                    .is_in(update.removed_statuses.iter()),
 465                            ),
 466                    )
 467                    .set(project_repository_statuses::ActiveModel {
 468                        is_deleted: ActiveValue::Set(true),
 469                        scan_id: ActiveValue::Set(update.scan_id as i64),
 470                        ..Default::default()
 471                    })
 472                    .exec(&*tx)
 473                    .await?;
 474            }
 475
 476            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 477            Ok(connection_ids)
 478        })
 479        .await
 480    }
 481
 482    pub async fn remove_repository(
 483        &self,
 484        remove: &proto::RemoveRepository,
 485        _connection: ConnectionId,
 486    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 487        let project_id = ProjectId::from_proto(remove.project_id);
 488        let repository_id = remove.id as i64;
 489        self.project_transaction(project_id, |tx| async move {
 490            project_repository::Entity::update_many()
 491                .filter(
 492                    project_repository::Column::ProjectId
 493                        .eq(project_id)
 494                        .and(project_repository::Column::Id.eq(repository_id)),
 495                )
 496                .set(project_repository::ActiveModel {
 497                    is_deleted: ActiveValue::Set(true),
 498                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 499                    ..Default::default()
 500                })
 501                .exec(&*tx)
 502                .await?;
 503
 504            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 505            Ok(connection_ids)
 506        })
 507        .await
 508    }
 509
 510    /// Updates the diagnostic summary for the given connection.
 511    pub async fn update_diagnostic_summary(
 512        &self,
 513        update: &proto::UpdateDiagnosticSummary,
 514        connection: ConnectionId,
 515    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 516        let project_id = ProjectId::from_proto(update.project_id);
 517        let worktree_id = update.worktree_id as i64;
 518        self.project_transaction(project_id, |tx| async move {
 519            let summary = update.summary.as_ref().context("invalid summary")?;
 520
 521            // Ensure the update comes from the host.
 522            let project = project::Entity::find_by_id(project_id)
 523                .one(&*tx)
 524                .await?
 525                .context("no such project")?;
 526            if project.host_connection()? != connection {
 527                return Err(anyhow!("can't update a project hosted by someone else"))?;
 528            }
 529
 530            // Update summary.
 531            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 532                project_id: ActiveValue::set(project_id),
 533                worktree_id: ActiveValue::set(worktree_id),
 534                path: ActiveValue::set(summary.path.clone()),
 535                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 536                error_count: ActiveValue::set(summary.error_count as i32),
 537                warning_count: ActiveValue::set(summary.warning_count as i32),
 538            })
 539            .on_conflict(
 540                OnConflict::columns([
 541                    worktree_diagnostic_summary::Column::ProjectId,
 542                    worktree_diagnostic_summary::Column::WorktreeId,
 543                    worktree_diagnostic_summary::Column::Path,
 544                ])
 545                .update_columns([
 546                    worktree_diagnostic_summary::Column::LanguageServerId,
 547                    worktree_diagnostic_summary::Column::ErrorCount,
 548                    worktree_diagnostic_summary::Column::WarningCount,
 549                ])
 550                .to_owned(),
 551            )
 552            .exec(&*tx)
 553            .await?;
 554
 555            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 556            Ok(connection_ids)
 557        })
 558        .await
 559    }
 560
 561    /// Starts the language server for the given connection.
 562    pub async fn start_language_server(
 563        &self,
 564        update: &proto::StartLanguageServer,
 565        connection: ConnectionId,
 566    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 567        let project_id = ProjectId::from_proto(update.project_id);
 568        self.project_transaction(project_id, |tx| async move {
 569            let server = update.server.as_ref().context("invalid language server")?;
 570
 571            // Ensure the update comes from the host.
 572            let project = project::Entity::find_by_id(project_id)
 573                .one(&*tx)
 574                .await?
 575                .context("no such project")?;
 576            if project.host_connection()? != connection {
 577                return Err(anyhow!("can't update a project hosted by someone else"))?;
 578            }
 579
 580            // Add the newly-started language server.
 581            language_server::Entity::insert(language_server::ActiveModel {
 582                project_id: ActiveValue::set(project_id),
 583                id: ActiveValue::set(server.id as i64),
 584                name: ActiveValue::set(server.name.clone()),
 585                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 586                capabilities: ActiveValue::set(update.capabilities.clone()),
 587            })
 588            .on_conflict(
 589                OnConflict::columns([
 590                    language_server::Column::ProjectId,
 591                    language_server::Column::Id,
 592                ])
 593                .update_columns([
 594                    language_server::Column::Name,
 595                    language_server::Column::Capabilities,
 596                    language_server::Column::WorktreeId,
 597                ])
 598                .to_owned(),
 599            )
 600            .exec(&*tx)
 601            .await?;
 602
 603            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 604            Ok(connection_ids)
 605        })
 606        .await
 607    }
 608
 609    /// Updates the worktree settings for the given connection.
 610    pub async fn update_worktree_settings(
 611        &self,
 612        update: &proto::UpdateWorktreeSettings,
 613        connection: ConnectionId,
 614    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 615        let project_id = ProjectId::from_proto(update.project_id);
 616        let kind = match update.kind {
 617            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 618                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 619            None => proto::LocalSettingsKind::Settings,
 620        };
 621        let kind = LocalSettingsKind::from_proto(kind);
 622        self.project_transaction(project_id, |tx| async move {
 623            // Ensure the update comes from the host.
 624            let project = project::Entity::find_by_id(project_id)
 625                .one(&*tx)
 626                .await?
 627                .context("no such project")?;
 628            if project.host_connection()? != connection {
 629                return Err(anyhow!("can't update a project hosted by someone else"))?;
 630            }
 631
 632            if let Some(content) = &update.content {
 633                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 634                    project_id: ActiveValue::Set(project_id),
 635                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 636                    path: ActiveValue::Set(update.path.clone()),
 637                    content: ActiveValue::Set(content.clone()),
 638                    kind: ActiveValue::Set(kind),
 639                    outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
 640                })
 641                .on_conflict(
 642                    OnConflict::columns([
 643                        worktree_settings_file::Column::ProjectId,
 644                        worktree_settings_file::Column::WorktreeId,
 645                        worktree_settings_file::Column::Path,
 646                    ])
 647                    .update_columns([
 648                        worktree_settings_file::Column::Content,
 649                        worktree_settings_file::Column::OutsideWorktree,
 650                    ])
 651                    .to_owned(),
 652                )
 653                .exec(&*tx)
 654                .await?;
 655            } else {
 656                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 657                    project_id: ActiveValue::Set(project_id),
 658                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 659                    path: ActiveValue::Set(update.path.clone()),
 660                    ..Default::default()
 661                })
 662                .exec(&*tx)
 663                .await?;
 664            }
 665
 666            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 667            Ok(connection_ids)
 668        })
 669        .await
 670    }
 671
 672    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 673        self.transaction(|tx| async move {
 674            Ok(project::Entity::find_by_id(id)
 675                .one(&*tx)
 676                .await?
 677                .context("no such project")?)
 678        })
 679        .await
 680    }
 681
 682    /// Adds the given connection to the specified project
 683    /// in the current room.
 684    pub async fn join_project(
 685        &self,
 686        project_id: ProjectId,
 687        connection: ConnectionId,
 688        user_id: UserId,
 689        committer_name: Option<String>,
 690        committer_email: Option<String>,
 691    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 692        self.project_transaction(project_id, move |tx| {
 693            let committer_name = committer_name.clone();
 694            let committer_email = committer_email.clone();
 695            async move {
 696                let (project, role) = self
 697                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 698                    .await?;
 699                self.join_project_internal(
 700                    project,
 701                    user_id,
 702                    committer_name,
 703                    committer_email,
 704                    connection,
 705                    role,
 706                    &tx,
 707                )
 708                .await
 709            }
 710        })
 711        .await
 712    }
 713
 714    async fn join_project_internal(
 715        &self,
 716        project: project::Model,
 717        user_id: UserId,
 718        committer_name: Option<String>,
 719        committer_email: Option<String>,
 720        connection: ConnectionId,
 721        role: ChannelRole,
 722        tx: &DatabaseTransaction,
 723    ) -> Result<(Project, ReplicaId)> {
 724        let mut collaborators = project
 725            .find_related(project_collaborator::Entity)
 726            .all(tx)
 727            .await?;
 728        let replica_ids = collaborators
 729            .iter()
 730            .map(|c| c.replica_id)
 731            .collect::<HashSet<_>>();
 732        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
 733        while replica_ids.contains(&replica_id) {
 734            replica_id.0 += 1;
 735        }
 736        let new_collaborator = project_collaborator::ActiveModel {
 737            project_id: ActiveValue::set(project.id),
 738            connection_id: ActiveValue::set(connection.id as i32),
 739            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 740            user_id: ActiveValue::set(user_id),
 741            replica_id: ActiveValue::set(replica_id),
 742            is_host: ActiveValue::set(false),
 743            id: ActiveValue::NotSet,
 744            committer_name: ActiveValue::set(committer_name),
 745            committer_email: ActiveValue::set(committer_email),
 746        }
 747        .insert(tx)
 748        .await?;
 749        collaborators.push(new_collaborator);
 750
 751        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 752        let mut worktrees = db_worktrees
 753            .into_iter()
 754            .map(|db_worktree| {
 755                (
 756                    db_worktree.id as u64,
 757                    Worktree {
 758                        id: db_worktree.id as u64,
 759                        abs_path: db_worktree.abs_path,
 760                        root_name: db_worktree.root_name,
 761                        visible: db_worktree.visible,
 762                        entries: Default::default(),
 763                        diagnostic_summaries: Default::default(),
 764                        settings_files: Default::default(),
 765                        scan_id: db_worktree.scan_id as u64,
 766                        completed_scan_id: db_worktree.completed_scan_id as u64,
 767                        root_repo_common_dir: db_worktree.root_repo_common_dir,
 768                        legacy_repository_entries: Default::default(),
 769                    },
 770                )
 771            })
 772            .collect::<BTreeMap<_, _>>();
 773
 774        // Populate worktree entries.
 775        {
 776            let mut db_entries = worktree_entry::Entity::find()
 777                .filter(
 778                    Condition::all()
 779                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 780                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 781                )
 782                .stream(tx)
 783                .await?;
 784            while let Some(db_entry) = db_entries.next().await {
 785                let db_entry = db_entry?;
 786                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 787                    worktree.entries.push(proto::Entry {
 788                        id: db_entry.id as u64,
 789                        is_dir: db_entry.is_dir,
 790                        path: db_entry.path,
 791                        inode: db_entry.inode as u64,
 792                        mtime: Some(proto::Timestamp {
 793                            seconds: db_entry.mtime_seconds as u64,
 794                            nanos: db_entry.mtime_nanos as u32,
 795                        }),
 796                        canonical_path: db_entry.canonical_path,
 797                        is_ignored: db_entry.is_ignored,
 798                        is_external: db_entry.is_external,
 799                        is_hidden: db_entry.is_hidden,
 800                        // This is only used in the summarization backlog, so if it's None,
 801                        // that just means we won't be able to detect when to resummarize
 802                        // based on total number of backlogged bytes - instead, we'd go
 803                        // on number of files only. That shouldn't be a huge deal in practice.
 804                        size: None,
 805                        is_fifo: db_entry.is_fifo,
 806                    });
 807                }
 808            }
 809        }
 810
 811        // Populate repository entries.
 812        let mut repositories = Vec::new();
 813        {
 814            let db_repository_entries = project_repository::Entity::find()
 815                .filter(
 816                    Condition::all()
 817                        .add(project_repository::Column::ProjectId.eq(project.id))
 818                        .add(project_repository::Column::IsDeleted.eq(false)),
 819                )
 820                .all(tx)
 821                .await?;
 822            for db_repository_entry in db_repository_entries {
 823                let mut repository_statuses = project_repository_statuses::Entity::find()
 824                    .filter(
 825                        Condition::all()
 826                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 827                            .add(
 828                                project_repository_statuses::Column::RepositoryId
 829                                    .eq(db_repository_entry.id),
 830                            )
 831                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 832                    )
 833                    .stream(tx)
 834                    .await?;
 835                let mut updated_statuses = Vec::new();
 836                while let Some(status_entry) = repository_statuses.next().await {
 837                    let status_entry = status_entry?;
 838                    updated_statuses.push(db_status_to_proto(status_entry)?);
 839                }
 840
 841                let current_merge_conflicts = db_repository_entry
 842                    .current_merge_conflicts
 843                    .as_ref()
 844                    .map(|conflicts| serde_json::from_str(conflicts))
 845                    .transpose()?
 846                    .unwrap_or_default();
 847
 848                let branch_summary = db_repository_entry
 849                    .branch_summary
 850                    .as_ref()
 851                    .map(|branch_summary| serde_json::from_str(branch_summary))
 852                    .transpose()?
 853                    .unwrap_or_default();
 854
 855                let head_commit_details = db_repository_entry
 856                    .head_commit_details
 857                    .as_ref()
 858                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 859                    .transpose()?
 860                    .unwrap_or_default();
 861
 862                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 863                    .context("failed to deserialize repository's entry ids")?;
 864
 865                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 866                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 867                        worktree.legacy_repository_entries.insert(
 868                            db_repository_entry.id as u64,
 869                            proto::RepositoryEntry {
 870                                repository_id: db_repository_entry.id as u64,
 871                                updated_statuses,
 872                                removed_statuses: Vec::new(),
 873                                current_merge_conflicts,
 874                                branch_summary,
 875                            },
 876                        );
 877                    }
 878                } else {
 879                    repositories.push(proto::UpdateRepository {
 880                        project_id: db_repository_entry.project_id.0 as u64,
 881                        id: db_repository_entry.id as u64,
 882                        abs_path: db_repository_entry.abs_path.clone(),
 883                        entry_ids,
 884                        updated_statuses,
 885                        removed_statuses: Vec::new(),
 886                        current_merge_conflicts,
 887                        branch_summary,
 888                        head_commit_details,
 889                        branch_list: Vec::new(),
 890                        scan_id: db_repository_entry.scan_id as u64,
 891                        is_last_update: true,
 892                        merge_message: db_repository_entry.merge_message,
 893                        stash_entries: Vec::new(),
 894                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
 895                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
 896                        original_repo_abs_path: Some(db_repository_entry.abs_path),
 897                        linked_worktrees: db_repository_entry
 898                            .linked_worktrees
 899                            .as_deref()
 900                            .and_then(|s| serde_json::from_str(s).ok())
 901                            .unwrap_or_default(),
 902                    });
 903                }
 904            }
 905        }
 906
 907        // Populate worktree diagnostic summaries.
 908        {
 909            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 910                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 911                .stream(tx)
 912                .await?;
 913            while let Some(db_summary) = db_summaries.next().await {
 914                let db_summary = db_summary?;
 915                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 916                    worktree
 917                        .diagnostic_summaries
 918                        .push(proto::DiagnosticSummary {
 919                            path: db_summary.path,
 920                            language_server_id: db_summary.language_server_id as u64,
 921                            error_count: db_summary.error_count as u32,
 922                            warning_count: db_summary.warning_count as u32,
 923                        });
 924                }
 925            }
 926        }
 927
 928        // Populate worktree settings files
 929        {
 930            let mut db_settings_files = worktree_settings_file::Entity::find()
 931                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 932                .stream(tx)
 933                .await?;
 934            while let Some(db_settings_file) = db_settings_files.next().await {
 935                let db_settings_file = db_settings_file?;
 936                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 937                    worktree.settings_files.push(WorktreeSettingsFile {
 938                        path: db_settings_file.path,
 939                        content: db_settings_file.content,
 940                        kind: db_settings_file.kind,
 941                        outside_worktree: db_settings_file.outside_worktree,
 942                    });
 943                }
 944            }
 945        }
 946
 947        // Populate language servers.
 948        let language_servers = project
 949            .find_related(language_server::Entity)
 950            .all(tx)
 951            .await?;
 952
 953        let path_style = if project.windows_paths {
 954            PathStyle::Windows
 955        } else {
 956            PathStyle::Posix
 957        };
 958        let features: Vec<String> = serde_json::from_str(&project.features).unwrap_or_default();
 959
 960        let project = Project {
 961            id: project.id,
 962            role,
 963            collaborators: collaborators
 964                .into_iter()
 965                .map(|collaborator| ProjectCollaborator {
 966                    connection_id: collaborator.connection(),
 967                    user_id: collaborator.user_id,
 968                    replica_id: collaborator.replica_id,
 969                    is_host: collaborator.is_host,
 970                    committer_name: collaborator.committer_name,
 971                    committer_email: collaborator.committer_email,
 972                })
 973                .collect(),
 974            worktrees,
 975            repositories,
 976            language_servers: language_servers
 977                .into_iter()
 978                .map(|language_server| LanguageServer {
 979                    server: proto::LanguageServer {
 980                        id: language_server.id as u64,
 981                        name: language_server.name,
 982                        worktree_id: language_server.worktree_id.map(|id| id as u64),
 983                    },
 984                    capabilities: language_server.capabilities,
 985                })
 986                .collect(),
 987            path_style,
 988            features,
 989        };
 990        Ok((project, replica_id as ReplicaId))
 991    }
 992
 993    /// Removes the given connection from the specified project.
 994    pub async fn leave_project(
 995        &self,
 996        project_id: ProjectId,
 997        connection: ConnectionId,
 998    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 999        self.project_transaction(project_id, |tx| async move {
1000            let result = project_collaborator::Entity::delete_many()
1001                .filter(
1002                    Condition::all()
1003                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1004                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1005                        .add(
1006                            project_collaborator::Column::ConnectionServerId
1007                                .eq(connection.owner_id as i32),
1008                        ),
1009                )
1010                .exec(&*tx)
1011                .await?;
1012            if result.rows_affected == 0 {
1013                Err(anyhow!("not a collaborator on this project"))?;
1014            }
1015
1016            let project = project::Entity::find_by_id(project_id)
1017                .one(&*tx)
1018                .await?
1019                .context("no such project")?;
1020            let collaborators = project
1021                .find_related(project_collaborator::Entity)
1022                .all(&*tx)
1023                .await?;
1024            let connection_ids: Vec<ConnectionId> = collaborators
1025                .into_iter()
1026                .map(|collaborator| collaborator.connection())
1027                .collect();
1028
1029            follower::Entity::delete_many()
1030                .filter(
1031                    Condition::any()
1032                        .add(
1033                            Condition::all()
1034                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1035                                .add(
1036                                    follower::Column::LeaderConnectionServerId
1037                                        .eq(connection.owner_id),
1038                                )
1039                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1040                        )
1041                        .add(
1042                            Condition::all()
1043                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1044                                .add(
1045                                    follower::Column::FollowerConnectionServerId
1046                                        .eq(connection.owner_id),
1047                                )
1048                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1049                        ),
1050                )
1051                .exec(&*tx)
1052                .await?;
1053
1054            let room = if let Some(room_id) = project.room_id {
1055                Some(self.get_room(room_id, &tx).await?)
1056            } else {
1057                None
1058            };
1059
1060            let left_project = LeftProject {
1061                id: project_id,
1062                should_unshare: connection == project.host_connection()?,
1063                connection_ids,
1064            };
1065            Ok((room, left_project))
1066        })
1067        .await
1068    }
1069
1070    pub async fn check_user_is_project_host(
1071        &self,
1072        project_id: ProjectId,
1073        connection_id: ConnectionId,
1074    ) -> Result<()> {
1075        self.project_transaction(project_id, |tx| async move {
1076            project::Entity::find()
1077                .filter(
1078                    Condition::all()
1079                        .add(project::Column::Id.eq(project_id))
1080                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1081                        .add(
1082                            project::Column::HostConnectionServerId
1083                                .eq(Some(connection_id.owner_id as i32)),
1084                        ),
1085                )
1086                .one(&*tx)
1087                .await?
1088                .context("failed to read project host")?;
1089
1090            Ok(())
1091        })
1092        .await
1093        .map(|guard| guard.into_inner())
1094    }
1095
1096    /// Returns the current project if the given user is authorized to access it with the specified capability.
1097    pub async fn access_project(
1098        &self,
1099        project_id: ProjectId,
1100        connection_id: ConnectionId,
1101        capability: Capability,
1102        tx: &DatabaseTransaction,
1103    ) -> Result<(project::Model, ChannelRole)> {
1104        let project = project::Entity::find_by_id(project_id)
1105            .one(tx)
1106            .await?
1107            .context("no such project")?;
1108
1109        let role_from_room = if let Some(room_id) = project.room_id {
1110            room_participant::Entity::find()
1111                .filter(room_participant::Column::RoomId.eq(room_id))
1112                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1113                .one(tx)
1114                .await?
1115                .and_then(|participant| participant.role)
1116        } else {
1117            None
1118        };
1119
1120        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1121
1122        match capability {
1123            Capability::ReadWrite => {
1124                if !role.can_edit_projects() {
1125                    return Err(anyhow!("not authorized to edit projects"))?;
1126                }
1127            }
1128            Capability::ReadOnly => {
1129                if !role.can_read_projects() {
1130                    return Err(anyhow!("not authorized to read projects"))?;
1131                }
1132            }
1133        }
1134
1135        Ok((project, role))
1136    }
1137
1138    /// Returns the host connection for a read-only request to join a shared project.
1139    pub async fn host_for_read_only_project_request(
1140        &self,
1141        project_id: ProjectId,
1142        connection_id: ConnectionId,
1143    ) -> Result<ConnectionId> {
1144        self.project_transaction(project_id, |tx| async move {
1145            let (project, _) = self
1146                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1147                .await?;
1148            project.host_connection()
1149        })
1150        .await
1151        .map(|guard| guard.into_inner())
1152    }
1153
1154    /// Returns the host connection for a request to join a shared project.
1155    pub async fn host_for_mutating_project_request(
1156        &self,
1157        project_id: ProjectId,
1158        connection_id: ConnectionId,
1159    ) -> Result<ConnectionId> {
1160        self.project_transaction(project_id, |tx| async move {
1161            let (project, _) = self
1162                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1163                .await?;
1164            project.host_connection()
1165        })
1166        .await
1167        .map(|guard| guard.into_inner())
1168    }
1169
1170    pub async fn connections_for_buffer_update(
1171        &self,
1172        project_id: ProjectId,
1173        connection_id: ConnectionId,
1174        capability: Capability,
1175    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1176        self.project_transaction(project_id, |tx| async move {
1177            // Authorize
1178            let (project, _) = self
1179                .access_project(project_id, connection_id, capability, &tx)
1180                .await?;
1181
1182            let host_connection_id = project.host_connection()?;
1183
1184            let collaborators = project_collaborator::Entity::find()
1185                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1186                .all(&*tx)
1187                .await?;
1188
1189            let guest_connection_ids = collaborators
1190                .into_iter()
1191                .filter_map(|collaborator| {
1192                    if collaborator.is_host {
1193                        None
1194                    } else {
1195                        Some(collaborator.connection())
1196                    }
1197                })
1198                .collect();
1199
1200            Ok((host_connection_id, guest_connection_ids))
1201        })
1202        .await
1203    }
1204
1205    /// Returns the connection IDs in the given project.
1206    ///
1207    /// The provided `connection_id` must also be a collaborator in the project,
1208    /// otherwise an error will be returned.
1209    pub async fn project_connection_ids(
1210        &self,
1211        project_id: ProjectId,
1212        connection_id: ConnectionId,
1213        exclude_dev_server: bool,
1214    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1215        self.project_transaction(project_id, |tx| async move {
1216            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1217                .await
1218        })
1219        .await
1220    }
1221
1222    async fn internal_project_connection_ids(
1223        &self,
1224        project_id: ProjectId,
1225        connection_id: ConnectionId,
1226        exclude_dev_server: bool,
1227        tx: &DatabaseTransaction,
1228    ) -> Result<HashSet<ConnectionId>> {
1229        let project = project::Entity::find_by_id(project_id)
1230            .one(tx)
1231            .await?
1232            .context("no such project")?;
1233
1234        let mut collaborators = project_collaborator::Entity::find()
1235            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1236            .stream(tx)
1237            .await?;
1238
1239        let mut connection_ids = HashSet::default();
1240        if let Some(host_connection) = project.host_connection().log_err()
1241            && !exclude_dev_server
1242        {
1243            connection_ids.insert(host_connection);
1244        }
1245
1246        while let Some(collaborator) = collaborators.next().await {
1247            let collaborator = collaborator?;
1248            connection_ids.insert(collaborator.connection());
1249        }
1250
1251        if connection_ids.contains(&connection_id)
1252            || Some(connection_id) == project.host_connection().ok()
1253        {
1254            Ok(connection_ids)
1255        } else {
1256            Err(anyhow!(
1257                "can only send project updates to a project you're in"
1258            ))?
1259        }
1260    }
1261
1262    async fn project_guest_connection_ids(
1263        &self,
1264        project_id: ProjectId,
1265        tx: &DatabaseTransaction,
1266    ) -> Result<Vec<ConnectionId>> {
1267        let mut collaborators = project_collaborator::Entity::find()
1268            .filter(
1269                project_collaborator::Column::ProjectId
1270                    .eq(project_id)
1271                    .and(project_collaborator::Column::IsHost.eq(false)),
1272            )
1273            .stream(tx)
1274            .await?;
1275
1276        let mut guest_connection_ids = Vec::new();
1277        while let Some(collaborator) = collaborators.next().await {
1278            let collaborator = collaborator?;
1279            guest_connection_ids.push(collaborator.connection());
1280        }
1281        Ok(guest_connection_ids)
1282    }
1283
1284    /// Returns the [`RoomId`] for the given project.
1285    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1286        self.transaction(|tx| async move {
1287            Ok(project::Entity::find_by_id(project_id)
1288                .one(&*tx)
1289                .await?
1290                .and_then(|project| project.room_id))
1291        })
1292        .await
1293    }
1294
1295    pub async fn check_room_participants(
1296        &self,
1297        room_id: RoomId,
1298        leader_id: ConnectionId,
1299        follower_id: ConnectionId,
1300    ) -> Result<()> {
1301        self.transaction(|tx| async move {
1302            use room_participant::Column;
1303
1304            let count = room_participant::Entity::find()
1305                .filter(
1306                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1307                        Condition::any()
1308                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1309                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1310                            ))
1311                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1312                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1313                            )),
1314                    ),
1315                )
1316                .count(&*tx)
1317                .await?;
1318
1319            if count < 2 {
1320                Err(anyhow!("not room participants"))?;
1321            }
1322
1323            Ok(())
1324        })
1325        .await
1326    }
1327
1328    /// Adds the given follower connection as a follower of the given leader connection.
1329    pub async fn follow(
1330        &self,
1331        room_id: RoomId,
1332        project_id: ProjectId,
1333        leader_connection: ConnectionId,
1334        follower_connection: ConnectionId,
1335    ) -> Result<TransactionGuard<proto::Room>> {
1336        self.room_transaction(room_id, |tx| async move {
1337            follower::ActiveModel {
1338                room_id: ActiveValue::set(room_id),
1339                project_id: ActiveValue::set(project_id),
1340                leader_connection_server_id: ActiveValue::set(ServerId(
1341                    leader_connection.owner_id as i32,
1342                )),
1343                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1344                follower_connection_server_id: ActiveValue::set(ServerId(
1345                    follower_connection.owner_id as i32,
1346                )),
1347                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1348                ..Default::default()
1349            }
1350            .insert(&*tx)
1351            .await?;
1352
1353            let room = self.get_room(room_id, &tx).await?;
1354            Ok(room)
1355        })
1356        .await
1357    }
1358
1359    /// Removes the given follower connection as a follower of the given leader connection.
1360    pub async fn unfollow(
1361        &self,
1362        room_id: RoomId,
1363        project_id: ProjectId,
1364        leader_connection: ConnectionId,
1365        follower_connection: ConnectionId,
1366    ) -> Result<TransactionGuard<proto::Room>> {
1367        self.room_transaction(room_id, |tx| async move {
1368            follower::Entity::delete_many()
1369                .filter(
1370                    Condition::all()
1371                        .add(follower::Column::RoomId.eq(room_id))
1372                        .add(follower::Column::ProjectId.eq(project_id))
1373                        .add(
1374                            follower::Column::LeaderConnectionServerId
1375                                .eq(leader_connection.owner_id),
1376                        )
1377                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1378                        .add(
1379                            follower::Column::FollowerConnectionServerId
1380                                .eq(follower_connection.owner_id),
1381                        )
1382                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1383                )
1384                .exec(&*tx)
1385                .await?;
1386
1387            let room = self.get_room(room_id, &tx).await?;
1388            Ok(room)
1389        })
1390        .await
1391    }
1392}