projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37        features: &[String],
  38    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  39        self.room_transaction(room_id, |tx| async move {
  40            let participant = room_participant::Entity::find()
  41                .filter(
  42                    Condition::all()
  43                        .add(
  44                            room_participant::Column::AnsweringConnectionId
  45                                .eq(connection.id as i32),
  46                        )
  47                        .add(
  48                            room_participant::Column::AnsweringConnectionServerId
  49                                .eq(connection.owner_id as i32),
  50                        ),
  51                )
  52                .one(&*tx)
  53                .await?
  54                .context("could not find participant")?;
  55            if participant.room_id != room_id {
  56                return Err(anyhow!("shared project on unexpected room"))?;
  57            }
  58            if !participant
  59                .role
  60                .unwrap_or(ChannelRole::Member)
  61                .can_edit_projects()
  62            {
  63                return Err(anyhow!("guests cannot share projects"))?;
  64            }
  65
  66            let project = project::ActiveModel {
  67                room_id: ActiveValue::set(Some(participant.room_id)),
  68                host_user_id: ActiveValue::set(Some(participant.user_id)),
  69                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  70                host_connection_server_id: ActiveValue::set(Some(ServerId(
  71                    connection.owner_id as i32,
  72                ))),
  73                id: ActiveValue::NotSet,
  74                windows_paths: ActiveValue::set(windows_paths),
  75                features: ActiveValue::set(serde_json::to_string(features).unwrap()),
  76            }
  77            .insert(&*tx)
  78            .await?;
  79
  80            if !worktrees.is_empty() {
  81                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  82                    worktree::ActiveModel {
  83                        id: ActiveValue::set(worktree.id as i64),
  84                        project_id: ActiveValue::set(project.id),
  85                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  86                        root_name: ActiveValue::set(worktree.root_name.clone()),
  87                        visible: ActiveValue::set(worktree.visible),
  88                        scan_id: ActiveValue::set(0),
  89                        completed_scan_id: ActiveValue::set(0),
  90                        root_repo_common_dir: ActiveValue::set(None),
  91                    }
  92                }))
  93                .exec(&*tx)
  94                .await?;
  95            }
  96
  97            let replica_id = if is_ssh_project {
  98                clock::ReplicaId::REMOTE_SERVER
  99            } else {
 100                clock::ReplicaId::LOCAL
 101            };
 102
 103            project_collaborator::ActiveModel {
 104                project_id: ActiveValue::set(project.id),
 105                connection_id: ActiveValue::set(connection.id as i32),
 106                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 107                user_id: ActiveValue::set(participant.user_id),
 108                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 109                is_host: ActiveValue::set(true),
 110                id: ActiveValue::NotSet,
 111                committer_name: ActiveValue::Set(None),
 112                committer_email: ActiveValue::Set(None),
 113            }
 114            .insert(&*tx)
 115            .await?;
 116
 117            let room = self.get_room(room_id, &tx).await?;
 118            Ok((project.id, room))
 119        })
 120        .await
 121    }
 122
 123    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 124        self.transaction(|tx| async move {
 125            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 126            Ok(())
 127        })
 128        .await
 129    }
 130
 131    /// Unshares the given project.
 132    pub async fn unshare_project(
 133        &self,
 134        project_id: ProjectId,
 135        connection: ConnectionId,
 136    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 137        self.project_transaction(project_id, |tx| async move {
 138            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 139            let project = project::Entity::find_by_id(project_id)
 140                .one(&*tx)
 141                .await?
 142                .context("project not found")?;
 143            let room = if let Some(room_id) = project.room_id {
 144                Some(self.get_room(room_id, &tx).await?)
 145            } else {
 146                None
 147            };
 148            if project.host_connection()? == connection {
 149                return Ok((true, room, guest_connection_ids));
 150            }
 151            Err(anyhow!("cannot unshare a project hosted by another user"))?
 152        })
 153        .await
 154    }
 155
 156    /// Updates the worktrees associated with the given project.
 157    pub async fn update_project(
 158        &self,
 159        project_id: ProjectId,
 160        connection: ConnectionId,
 161        worktrees: &[proto::WorktreeMetadata],
 162    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 163        self.project_transaction(project_id, |tx| async move {
 164            let project = project::Entity::find_by_id(project_id)
 165                .filter(
 166                    Condition::all()
 167                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 168                        .add(
 169                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 170                        ),
 171                )
 172                .one(&*tx)
 173                .await?
 174                .context("no such project")?;
 175
 176            self.update_project_worktrees(project.id, worktrees, &tx)
 177                .await?;
 178
 179            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 180
 181            let room = if let Some(room_id) = project.room_id {
 182                Some(self.get_room(room_id, &tx).await?)
 183            } else {
 184                None
 185            };
 186
 187            Ok((room, guest_connection_ids))
 188        })
 189        .await
 190    }
 191
 192    pub(in crate::db) async fn update_project_worktrees(
 193        &self,
 194        project_id: ProjectId,
 195        worktrees: &[proto::WorktreeMetadata],
 196        tx: &DatabaseTransaction,
 197    ) -> Result<()> {
 198        if !worktrees.is_empty() {
 199            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 200                id: ActiveValue::set(worktree.id as i64),
 201                project_id: ActiveValue::set(project_id),
 202                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 203                root_name: ActiveValue::set(worktree.root_name.clone()),
 204                visible: ActiveValue::set(worktree.visible),
 205                scan_id: ActiveValue::set(0),
 206                completed_scan_id: ActiveValue::set(0),
 207                root_repo_common_dir: ActiveValue::set(None),
 208            }))
 209            .on_conflict(
 210                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 211                    .update_column(worktree::Column::RootName)
 212                    .to_owned(),
 213            )
 214            .exec(tx)
 215            .await?;
 216        }
 217
 218        worktree::Entity::delete_many()
 219            .filter(worktree::Column::ProjectId.eq(project_id).and(
 220                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 221            ))
 222            .exec(tx)
 223            .await?;
 224
 225        Ok(())
 226    }
 227
 228    pub async fn update_worktree(
 229        &self,
 230        update: &proto::UpdateWorktree,
 231        connection: ConnectionId,
 232    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 233        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 234            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 235        {
 236            return Err(anyhow!(
 237                "invalid worktree update. removed entries: {}, updated entries: {}",
 238                update.removed_entries.len(),
 239                update.updated_entries.len()
 240            ))?;
 241        }
 242
 243        let project_id = ProjectId::from_proto(update.project_id);
 244        let worktree_id = update.worktree_id as i64;
 245        self.project_transaction(project_id, |tx| async move {
 246            // Ensure the update comes from the host.
 247            let _project = project::Entity::find_by_id(project_id)
 248                .filter(
 249                    Condition::all()
 250                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 251                        .add(
 252                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 253                        ),
 254                )
 255                .one(&*tx)
 256                .await?
 257                .with_context(|| format!("no such project: {project_id}"))?;
 258
 259            // Update metadata.
 260            worktree::Entity::update(worktree::ActiveModel {
 261                id: ActiveValue::set(worktree_id),
 262                project_id: ActiveValue::set(project_id),
 263                root_name: ActiveValue::set(update.root_name.clone()),
 264                scan_id: ActiveValue::set(update.scan_id as i64),
 265                completed_scan_id: if update.is_last_update {
 266                    ActiveValue::set(update.scan_id as i64)
 267                } else {
 268                    ActiveValue::default()
 269                },
 270                abs_path: ActiveValue::set(update.abs_path.clone()),
 271                root_repo_common_dir: ActiveValue::set(update.root_repo_common_dir.clone()),
 272                ..Default::default()
 273            })
 274            .exec(&*tx)
 275            .await?;
 276
 277            if !update.updated_entries.is_empty() {
 278                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 279                    let mtime = entry.mtime.clone().unwrap_or_default();
 280                    worktree_entry::ActiveModel {
 281                        project_id: ActiveValue::set(project_id),
 282                        worktree_id: ActiveValue::set(worktree_id),
 283                        id: ActiveValue::set(entry.id as i64),
 284                        is_dir: ActiveValue::set(entry.is_dir),
 285                        path: ActiveValue::set(entry.path.clone()),
 286                        inode: ActiveValue::set(entry.inode as i64),
 287                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 288                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 289                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 290                        is_ignored: ActiveValue::set(entry.is_ignored),
 291                        git_status: ActiveValue::set(None),
 292                        is_external: ActiveValue::set(entry.is_external),
 293                        is_deleted: ActiveValue::set(false),
 294                        is_hidden: ActiveValue::set(entry.is_hidden),
 295                        scan_id: ActiveValue::set(update.scan_id as i64),
 296                        is_fifo: ActiveValue::set(entry.is_fifo),
 297                    }
 298                }))
 299                .on_conflict(
 300                    OnConflict::columns([
 301                        worktree_entry::Column::ProjectId,
 302                        worktree_entry::Column::WorktreeId,
 303                        worktree_entry::Column::Id,
 304                    ])
 305                    .update_columns([
 306                        worktree_entry::Column::IsDir,
 307                        worktree_entry::Column::Path,
 308                        worktree_entry::Column::Inode,
 309                        worktree_entry::Column::MtimeSeconds,
 310                        worktree_entry::Column::MtimeNanos,
 311                        worktree_entry::Column::CanonicalPath,
 312                        worktree_entry::Column::IsIgnored,
 313                        worktree_entry::Column::IsHidden,
 314                        worktree_entry::Column::ScanId,
 315                    ])
 316                    .to_owned(),
 317                )
 318                .exec(&*tx)
 319                .await?;
 320            }
 321
 322            if !update.removed_entries.is_empty() {
 323                worktree_entry::Entity::update_many()
 324                    .filter(
 325                        worktree_entry::Column::ProjectId
 326                            .eq(project_id)
 327                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 328                            .and(
 329                                worktree_entry::Column::Id
 330                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 331                            ),
 332                    )
 333                    .set(worktree_entry::ActiveModel {
 334                        is_deleted: ActiveValue::Set(true),
 335                        scan_id: ActiveValue::Set(update.scan_id as i64),
 336                        ..Default::default()
 337                    })
 338                    .exec(&*tx)
 339                    .await?;
 340            }
 341
 342            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 343            Ok(connection_ids)
 344        })
 345        .await
 346    }
 347
 348    pub async fn update_repository(
 349        &self,
 350        update: &proto::UpdateRepository,
 351        _connection: ConnectionId,
 352    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 353        let project_id = ProjectId::from_proto(update.project_id);
 354        let repository_id = update.id as i64;
 355        self.project_transaction(project_id, |tx| async move {
 356            project_repository::Entity::insert(project_repository::ActiveModel {
 357                project_id: ActiveValue::set(project_id),
 358                id: ActiveValue::set(repository_id),
 359                legacy_worktree_id: ActiveValue::set(None),
 360                abs_path: ActiveValue::set(update.abs_path.clone()),
 361                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 362                scan_id: ActiveValue::set(update.scan_id as i64),
 363                is_deleted: ActiveValue::set(false),
 364                branch_summary: ActiveValue::Set(
 365                    update
 366                        .branch_summary
 367                        .as_ref()
 368                        .map(|summary| serde_json::to_string(summary).unwrap()),
 369                ),
 370                head_commit_details: ActiveValue::Set(
 371                    update
 372                        .head_commit_details
 373                        .as_ref()
 374                        .map(|details| serde_json::to_string(details).unwrap()),
 375                ),
 376                current_merge_conflicts: ActiveValue::Set(Some(
 377                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 378                )),
 379                merge_message: ActiveValue::set(update.merge_message.clone()),
 380                remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
 381                remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
 382                linked_worktrees: ActiveValue::Set(Some(
 383                    serde_json::to_string(&update.linked_worktrees).unwrap(),
 384                )),
 385            })
 386            .on_conflict(
 387                OnConflict::columns([
 388                    project_repository::Column::ProjectId,
 389                    project_repository::Column::Id,
 390                ])
 391                .update_columns([
 392                    project_repository::Column::ScanId,
 393                    project_repository::Column::BranchSummary,
 394                    project_repository::Column::EntryIds,
 395                    project_repository::Column::AbsPath,
 396                    project_repository::Column::CurrentMergeConflicts,
 397                    project_repository::Column::HeadCommitDetails,
 398                    project_repository::Column::MergeMessage,
 399                    project_repository::Column::LinkedWorktrees,
 400                ])
 401                .to_owned(),
 402            )
 403            .exec(&*tx)
 404            .await?;
 405
 406            let has_any_statuses = !update.updated_statuses.is_empty();
 407
 408            if has_any_statuses {
 409                project_repository_statuses::Entity::insert_many(
 410                    update.updated_statuses.iter().map(|status_entry| {
 411                        let (repo_path, status_kind, first_status, second_status) =
 412                            proto_status_to_db(status_entry.clone());
 413                        project_repository_statuses::ActiveModel {
 414                            project_id: ActiveValue::set(project_id),
 415                            repository_id: ActiveValue::set(repository_id),
 416                            scan_id: ActiveValue::set(update.scan_id as i64),
 417                            is_deleted: ActiveValue::set(false),
 418                            repo_path: ActiveValue::set(repo_path),
 419                            status: ActiveValue::set(0),
 420                            status_kind: ActiveValue::set(status_kind),
 421                            first_status: ActiveValue::set(first_status),
 422                            second_status: ActiveValue::set(second_status),
 423                            lines_added: ActiveValue::set(
 424                                status_entry.diff_stat_added.map(|v| v as i32),
 425                            ),
 426                            lines_deleted: ActiveValue::set(
 427                                status_entry.diff_stat_deleted.map(|v| v as i32),
 428                            ),
 429                        }
 430                    }),
 431                )
 432                .on_conflict(
 433                    OnConflict::columns([
 434                        project_repository_statuses::Column::ProjectId,
 435                        project_repository_statuses::Column::RepositoryId,
 436                        project_repository_statuses::Column::RepoPath,
 437                    ])
 438                    .update_columns([
 439                        project_repository_statuses::Column::ScanId,
 440                        project_repository_statuses::Column::StatusKind,
 441                        project_repository_statuses::Column::FirstStatus,
 442                        project_repository_statuses::Column::SecondStatus,
 443                        project_repository_statuses::Column::LinesAdded,
 444                        project_repository_statuses::Column::LinesDeleted,
 445                    ])
 446                    .to_owned(),
 447                )
 448                .exec(&*tx)
 449                .await?;
 450            }
 451
 452            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 453
 454            if has_any_removed_statuses {
 455                project_repository_statuses::Entity::update_many()
 456                    .filter(
 457                        project_repository_statuses::Column::ProjectId
 458                            .eq(project_id)
 459                            .and(
 460                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 461                            )
 462                            .and(
 463                                project_repository_statuses::Column::RepoPath
 464                                    .is_in(update.removed_statuses.iter()),
 465                            ),
 466                    )
 467                    .set(project_repository_statuses::ActiveModel {
 468                        is_deleted: ActiveValue::Set(true),
 469                        scan_id: ActiveValue::Set(update.scan_id as i64),
 470                        ..Default::default()
 471                    })
 472                    .exec(&*tx)
 473                    .await?;
 474            }
 475
 476            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 477            Ok(connection_ids)
 478        })
 479        .await
 480    }
 481
 482    pub async fn remove_repository(
 483        &self,
 484        remove: &proto::RemoveRepository,
 485        _connection: ConnectionId,
 486    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 487        let project_id = ProjectId::from_proto(remove.project_id);
 488        let repository_id = remove.id as i64;
 489        self.project_transaction(project_id, |tx| async move {
 490            project_repository::Entity::update_many()
 491                .filter(
 492                    project_repository::Column::ProjectId
 493                        .eq(project_id)
 494                        .and(project_repository::Column::Id.eq(repository_id)),
 495                )
 496                .set(project_repository::ActiveModel {
 497                    is_deleted: ActiveValue::Set(true),
 498                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 499                    ..Default::default()
 500                })
 501                .exec(&*tx)
 502                .await?;
 503
 504            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 505            Ok(connection_ids)
 506        })
 507        .await
 508    }
 509
 510    /// Updates the diagnostic summary for the given connection.
 511    pub async fn update_diagnostic_summary(
 512        &self,
 513        update: &proto::UpdateDiagnosticSummary,
 514        connection: ConnectionId,
 515    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 516        let project_id = ProjectId::from_proto(update.project_id);
 517        let worktree_id = update.worktree_id as i64;
 518        self.project_transaction(project_id, |tx| async move {
 519            let summary = update.summary.as_ref().context("invalid summary")?;
 520
 521            // Ensure the update comes from the host.
 522            let project = project::Entity::find_by_id(project_id)
 523                .one(&*tx)
 524                .await?
 525                .context("no such project")?;
 526            if project.host_connection()? != connection {
 527                return Err(anyhow!("can't update a project hosted by someone else"))?;
 528            }
 529
 530            // Update summary.
 531            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 532                project_id: ActiveValue::set(project_id),
 533                worktree_id: ActiveValue::set(worktree_id),
 534                path: ActiveValue::set(summary.path.clone()),
 535                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 536                error_count: ActiveValue::set(summary.error_count as i32),
 537                warning_count: ActiveValue::set(summary.warning_count as i32),
 538            })
 539            .on_conflict(
 540                OnConflict::columns([
 541                    worktree_diagnostic_summary::Column::ProjectId,
 542                    worktree_diagnostic_summary::Column::WorktreeId,
 543                    worktree_diagnostic_summary::Column::Path,
 544                ])
 545                .update_columns([
 546                    worktree_diagnostic_summary::Column::LanguageServerId,
 547                    worktree_diagnostic_summary::Column::ErrorCount,
 548                    worktree_diagnostic_summary::Column::WarningCount,
 549                ])
 550                .to_owned(),
 551            )
 552            .exec(&*tx)
 553            .await?;
 554
 555            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 556            Ok(connection_ids)
 557        })
 558        .await
 559    }
 560
 561    /// Starts the language server for the given connection.
 562    pub async fn start_language_server(
 563        &self,
 564        update: &proto::StartLanguageServer,
 565        connection: ConnectionId,
 566    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 567        let project_id = ProjectId::from_proto(update.project_id);
 568        self.project_transaction(project_id, |tx| async move {
 569            let server = update.server.as_ref().context("invalid language server")?;
 570
 571            // Ensure the update comes from the host.
 572            let project = project::Entity::find_by_id(project_id)
 573                .one(&*tx)
 574                .await?
 575                .context("no such project")?;
 576            if project.host_connection()? != connection {
 577                return Err(anyhow!("can't update a project hosted by someone else"))?;
 578            }
 579
 580            // Add the newly-started language server.
 581            language_server::Entity::insert(language_server::ActiveModel {
 582                project_id: ActiveValue::set(project_id),
 583                id: ActiveValue::set(server.id as i64),
 584                name: ActiveValue::set(server.name.clone()),
 585                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 586                capabilities: ActiveValue::set(update.capabilities.clone()),
 587            })
 588            .on_conflict(
 589                OnConflict::columns([
 590                    language_server::Column::ProjectId,
 591                    language_server::Column::Id,
 592                ])
 593                .update_columns([
 594                    language_server::Column::Name,
 595                    language_server::Column::Capabilities,
 596                    language_server::Column::WorktreeId,
 597                ])
 598                .to_owned(),
 599            )
 600            .exec(&*tx)
 601            .await?;
 602
 603            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 604            Ok(connection_ids)
 605        })
 606        .await
 607    }
 608
 609    /// Updates the worktree settings for the given connection.
 610    pub async fn update_worktree_settings(
 611        &self,
 612        update: &proto::UpdateWorktreeSettings,
 613        connection: ConnectionId,
 614    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 615        let project_id = ProjectId::from_proto(update.project_id);
 616        let kind = match update.kind {
 617            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 618                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 619            None => proto::LocalSettingsKind::Settings,
 620        };
 621        let kind = LocalSettingsKind::from_proto(kind);
 622        self.project_transaction(project_id, |tx| async move {
 623            // Ensure the update comes from the host.
 624            let project = project::Entity::find_by_id(project_id)
 625                .one(&*tx)
 626                .await?
 627                .context("no such project")?;
 628            if project.host_connection()? != connection {
 629                return Err(anyhow!("can't update a project hosted by someone else"))?;
 630            }
 631
 632            if let Some(content) = &update.content {
 633                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 634                    project_id: ActiveValue::Set(project_id),
 635                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 636                    path: ActiveValue::Set(update.path.clone()),
 637                    content: ActiveValue::Set(content.clone()),
 638                    kind: ActiveValue::Set(kind),
 639                    outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
 640                })
 641                .on_conflict(
 642                    OnConflict::columns([
 643                        worktree_settings_file::Column::ProjectId,
 644                        worktree_settings_file::Column::WorktreeId,
 645                        worktree_settings_file::Column::Path,
 646                    ])
 647                    .update_columns([
 648                        worktree_settings_file::Column::Content,
 649                        worktree_settings_file::Column::OutsideWorktree,
 650                    ])
 651                    .to_owned(),
 652                )
 653                .exec(&*tx)
 654                .await?;
 655            } else {
 656                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 657                    project_id: ActiveValue::Set(project_id),
 658                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 659                    path: ActiveValue::Set(update.path.clone()),
 660                    ..Default::default()
 661                })
 662                .exec(&*tx)
 663                .await?;
 664            }
 665
 666            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 667            Ok(connection_ids)
 668        })
 669        .await
 670    }
 671
 672    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 673        self.transaction(|tx| async move {
 674            Ok(project::Entity::find_by_id(id)
 675                .one(&*tx)
 676                .await?
 677                .context("no such project")?)
 678        })
 679        .await
 680    }
 681
 682    /// Adds the given connection to the specified project
 683    /// in the current room.
 684    pub async fn join_project(
 685        &self,
 686        project_id: ProjectId,
 687        connection: ConnectionId,
 688        user_id: UserId,
 689        committer_name: Option<String>,
 690        committer_email: Option<String>,
 691    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 692        self.project_transaction(project_id, move |tx| {
 693            let committer_name = committer_name.clone();
 694            let committer_email = committer_email.clone();
 695            async move {
 696                let (project, role) = self
 697                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 698                    .await?;
 699                self.join_project_internal(
 700                    project,
 701                    user_id,
 702                    committer_name,
 703                    committer_email,
 704                    connection,
 705                    role,
 706                    &tx,
 707                )
 708                .await
 709            }
 710        })
 711        .await
 712    }
 713
 714    async fn join_project_internal(
 715        &self,
 716        project: project::Model,
 717        user_id: UserId,
 718        committer_name: Option<String>,
 719        committer_email: Option<String>,
 720        connection: ConnectionId,
 721        role: ChannelRole,
 722        tx: &DatabaseTransaction,
 723    ) -> Result<(Project, ReplicaId)> {
 724        let mut collaborators = project
 725            .find_related(project_collaborator::Entity)
 726            .all(tx)
 727            .await?;
 728        let replica_ids = collaborators
 729            .iter()
 730            .map(|c| c.replica_id)
 731            .collect::<HashSet<_>>();
 732        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
 733        while replica_ids.contains(&replica_id) {
 734            replica_id.0 += 1;
 735        }
 736        let new_collaborator = project_collaborator::ActiveModel {
 737            project_id: ActiveValue::set(project.id),
 738            connection_id: ActiveValue::set(connection.id as i32),
 739            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 740            user_id: ActiveValue::set(user_id),
 741            replica_id: ActiveValue::set(replica_id),
 742            is_host: ActiveValue::set(false),
 743            id: ActiveValue::NotSet,
 744            committer_name: ActiveValue::set(committer_name),
 745            committer_email: ActiveValue::set(committer_email),
 746        }
 747        .insert(tx)
 748        .await?;
 749        collaborators.push(new_collaborator);
 750
 751        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 752        let mut worktrees = db_worktrees
 753            .into_iter()
 754            .map(|db_worktree| {
 755                (
 756                    db_worktree.id as u64,
 757                    Worktree {
 758                        id: db_worktree.id as u64,
 759                        abs_path: db_worktree.abs_path,
 760                        root_name: db_worktree.root_name,
 761                        visible: db_worktree.visible,
 762                        entries: Default::default(),
 763                        diagnostic_summaries: Default::default(),
 764                        settings_files: Default::default(),
 765                        scan_id: db_worktree.scan_id as u64,
 766                        completed_scan_id: db_worktree.completed_scan_id as u64,
 767                        root_repo_common_dir: db_worktree.root_repo_common_dir,
 768                        legacy_repository_entries: Default::default(),
 769                    },
 770                )
 771            })
 772            .collect::<BTreeMap<_, _>>();
 773
 774        // Populate worktree entries.
 775        {
 776            let mut db_entries = worktree_entry::Entity::find()
 777                .filter(
 778                    Condition::all()
 779                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 780                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 781                )
 782                .stream(tx)
 783                .await?;
 784            while let Some(db_entry) = db_entries.next().await {
 785                let db_entry = db_entry?;
 786                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 787                    worktree.entries.push(proto::Entry {
 788                        id: db_entry.id as u64,
 789                        is_dir: db_entry.is_dir,
 790                        path: db_entry.path,
 791                        inode: db_entry.inode as u64,
 792                        mtime: Some(proto::Timestamp {
 793                            seconds: db_entry.mtime_seconds as u64,
 794                            nanos: db_entry.mtime_nanos as u32,
 795                        }),
 796                        canonical_path: db_entry.canonical_path,
 797                        is_ignored: db_entry.is_ignored,
 798                        is_external: db_entry.is_external,
 799                        is_hidden: db_entry.is_hidden,
 800                        // This is only used in the summarization backlog, so if it's None,
 801                        // that just means we won't be able to detect when to resummarize
 802                        // based on total number of backlogged bytes - instead, we'd go
 803                        // on number of files only. That shouldn't be a huge deal in practice.
 804                        size: None,
 805                        is_fifo: db_entry.is_fifo,
 806                    });
 807                }
 808            }
 809        }
 810
 811        // Populate repository entries.
 812        let mut repositories = Vec::new();
 813        {
 814            let db_repository_entries = project_repository::Entity::find()
 815                .filter(
 816                    Condition::all()
 817                        .add(project_repository::Column::ProjectId.eq(project.id))
 818                        .add(project_repository::Column::IsDeleted.eq(false)),
 819                )
 820                .all(tx)
 821                .await?;
 822            for db_repository_entry in db_repository_entries {
 823                let mut repository_statuses = project_repository_statuses::Entity::find()
 824                    .filter(
 825                        Condition::all()
 826                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 827                            .add(
 828                                project_repository_statuses::Column::RepositoryId
 829                                    .eq(db_repository_entry.id),
 830                            )
 831                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 832                    )
 833                    .stream(tx)
 834                    .await?;
 835                let mut updated_statuses = Vec::new();
 836                while let Some(status_entry) = repository_statuses.next().await {
 837                    let status_entry = status_entry?;
 838                    updated_statuses.push(db_status_to_proto(status_entry)?);
 839                }
 840
 841                let current_merge_conflicts = db_repository_entry
 842                    .current_merge_conflicts
 843                    .as_ref()
 844                    .map(|conflicts| serde_json::from_str(conflicts))
 845                    .transpose()?
 846                    .unwrap_or_default();
 847
 848                let branch_summary = db_repository_entry
 849                    .branch_summary
 850                    .as_ref()
 851                    .map(|branch_summary| serde_json::from_str(branch_summary))
 852                    .transpose()?
 853                    .unwrap_or_default();
 854
 855                let head_commit_details = db_repository_entry
 856                    .head_commit_details
 857                    .as_ref()
 858                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 859                    .transpose()?
 860                    .unwrap_or_default();
 861
 862                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 863                    .context("failed to deserialize repository's entry ids")?;
 864
 865                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 866                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 867                        worktree.legacy_repository_entries.insert(
 868                            db_repository_entry.id as u64,
 869                            proto::RepositoryEntry {
 870                                repository_id: db_repository_entry.id as u64,
 871                                updated_statuses,
 872                                removed_statuses: Vec::new(),
 873                                current_merge_conflicts,
 874                                branch_summary,
 875                            },
 876                        );
 877                    }
 878                } else {
 879                    repositories.push(proto::UpdateRepository {
 880                        project_id: db_repository_entry.project_id.0 as u64,
 881                        id: db_repository_entry.id as u64,
 882                        abs_path: db_repository_entry.abs_path.clone(),
 883                        entry_ids,
 884                        updated_statuses,
 885                        removed_statuses: Vec::new(),
 886                        current_merge_conflicts,
 887                        branch_summary,
 888                        head_commit_details,
 889                        scan_id: db_repository_entry.scan_id as u64,
 890                        is_last_update: true,
 891                        merge_message: db_repository_entry.merge_message,
 892                        stash_entries: Vec::new(),
 893                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
 894                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
 895                        original_repo_abs_path: Some(db_repository_entry.abs_path),
 896                        linked_worktrees: db_repository_entry
 897                            .linked_worktrees
 898                            .as_deref()
 899                            .and_then(|s| serde_json::from_str(s).ok())
 900                            .unwrap_or_default(),
 901                    });
 902                }
 903            }
 904        }
 905
 906        // Populate worktree diagnostic summaries.
 907        {
 908            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 909                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 910                .stream(tx)
 911                .await?;
 912            while let Some(db_summary) = db_summaries.next().await {
 913                let db_summary = db_summary?;
 914                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 915                    worktree
 916                        .diagnostic_summaries
 917                        .push(proto::DiagnosticSummary {
 918                            path: db_summary.path,
 919                            language_server_id: db_summary.language_server_id as u64,
 920                            error_count: db_summary.error_count as u32,
 921                            warning_count: db_summary.warning_count as u32,
 922                        });
 923                }
 924            }
 925        }
 926
 927        // Populate worktree settings files
 928        {
 929            let mut db_settings_files = worktree_settings_file::Entity::find()
 930                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 931                .stream(tx)
 932                .await?;
 933            while let Some(db_settings_file) = db_settings_files.next().await {
 934                let db_settings_file = db_settings_file?;
 935                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 936                    worktree.settings_files.push(WorktreeSettingsFile {
 937                        path: db_settings_file.path,
 938                        content: db_settings_file.content,
 939                        kind: db_settings_file.kind,
 940                        outside_worktree: db_settings_file.outside_worktree,
 941                    });
 942                }
 943            }
 944        }
 945
 946        // Populate language servers.
 947        let language_servers = project
 948            .find_related(language_server::Entity)
 949            .all(tx)
 950            .await?;
 951
 952        let path_style = if project.windows_paths {
 953            PathStyle::Windows
 954        } else {
 955            PathStyle::Posix
 956        };
 957        let features: Vec<String> = serde_json::from_str(&project.features).unwrap_or_default();
 958
 959        let project = Project {
 960            id: project.id,
 961            role,
 962            collaborators: collaborators
 963                .into_iter()
 964                .map(|collaborator| ProjectCollaborator {
 965                    connection_id: collaborator.connection(),
 966                    user_id: collaborator.user_id,
 967                    replica_id: collaborator.replica_id,
 968                    is_host: collaborator.is_host,
 969                    committer_name: collaborator.committer_name,
 970                    committer_email: collaborator.committer_email,
 971                })
 972                .collect(),
 973            worktrees,
 974            repositories,
 975            language_servers: language_servers
 976                .into_iter()
 977                .map(|language_server| LanguageServer {
 978                    server: proto::LanguageServer {
 979                        id: language_server.id as u64,
 980                        name: language_server.name,
 981                        worktree_id: language_server.worktree_id.map(|id| id as u64),
 982                    },
 983                    capabilities: language_server.capabilities,
 984                })
 985                .collect(),
 986            path_style,
 987            features,
 988        };
 989        Ok((project, replica_id as ReplicaId))
 990    }
 991
 992    /// Removes the given connection from the specified project.
 993    pub async fn leave_project(
 994        &self,
 995        project_id: ProjectId,
 996        connection: ConnectionId,
 997    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 998        self.project_transaction(project_id, |tx| async move {
 999            let result = project_collaborator::Entity::delete_many()
1000                .filter(
1001                    Condition::all()
1002                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1003                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1004                        .add(
1005                            project_collaborator::Column::ConnectionServerId
1006                                .eq(connection.owner_id as i32),
1007                        ),
1008                )
1009                .exec(&*tx)
1010                .await?;
1011            if result.rows_affected == 0 {
1012                Err(anyhow!("not a collaborator on this project"))?;
1013            }
1014
1015            let project = project::Entity::find_by_id(project_id)
1016                .one(&*tx)
1017                .await?
1018                .context("no such project")?;
1019            let collaborators = project
1020                .find_related(project_collaborator::Entity)
1021                .all(&*tx)
1022                .await?;
1023            let connection_ids: Vec<ConnectionId> = collaborators
1024                .into_iter()
1025                .map(|collaborator| collaborator.connection())
1026                .collect();
1027
1028            follower::Entity::delete_many()
1029                .filter(
1030                    Condition::any()
1031                        .add(
1032                            Condition::all()
1033                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1034                                .add(
1035                                    follower::Column::LeaderConnectionServerId
1036                                        .eq(connection.owner_id),
1037                                )
1038                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1039                        )
1040                        .add(
1041                            Condition::all()
1042                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1043                                .add(
1044                                    follower::Column::FollowerConnectionServerId
1045                                        .eq(connection.owner_id),
1046                                )
1047                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1048                        ),
1049                )
1050                .exec(&*tx)
1051                .await?;
1052
1053            let room = if let Some(room_id) = project.room_id {
1054                Some(self.get_room(room_id, &tx).await?)
1055            } else {
1056                None
1057            };
1058
1059            let left_project = LeftProject {
1060                id: project_id,
1061                should_unshare: connection == project.host_connection()?,
1062                connection_ids,
1063            };
1064            Ok((room, left_project))
1065        })
1066        .await
1067    }
1068
1069    pub async fn check_user_is_project_host(
1070        &self,
1071        project_id: ProjectId,
1072        connection_id: ConnectionId,
1073    ) -> Result<()> {
1074        self.project_transaction(project_id, |tx| async move {
1075            project::Entity::find()
1076                .filter(
1077                    Condition::all()
1078                        .add(project::Column::Id.eq(project_id))
1079                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1080                        .add(
1081                            project::Column::HostConnectionServerId
1082                                .eq(Some(connection_id.owner_id as i32)),
1083                        ),
1084                )
1085                .one(&*tx)
1086                .await?
1087                .context("failed to read project host")?;
1088
1089            Ok(())
1090        })
1091        .await
1092        .map(|guard| guard.into_inner())
1093    }
1094
1095    /// Returns the current project if the given user is authorized to access it with the specified capability.
1096    pub async fn access_project(
1097        &self,
1098        project_id: ProjectId,
1099        connection_id: ConnectionId,
1100        capability: Capability,
1101        tx: &DatabaseTransaction,
1102    ) -> Result<(project::Model, ChannelRole)> {
1103        let project = project::Entity::find_by_id(project_id)
1104            .one(tx)
1105            .await?
1106            .context("no such project")?;
1107
1108        let role_from_room = if let Some(room_id) = project.room_id {
1109            room_participant::Entity::find()
1110                .filter(room_participant::Column::RoomId.eq(room_id))
1111                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1112                .one(tx)
1113                .await?
1114                .and_then(|participant| participant.role)
1115        } else {
1116            None
1117        };
1118
1119        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1120
1121        match capability {
1122            Capability::ReadWrite => {
1123                if !role.can_edit_projects() {
1124                    return Err(anyhow!("not authorized to edit projects"))?;
1125                }
1126            }
1127            Capability::ReadOnly => {
1128                if !role.can_read_projects() {
1129                    return Err(anyhow!("not authorized to read projects"))?;
1130                }
1131            }
1132        }
1133
1134        Ok((project, role))
1135    }
1136
1137    /// Returns the host connection for a read-only request to join a shared project.
1138    pub async fn host_for_read_only_project_request(
1139        &self,
1140        project_id: ProjectId,
1141        connection_id: ConnectionId,
1142    ) -> Result<ConnectionId> {
1143        self.project_transaction(project_id, |tx| async move {
1144            let (project, _) = self
1145                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1146                .await?;
1147            project.host_connection()
1148        })
1149        .await
1150        .map(|guard| guard.into_inner())
1151    }
1152
1153    /// Returns the host connection for a request to join a shared project.
1154    pub async fn host_for_mutating_project_request(
1155        &self,
1156        project_id: ProjectId,
1157        connection_id: ConnectionId,
1158    ) -> Result<ConnectionId> {
1159        self.project_transaction(project_id, |tx| async move {
1160            let (project, _) = self
1161                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1162                .await?;
1163            project.host_connection()
1164        })
1165        .await
1166        .map(|guard| guard.into_inner())
1167    }
1168
1169    pub async fn connections_for_buffer_update(
1170        &self,
1171        project_id: ProjectId,
1172        connection_id: ConnectionId,
1173        capability: Capability,
1174    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1175        self.project_transaction(project_id, |tx| async move {
1176            // Authorize
1177            let (project, _) = self
1178                .access_project(project_id, connection_id, capability, &tx)
1179                .await?;
1180
1181            let host_connection_id = project.host_connection()?;
1182
1183            let collaborators = project_collaborator::Entity::find()
1184                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1185                .all(&*tx)
1186                .await?;
1187
1188            let guest_connection_ids = collaborators
1189                .into_iter()
1190                .filter_map(|collaborator| {
1191                    if collaborator.is_host {
1192                        None
1193                    } else {
1194                        Some(collaborator.connection())
1195                    }
1196                })
1197                .collect();
1198
1199            Ok((host_connection_id, guest_connection_ids))
1200        })
1201        .await
1202    }
1203
1204    /// Returns the connection IDs in the given project.
1205    ///
1206    /// The provided `connection_id` must also be a collaborator in the project,
1207    /// otherwise an error will be returned.
1208    pub async fn project_connection_ids(
1209        &self,
1210        project_id: ProjectId,
1211        connection_id: ConnectionId,
1212        exclude_dev_server: bool,
1213    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1214        self.project_transaction(project_id, |tx| async move {
1215            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1216                .await
1217        })
1218        .await
1219    }
1220
1221    async fn internal_project_connection_ids(
1222        &self,
1223        project_id: ProjectId,
1224        connection_id: ConnectionId,
1225        exclude_dev_server: bool,
1226        tx: &DatabaseTransaction,
1227    ) -> Result<HashSet<ConnectionId>> {
1228        let project = project::Entity::find_by_id(project_id)
1229            .one(tx)
1230            .await?
1231            .context("no such project")?;
1232
1233        let mut collaborators = project_collaborator::Entity::find()
1234            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1235            .stream(tx)
1236            .await?;
1237
1238        let mut connection_ids = HashSet::default();
1239        if let Some(host_connection) = project.host_connection().log_err()
1240            && !exclude_dev_server
1241        {
1242            connection_ids.insert(host_connection);
1243        }
1244
1245        while let Some(collaborator) = collaborators.next().await {
1246            let collaborator = collaborator?;
1247            connection_ids.insert(collaborator.connection());
1248        }
1249
1250        if connection_ids.contains(&connection_id)
1251            || Some(connection_id) == project.host_connection().ok()
1252        {
1253            Ok(connection_ids)
1254        } else {
1255            Err(anyhow!(
1256                "can only send project updates to a project you're in"
1257            ))?
1258        }
1259    }
1260
1261    async fn project_guest_connection_ids(
1262        &self,
1263        project_id: ProjectId,
1264        tx: &DatabaseTransaction,
1265    ) -> Result<Vec<ConnectionId>> {
1266        let mut collaborators = project_collaborator::Entity::find()
1267            .filter(
1268                project_collaborator::Column::ProjectId
1269                    .eq(project_id)
1270                    .and(project_collaborator::Column::IsHost.eq(false)),
1271            )
1272            .stream(tx)
1273            .await?;
1274
1275        let mut guest_connection_ids = Vec::new();
1276        while let Some(collaborator) = collaborators.next().await {
1277            let collaborator = collaborator?;
1278            guest_connection_ids.push(collaborator.connection());
1279        }
1280        Ok(guest_connection_ids)
1281    }
1282
1283    /// Returns the [`RoomId`] for the given project.
1284    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1285        self.transaction(|tx| async move {
1286            Ok(project::Entity::find_by_id(project_id)
1287                .one(&*tx)
1288                .await?
1289                .and_then(|project| project.room_id))
1290        })
1291        .await
1292    }
1293
1294    pub async fn check_room_participants(
1295        &self,
1296        room_id: RoomId,
1297        leader_id: ConnectionId,
1298        follower_id: ConnectionId,
1299    ) -> Result<()> {
1300        self.transaction(|tx| async move {
1301            use room_participant::Column;
1302
1303            let count = room_participant::Entity::find()
1304                .filter(
1305                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1306                        Condition::any()
1307                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1308                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1309                            ))
1310                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1311                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1312                            )),
1313                    ),
1314                )
1315                .count(&*tx)
1316                .await?;
1317
1318            if count < 2 {
1319                Err(anyhow!("not room participants"))?;
1320            }
1321
1322            Ok(())
1323        })
1324        .await
1325    }
1326
1327    /// Adds the given follower connection as a follower of the given leader connection.
1328    pub async fn follow(
1329        &self,
1330        room_id: RoomId,
1331        project_id: ProjectId,
1332        leader_connection: ConnectionId,
1333        follower_connection: ConnectionId,
1334    ) -> Result<TransactionGuard<proto::Room>> {
1335        self.room_transaction(room_id, |tx| async move {
1336            follower::ActiveModel {
1337                room_id: ActiveValue::set(room_id),
1338                project_id: ActiveValue::set(project_id),
1339                leader_connection_server_id: ActiveValue::set(ServerId(
1340                    leader_connection.owner_id as i32,
1341                )),
1342                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1343                follower_connection_server_id: ActiveValue::set(ServerId(
1344                    follower_connection.owner_id as i32,
1345                )),
1346                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1347                ..Default::default()
1348            }
1349            .insert(&*tx)
1350            .await?;
1351
1352            let room = self.get_room(room_id, &tx).await?;
1353            Ok(room)
1354        })
1355        .await
1356    }
1357
1358    /// Removes the given follower connection as a follower of the given leader connection.
1359    pub async fn unfollow(
1360        &self,
1361        room_id: RoomId,
1362        project_id: ProjectId,
1363        leader_connection: ConnectionId,
1364        follower_connection: ConnectionId,
1365    ) -> Result<TransactionGuard<proto::Room>> {
1366        self.room_transaction(room_id, |tx| async move {
1367            follower::Entity::delete_many()
1368                .filter(
1369                    Condition::all()
1370                        .add(follower::Column::RoomId.eq(room_id))
1371                        .add(follower::Column::ProjectId.eq(project_id))
1372                        .add(
1373                            follower::Column::LeaderConnectionServerId
1374                                .eq(leader_connection.owner_id),
1375                        )
1376                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1377                        .add(
1378                            follower::Column::FollowerConnectionServerId
1379                                .eq(follower_connection.owner_id),
1380                        )
1381                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1382                )
1383                .exec(&*tx)
1384                .await?;
1385
1386            let room = self.get_room(room_id, &tx).await?;
1387            Ok(room)
1388        })
1389        .await
1390    }
1391}