projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project { 1 } else { 0 };
  95
  96            project_collaborator::ActiveModel {
  97                project_id: ActiveValue::set(project.id),
  98                connection_id: ActiveValue::set(connection.id as i32),
  99                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 100                user_id: ActiveValue::set(participant.user_id),
 101                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 102                is_host: ActiveValue::set(true),
 103                id: ActiveValue::NotSet,
 104                committer_name: ActiveValue::Set(None),
 105                committer_email: ActiveValue::Set(None),
 106            }
 107            .insert(&*tx)
 108            .await?;
 109
 110            let room = self.get_room(room_id, &tx).await?;
 111            Ok((project.id, room))
 112        })
 113        .await
 114    }
 115
 116    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 117        self.transaction(|tx| async move {
 118            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 119            Ok(())
 120        })
 121        .await
 122    }
 123
 124    /// Unshares the given project.
 125    pub async fn unshare_project(
 126        &self,
 127        project_id: ProjectId,
 128        connection: ConnectionId,
 129    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 130        self.project_transaction(project_id, |tx| async move {
 131            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 132            let project = project::Entity::find_by_id(project_id)
 133                .one(&*tx)
 134                .await?
 135                .context("project not found")?;
 136            let room = if let Some(room_id) = project.room_id {
 137                Some(self.get_room(room_id, &tx).await?)
 138            } else {
 139                None
 140            };
 141            if project.host_connection()? == connection {
 142                return Ok((true, room, guest_connection_ids));
 143            }
 144            Err(anyhow!("cannot unshare a project hosted by another user"))?
 145        })
 146        .await
 147    }
 148
 149    /// Updates the worktrees associated with the given project.
 150    pub async fn update_project(
 151        &self,
 152        project_id: ProjectId,
 153        connection: ConnectionId,
 154        worktrees: &[proto::WorktreeMetadata],
 155    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 156        self.project_transaction(project_id, |tx| async move {
 157            let project = project::Entity::find_by_id(project_id)
 158                .filter(
 159                    Condition::all()
 160                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 161                        .add(
 162                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 163                        ),
 164                )
 165                .one(&*tx)
 166                .await?
 167                .context("no such project")?;
 168
 169            self.update_project_worktrees(project.id, worktrees, &tx)
 170                .await?;
 171
 172            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 173
 174            let room = if let Some(room_id) = project.room_id {
 175                Some(self.get_room(room_id, &tx).await?)
 176            } else {
 177                None
 178            };
 179
 180            Ok((room, guest_connection_ids))
 181        })
 182        .await
 183    }
 184
 185    pub(in crate::db) async fn update_project_worktrees(
 186        &self,
 187        project_id: ProjectId,
 188        worktrees: &[proto::WorktreeMetadata],
 189        tx: &DatabaseTransaction,
 190    ) -> Result<()> {
 191        if !worktrees.is_empty() {
 192            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 193                id: ActiveValue::set(worktree.id as i64),
 194                project_id: ActiveValue::set(project_id),
 195                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 196                root_name: ActiveValue::set(worktree.root_name.clone()),
 197                visible: ActiveValue::set(worktree.visible),
 198                scan_id: ActiveValue::set(0),
 199                completed_scan_id: ActiveValue::set(0),
 200            }))
 201            .on_conflict(
 202                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 203                    .update_column(worktree::Column::RootName)
 204                    .to_owned(),
 205            )
 206            .exec(tx)
 207            .await?;
 208        }
 209
 210        worktree::Entity::delete_many()
 211            .filter(worktree::Column::ProjectId.eq(project_id).and(
 212                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 213            ))
 214            .exec(tx)
 215            .await?;
 216
 217        Ok(())
 218    }
 219
 220    pub async fn update_worktree(
 221        &self,
 222        update: &proto::UpdateWorktree,
 223        connection: ConnectionId,
 224    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 225        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 226            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 227        {
 228            return Err(anyhow!(
 229                "invalid worktree update. removed entries: {}, updated entries: {}",
 230                update.removed_entries.len(),
 231                update.updated_entries.len()
 232            ))?;
 233        }
 234
 235        let project_id = ProjectId::from_proto(update.project_id);
 236        let worktree_id = update.worktree_id as i64;
 237        self.project_transaction(project_id, |tx| async move {
 238            // Ensure the update comes from the host.
 239            let _project = project::Entity::find_by_id(project_id)
 240                .filter(
 241                    Condition::all()
 242                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 243                        .add(
 244                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 245                        ),
 246                )
 247                .one(&*tx)
 248                .await?
 249                .with_context(|| format!("no such project: {project_id}"))?;
 250
 251            // Update metadata.
 252            worktree::Entity::update(worktree::ActiveModel {
 253                id: ActiveValue::set(worktree_id),
 254                project_id: ActiveValue::set(project_id),
 255                root_name: ActiveValue::set(update.root_name.clone()),
 256                scan_id: ActiveValue::set(update.scan_id as i64),
 257                completed_scan_id: if update.is_last_update {
 258                    ActiveValue::set(update.scan_id as i64)
 259                } else {
 260                    ActiveValue::default()
 261                },
 262                abs_path: ActiveValue::set(update.abs_path.clone()),
 263                ..Default::default()
 264            })
 265            .exec(&*tx)
 266            .await?;
 267
 268            if !update.updated_entries.is_empty() {
 269                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 270                    let mtime = entry.mtime.clone().unwrap_or_default();
 271                    worktree_entry::ActiveModel {
 272                        project_id: ActiveValue::set(project_id),
 273                        worktree_id: ActiveValue::set(worktree_id),
 274                        id: ActiveValue::set(entry.id as i64),
 275                        is_dir: ActiveValue::set(entry.is_dir),
 276                        path: ActiveValue::set(entry.path.clone()),
 277                        inode: ActiveValue::set(entry.inode as i64),
 278                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 279                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 280                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 281                        is_ignored: ActiveValue::set(entry.is_ignored),
 282                        git_status: ActiveValue::set(None),
 283                        is_external: ActiveValue::set(entry.is_external),
 284                        is_deleted: ActiveValue::set(false),
 285                        scan_id: ActiveValue::set(update.scan_id as i64),
 286                        is_fifo: ActiveValue::set(entry.is_fifo),
 287                    }
 288                }))
 289                .on_conflict(
 290                    OnConflict::columns([
 291                        worktree_entry::Column::ProjectId,
 292                        worktree_entry::Column::WorktreeId,
 293                        worktree_entry::Column::Id,
 294                    ])
 295                    .update_columns([
 296                        worktree_entry::Column::IsDir,
 297                        worktree_entry::Column::Path,
 298                        worktree_entry::Column::Inode,
 299                        worktree_entry::Column::MtimeSeconds,
 300                        worktree_entry::Column::MtimeNanos,
 301                        worktree_entry::Column::CanonicalPath,
 302                        worktree_entry::Column::IsIgnored,
 303                        worktree_entry::Column::ScanId,
 304                    ])
 305                    .to_owned(),
 306                )
 307                .exec(&*tx)
 308                .await?;
 309            }
 310
 311            if !update.removed_entries.is_empty() {
 312                worktree_entry::Entity::update_many()
 313                    .filter(
 314                        worktree_entry::Column::ProjectId
 315                            .eq(project_id)
 316                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 317                            .and(
 318                                worktree_entry::Column::Id
 319                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 320                            ),
 321                    )
 322                    .set(worktree_entry::ActiveModel {
 323                        is_deleted: ActiveValue::Set(true),
 324                        scan_id: ActiveValue::Set(update.scan_id as i64),
 325                        ..Default::default()
 326                    })
 327                    .exec(&*tx)
 328                    .await?;
 329            }
 330
 331            // Backward-compatibility for old Zed clients.
 332            //
 333            // Remove this block when Zed 1.80 stable has been out for a week.
 334            {
 335                if !update.updated_repositories.is_empty() {
 336                    project_repository::Entity::insert_many(
 337                        update.updated_repositories.iter().map(|repository| {
 338                            project_repository::ActiveModel {
 339                                project_id: ActiveValue::set(project_id),
 340                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 341                                id: ActiveValue::set(repository.repository_id as i64),
 342                                scan_id: ActiveValue::set(update.scan_id as i64),
 343                                is_deleted: ActiveValue::set(false),
 344                                branch_summary: ActiveValue::Set(
 345                                    repository
 346                                        .branch_summary
 347                                        .as_ref()
 348                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 349                                ),
 350                                current_merge_conflicts: ActiveValue::Set(Some(
 351                                    serde_json::to_string(&repository.current_merge_conflicts)
 352                                        .unwrap(),
 353                                )),
 354                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 355                                abs_path: ActiveValue::set(String::new()),
 356                                entry_ids: ActiveValue::set("[]".into()),
 357                                head_commit_details: ActiveValue::set(None),
 358                                merge_message: ActiveValue::set(None),
 359                            }
 360                        }),
 361                    )
 362                    .on_conflict(
 363                        OnConflict::columns([
 364                            project_repository::Column::ProjectId,
 365                            project_repository::Column::Id,
 366                        ])
 367                        .update_columns([
 368                            project_repository::Column::ScanId,
 369                            project_repository::Column::BranchSummary,
 370                            project_repository::Column::CurrentMergeConflicts,
 371                        ])
 372                        .to_owned(),
 373                    )
 374                    .exec(&*tx)
 375                    .await?;
 376
 377                    let has_any_statuses = update
 378                        .updated_repositories
 379                        .iter()
 380                        .any(|repository| !repository.updated_statuses.is_empty());
 381
 382                    if has_any_statuses {
 383                        project_repository_statuses::Entity::insert_many(
 384                            update.updated_repositories.iter().flat_map(
 385                                |repository: &proto::RepositoryEntry| {
 386                                    repository.updated_statuses.iter().map(|status_entry| {
 387                                        let (repo_path, status_kind, first_status, second_status) =
 388                                            proto_status_to_db(status_entry.clone());
 389                                        project_repository_statuses::ActiveModel {
 390                                            project_id: ActiveValue::set(project_id),
 391                                            repository_id: ActiveValue::set(
 392                                                repository.repository_id as i64,
 393                                            ),
 394                                            scan_id: ActiveValue::set(update.scan_id as i64),
 395                                            is_deleted: ActiveValue::set(false),
 396                                            repo_path: ActiveValue::set(repo_path),
 397                                            status: ActiveValue::set(0),
 398                                            status_kind: ActiveValue::set(status_kind),
 399                                            first_status: ActiveValue::set(first_status),
 400                                            second_status: ActiveValue::set(second_status),
 401                                        }
 402                                    })
 403                                },
 404                            ),
 405                        )
 406                        .on_conflict(
 407                            OnConflict::columns([
 408                                project_repository_statuses::Column::ProjectId,
 409                                project_repository_statuses::Column::RepositoryId,
 410                                project_repository_statuses::Column::RepoPath,
 411                            ])
 412                            .update_columns([
 413                                project_repository_statuses::Column::ScanId,
 414                                project_repository_statuses::Column::StatusKind,
 415                                project_repository_statuses::Column::FirstStatus,
 416                                project_repository_statuses::Column::SecondStatus,
 417                            ])
 418                            .to_owned(),
 419                        )
 420                        .exec(&*tx)
 421                        .await?;
 422                    }
 423
 424                    for repo in &update.updated_repositories {
 425                        if !repo.removed_statuses.is_empty() {
 426                            project_repository_statuses::Entity::update_many()
 427                                .filter(
 428                                    project_repository_statuses::Column::ProjectId
 429                                        .eq(project_id)
 430                                        .and(
 431                                            project_repository_statuses::Column::RepositoryId
 432                                                .eq(repo.repository_id),
 433                                        )
 434                                        .and(
 435                                            project_repository_statuses::Column::RepoPath
 436                                                .is_in(repo.removed_statuses.iter()),
 437                                        ),
 438                                )
 439                                .set(project_repository_statuses::ActiveModel {
 440                                    is_deleted: ActiveValue::Set(true),
 441                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 442                                    ..Default::default()
 443                                })
 444                                .exec(&*tx)
 445                                .await?;
 446                        }
 447                    }
 448                }
 449
 450                if !update.removed_repositories.is_empty() {
 451                    project_repository::Entity::update_many()
 452                        .filter(
 453                            project_repository::Column::ProjectId
 454                                .eq(project_id)
 455                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 456                                .and(project_repository::Column::Id.is_in(
 457                                    update.removed_repositories.iter().map(|id| *id as i64),
 458                                )),
 459                        )
 460                        .set(project_repository::ActiveModel {
 461                            is_deleted: ActiveValue::Set(true),
 462                            scan_id: ActiveValue::Set(update.scan_id as i64),
 463                            ..Default::default()
 464                        })
 465                        .exec(&*tx)
 466                        .await?;
 467                }
 468            }
 469
 470            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 471            Ok(connection_ids)
 472        })
 473        .await
 474    }
 475
 476    pub async fn update_repository(
 477        &self,
 478        update: &proto::UpdateRepository,
 479        _connection: ConnectionId,
 480    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 481        let project_id = ProjectId::from_proto(update.project_id);
 482        let repository_id = update.id as i64;
 483        self.project_transaction(project_id, |tx| async move {
 484            project_repository::Entity::insert(project_repository::ActiveModel {
 485                project_id: ActiveValue::set(project_id),
 486                id: ActiveValue::set(repository_id),
 487                legacy_worktree_id: ActiveValue::set(None),
 488                abs_path: ActiveValue::set(update.abs_path.clone()),
 489                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 490                scan_id: ActiveValue::set(update.scan_id as i64),
 491                is_deleted: ActiveValue::set(false),
 492                branch_summary: ActiveValue::Set(
 493                    update
 494                        .branch_summary
 495                        .as_ref()
 496                        .map(|summary| serde_json::to_string(summary).unwrap()),
 497                ),
 498                head_commit_details: ActiveValue::Set(
 499                    update
 500                        .head_commit_details
 501                        .as_ref()
 502                        .map(|details| serde_json::to_string(details).unwrap()),
 503                ),
 504                current_merge_conflicts: ActiveValue::Set(Some(
 505                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 506                )),
 507                merge_message: ActiveValue::set(update.merge_message.clone()),
 508            })
 509            .on_conflict(
 510                OnConflict::columns([
 511                    project_repository::Column::ProjectId,
 512                    project_repository::Column::Id,
 513                ])
 514                .update_columns([
 515                    project_repository::Column::ScanId,
 516                    project_repository::Column::BranchSummary,
 517                    project_repository::Column::EntryIds,
 518                    project_repository::Column::AbsPath,
 519                    project_repository::Column::CurrentMergeConflicts,
 520                    project_repository::Column::HeadCommitDetails,
 521                    project_repository::Column::MergeMessage,
 522                ])
 523                .to_owned(),
 524            )
 525            .exec(&*tx)
 526            .await?;
 527
 528            let has_any_statuses = !update.updated_statuses.is_empty();
 529
 530            if has_any_statuses {
 531                project_repository_statuses::Entity::insert_many(
 532                    update.updated_statuses.iter().map(|status_entry| {
 533                        let (repo_path, status_kind, first_status, second_status) =
 534                            proto_status_to_db(status_entry.clone());
 535                        project_repository_statuses::ActiveModel {
 536                            project_id: ActiveValue::set(project_id),
 537                            repository_id: ActiveValue::set(repository_id),
 538                            scan_id: ActiveValue::set(update.scan_id as i64),
 539                            is_deleted: ActiveValue::set(false),
 540                            repo_path: ActiveValue::set(repo_path),
 541                            status: ActiveValue::set(0),
 542                            status_kind: ActiveValue::set(status_kind),
 543                            first_status: ActiveValue::set(first_status),
 544                            second_status: ActiveValue::set(second_status),
 545                        }
 546                    }),
 547                )
 548                .on_conflict(
 549                    OnConflict::columns([
 550                        project_repository_statuses::Column::ProjectId,
 551                        project_repository_statuses::Column::RepositoryId,
 552                        project_repository_statuses::Column::RepoPath,
 553                    ])
 554                    .update_columns([
 555                        project_repository_statuses::Column::ScanId,
 556                        project_repository_statuses::Column::StatusKind,
 557                        project_repository_statuses::Column::FirstStatus,
 558                        project_repository_statuses::Column::SecondStatus,
 559                    ])
 560                    .to_owned(),
 561                )
 562                .exec(&*tx)
 563                .await?;
 564            }
 565
 566            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 567
 568            if has_any_removed_statuses {
 569                project_repository_statuses::Entity::update_many()
 570                    .filter(
 571                        project_repository_statuses::Column::ProjectId
 572                            .eq(project_id)
 573                            .and(
 574                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 575                            )
 576                            .and(
 577                                project_repository_statuses::Column::RepoPath
 578                                    .is_in(update.removed_statuses.iter()),
 579                            ),
 580                    )
 581                    .set(project_repository_statuses::ActiveModel {
 582                        is_deleted: ActiveValue::Set(true),
 583                        scan_id: ActiveValue::Set(update.scan_id as i64),
 584                        ..Default::default()
 585                    })
 586                    .exec(&*tx)
 587                    .await?;
 588            }
 589
 590            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 591            Ok(connection_ids)
 592        })
 593        .await
 594    }
 595
 596    pub async fn remove_repository(
 597        &self,
 598        remove: &proto::RemoveRepository,
 599        _connection: ConnectionId,
 600    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 601        let project_id = ProjectId::from_proto(remove.project_id);
 602        let repository_id = remove.id as i64;
 603        self.project_transaction(project_id, |tx| async move {
 604            project_repository::Entity::update_many()
 605                .filter(
 606                    project_repository::Column::ProjectId
 607                        .eq(project_id)
 608                        .and(project_repository::Column::Id.eq(repository_id)),
 609                )
 610                .set(project_repository::ActiveModel {
 611                    is_deleted: ActiveValue::Set(true),
 612                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 613                    ..Default::default()
 614                })
 615                .exec(&*tx)
 616                .await?;
 617
 618            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 619            Ok(connection_ids)
 620        })
 621        .await
 622    }
 623
 624    /// Updates the diagnostic summary for the given connection.
 625    pub async fn update_diagnostic_summary(
 626        &self,
 627        update: &proto::UpdateDiagnosticSummary,
 628        connection: ConnectionId,
 629    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 630        let project_id = ProjectId::from_proto(update.project_id);
 631        let worktree_id = update.worktree_id as i64;
 632        self.project_transaction(project_id, |tx| async move {
 633            let summary = update.summary.as_ref().context("invalid summary")?;
 634
 635            // Ensure the update comes from the host.
 636            let project = project::Entity::find_by_id(project_id)
 637                .one(&*tx)
 638                .await?
 639                .context("no such project")?;
 640            if project.host_connection()? != connection {
 641                return Err(anyhow!("can't update a project hosted by someone else"))?;
 642            }
 643
 644            // Update summary.
 645            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 646                project_id: ActiveValue::set(project_id),
 647                worktree_id: ActiveValue::set(worktree_id),
 648                path: ActiveValue::set(summary.path.clone()),
 649                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 650                error_count: ActiveValue::set(summary.error_count as i32),
 651                warning_count: ActiveValue::set(summary.warning_count as i32),
 652            })
 653            .on_conflict(
 654                OnConflict::columns([
 655                    worktree_diagnostic_summary::Column::ProjectId,
 656                    worktree_diagnostic_summary::Column::WorktreeId,
 657                    worktree_diagnostic_summary::Column::Path,
 658                ])
 659                .update_columns([
 660                    worktree_diagnostic_summary::Column::LanguageServerId,
 661                    worktree_diagnostic_summary::Column::ErrorCount,
 662                    worktree_diagnostic_summary::Column::WarningCount,
 663                ])
 664                .to_owned(),
 665            )
 666            .exec(&*tx)
 667            .await?;
 668
 669            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 670            Ok(connection_ids)
 671        })
 672        .await
 673    }
 674
 675    /// Starts the language server for the given connection.
 676    pub async fn start_language_server(
 677        &self,
 678        update: &proto::StartLanguageServer,
 679        connection: ConnectionId,
 680    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 681        let project_id = ProjectId::from_proto(update.project_id);
 682        self.project_transaction(project_id, |tx| async move {
 683            let server = update.server.as_ref().context("invalid language server")?;
 684
 685            // Ensure the update comes from the host.
 686            let project = project::Entity::find_by_id(project_id)
 687                .one(&*tx)
 688                .await?
 689                .context("no such project")?;
 690            if project.host_connection()? != connection {
 691                return Err(anyhow!("can't update a project hosted by someone else"))?;
 692            }
 693
 694            // Add the newly-started language server.
 695            language_server::Entity::insert(language_server::ActiveModel {
 696                project_id: ActiveValue::set(project_id),
 697                id: ActiveValue::set(server.id as i64),
 698                name: ActiveValue::set(server.name.clone()),
 699                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 700                capabilities: ActiveValue::set(update.capabilities.clone()),
 701            })
 702            .on_conflict(
 703                OnConflict::columns([
 704                    language_server::Column::ProjectId,
 705                    language_server::Column::Id,
 706                ])
 707                .update_columns([
 708                    language_server::Column::Name,
 709                    language_server::Column::Capabilities,
 710                    language_server::Column::WorktreeId,
 711                ])
 712                .to_owned(),
 713            )
 714            .exec(&*tx)
 715            .await?;
 716
 717            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 718            Ok(connection_ids)
 719        })
 720        .await
 721    }
 722
 723    /// Updates the worktree settings for the given connection.
 724    pub async fn update_worktree_settings(
 725        &self,
 726        update: &proto::UpdateWorktreeSettings,
 727        connection: ConnectionId,
 728    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 729        let project_id = ProjectId::from_proto(update.project_id);
 730        let kind = match update.kind {
 731            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 732                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 733            None => proto::LocalSettingsKind::Settings,
 734        };
 735        let kind = LocalSettingsKind::from_proto(kind);
 736        self.project_transaction(project_id, |tx| async move {
 737            // Ensure the update comes from the host.
 738            let project = project::Entity::find_by_id(project_id)
 739                .one(&*tx)
 740                .await?
 741                .context("no such project")?;
 742            if project.host_connection()? != connection {
 743                return Err(anyhow!("can't update a project hosted by someone else"))?;
 744            }
 745
 746            if let Some(content) = &update.content {
 747                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 748                    project_id: ActiveValue::Set(project_id),
 749                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 750                    path: ActiveValue::Set(update.path.clone()),
 751                    content: ActiveValue::Set(content.clone()),
 752                    kind: ActiveValue::Set(kind),
 753                })
 754                .on_conflict(
 755                    OnConflict::columns([
 756                        worktree_settings_file::Column::ProjectId,
 757                        worktree_settings_file::Column::WorktreeId,
 758                        worktree_settings_file::Column::Path,
 759                    ])
 760                    .update_column(worktree_settings_file::Column::Content)
 761                    .to_owned(),
 762                )
 763                .exec(&*tx)
 764                .await?;
 765            } else {
 766                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 767                    project_id: ActiveValue::Set(project_id),
 768                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 769                    path: ActiveValue::Set(update.path.clone()),
 770                    ..Default::default()
 771                })
 772                .exec(&*tx)
 773                .await?;
 774            }
 775
 776            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 777            Ok(connection_ids)
 778        })
 779        .await
 780    }
 781
 782    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 783        self.transaction(|tx| async move {
 784            Ok(project::Entity::find_by_id(id)
 785                .one(&*tx)
 786                .await?
 787                .context("no such project")?)
 788        })
 789        .await
 790    }
 791
 792    /// Adds the given connection to the specified project
 793    /// in the current room.
 794    pub async fn join_project(
 795        &self,
 796        project_id: ProjectId,
 797        connection: ConnectionId,
 798        user_id: UserId,
 799        committer_name: Option<String>,
 800        committer_email: Option<String>,
 801    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 802        self.project_transaction(project_id, move |tx| {
 803            let committer_name = committer_name.clone();
 804            let committer_email = committer_email.clone();
 805            async move {
 806                let (project, role) = self
 807                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 808                    .await?;
 809                self.join_project_internal(
 810                    project,
 811                    user_id,
 812                    committer_name,
 813                    committer_email,
 814                    connection,
 815                    role,
 816                    &tx,
 817                )
 818                .await
 819            }
 820        })
 821        .await
 822    }
 823
 824    async fn join_project_internal(
 825        &self,
 826        project: project::Model,
 827        user_id: UserId,
 828        committer_name: Option<String>,
 829        committer_email: Option<String>,
 830        connection: ConnectionId,
 831        role: ChannelRole,
 832        tx: &DatabaseTransaction,
 833    ) -> Result<(Project, ReplicaId)> {
 834        let mut collaborators = project
 835            .find_related(project_collaborator::Entity)
 836            .all(tx)
 837            .await?;
 838        let replica_ids = collaborators
 839            .iter()
 840            .map(|c| c.replica_id)
 841            .collect::<HashSet<_>>();
 842        let mut replica_id = ReplicaId(1);
 843        while replica_ids.contains(&replica_id) {
 844            replica_id.0 += 1;
 845        }
 846        let new_collaborator = project_collaborator::ActiveModel {
 847            project_id: ActiveValue::set(project.id),
 848            connection_id: ActiveValue::set(connection.id as i32),
 849            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 850            user_id: ActiveValue::set(user_id),
 851            replica_id: ActiveValue::set(replica_id),
 852            is_host: ActiveValue::set(false),
 853            id: ActiveValue::NotSet,
 854            committer_name: ActiveValue::set(committer_name),
 855            committer_email: ActiveValue::set(committer_email),
 856        }
 857        .insert(tx)
 858        .await?;
 859        collaborators.push(new_collaborator);
 860
 861        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 862        let mut worktrees = db_worktrees
 863            .into_iter()
 864            .map(|db_worktree| {
 865                (
 866                    db_worktree.id as u64,
 867                    Worktree {
 868                        id: db_worktree.id as u64,
 869                        abs_path: db_worktree.abs_path,
 870                        root_name: db_worktree.root_name,
 871                        visible: db_worktree.visible,
 872                        entries: Default::default(),
 873                        diagnostic_summaries: Default::default(),
 874                        settings_files: Default::default(),
 875                        scan_id: db_worktree.scan_id as u64,
 876                        completed_scan_id: db_worktree.completed_scan_id as u64,
 877                        legacy_repository_entries: Default::default(),
 878                    },
 879                )
 880            })
 881            .collect::<BTreeMap<_, _>>();
 882
 883        // Populate worktree entries.
 884        {
 885            let mut db_entries = worktree_entry::Entity::find()
 886                .filter(
 887                    Condition::all()
 888                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 889                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 890                )
 891                .stream(tx)
 892                .await?;
 893            while let Some(db_entry) = db_entries.next().await {
 894                let db_entry = db_entry?;
 895                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 896                    worktree.entries.push(proto::Entry {
 897                        id: db_entry.id as u64,
 898                        is_dir: db_entry.is_dir,
 899                        path: db_entry.path,
 900                        inode: db_entry.inode as u64,
 901                        mtime: Some(proto::Timestamp {
 902                            seconds: db_entry.mtime_seconds as u64,
 903                            nanos: db_entry.mtime_nanos as u32,
 904                        }),
 905                        canonical_path: db_entry.canonical_path,
 906                        is_ignored: db_entry.is_ignored,
 907                        is_external: db_entry.is_external,
 908                        // This is only used in the summarization backlog, so if it's None,
 909                        // that just means we won't be able to detect when to resummarize
 910                        // based on total number of backlogged bytes - instead, we'd go
 911                        // on number of files only. That shouldn't be a huge deal in practice.
 912                        size: None,
 913                        is_fifo: db_entry.is_fifo,
 914                    });
 915                }
 916            }
 917        }
 918
 919        // Populate repository entries.
 920        let mut repositories = Vec::new();
 921        {
 922            let db_repository_entries = project_repository::Entity::find()
 923                .filter(
 924                    Condition::all()
 925                        .add(project_repository::Column::ProjectId.eq(project.id))
 926                        .add(project_repository::Column::IsDeleted.eq(false)),
 927                )
 928                .all(tx)
 929                .await?;
 930            for db_repository_entry in db_repository_entries {
 931                let mut repository_statuses = project_repository_statuses::Entity::find()
 932                    .filter(
 933                        Condition::all()
 934                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 935                            .add(
 936                                project_repository_statuses::Column::RepositoryId
 937                                    .eq(db_repository_entry.id),
 938                            )
 939                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 940                    )
 941                    .stream(tx)
 942                    .await?;
 943                let mut updated_statuses = Vec::new();
 944                while let Some(status_entry) = repository_statuses.next().await {
 945                    let status_entry = status_entry?;
 946                    updated_statuses.push(db_status_to_proto(status_entry)?);
 947                }
 948
 949                let current_merge_conflicts = db_repository_entry
 950                    .current_merge_conflicts
 951                    .as_ref()
 952                    .map(|conflicts| serde_json::from_str(conflicts))
 953                    .transpose()?
 954                    .unwrap_or_default();
 955
 956                let branch_summary = db_repository_entry
 957                    .branch_summary
 958                    .as_ref()
 959                    .map(|branch_summary| serde_json::from_str(branch_summary))
 960                    .transpose()?
 961                    .unwrap_or_default();
 962
 963                let head_commit_details = db_repository_entry
 964                    .head_commit_details
 965                    .as_ref()
 966                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 967                    .transpose()?
 968                    .unwrap_or_default();
 969
 970                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 971                    .context("failed to deserialize repository's entry ids")?;
 972
 973                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 974                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 975                        worktree.legacy_repository_entries.insert(
 976                            db_repository_entry.id as u64,
 977                            proto::RepositoryEntry {
 978                                repository_id: db_repository_entry.id as u64,
 979                                updated_statuses,
 980                                removed_statuses: Vec::new(),
 981                                current_merge_conflicts,
 982                                branch_summary,
 983                            },
 984                        );
 985                    }
 986                } else {
 987                    repositories.push(proto::UpdateRepository {
 988                        project_id: db_repository_entry.project_id.0 as u64,
 989                        id: db_repository_entry.id as u64,
 990                        abs_path: db_repository_entry.abs_path,
 991                        entry_ids,
 992                        updated_statuses,
 993                        removed_statuses: Vec::new(),
 994                        current_merge_conflicts,
 995                        branch_summary,
 996                        head_commit_details,
 997                        scan_id: db_repository_entry.scan_id as u64,
 998                        is_last_update: true,
 999                        merge_message: db_repository_entry.merge_message,
1000                        stash_entries: Vec::new(),
1001                    });
1002                }
1003            }
1004        }
1005
1006        // Populate worktree diagnostic summaries.
1007        {
1008            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1009                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1010                .stream(tx)
1011                .await?;
1012            while let Some(db_summary) = db_summaries.next().await {
1013                let db_summary = db_summary?;
1014                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1015                    worktree
1016                        .diagnostic_summaries
1017                        .push(proto::DiagnosticSummary {
1018                            path: db_summary.path,
1019                            language_server_id: db_summary.language_server_id as u64,
1020                            error_count: db_summary.error_count as u32,
1021                            warning_count: db_summary.warning_count as u32,
1022                        });
1023                }
1024            }
1025        }
1026
1027        // Populate worktree settings files
1028        {
1029            let mut db_settings_files = worktree_settings_file::Entity::find()
1030                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1031                .stream(tx)
1032                .await?;
1033            while let Some(db_settings_file) = db_settings_files.next().await {
1034                let db_settings_file = db_settings_file?;
1035                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1036                    worktree.settings_files.push(WorktreeSettingsFile {
1037                        path: db_settings_file.path,
1038                        content: db_settings_file.content,
1039                        kind: db_settings_file.kind,
1040                    });
1041                }
1042            }
1043        }
1044
1045        // Populate language servers.
1046        let language_servers = project
1047            .find_related(language_server::Entity)
1048            .all(tx)
1049            .await?;
1050
1051        let path_style = if project.windows_paths {
1052            PathStyle::Windows
1053        } else {
1054            PathStyle::Posix
1055        };
1056
1057        let project = Project {
1058            id: project.id,
1059            role,
1060            collaborators: collaborators
1061                .into_iter()
1062                .map(|collaborator| ProjectCollaborator {
1063                    connection_id: collaborator.connection(),
1064                    user_id: collaborator.user_id,
1065                    replica_id: collaborator.replica_id,
1066                    is_host: collaborator.is_host,
1067                    committer_name: collaborator.committer_name,
1068                    committer_email: collaborator.committer_email,
1069                })
1070                .collect(),
1071            worktrees,
1072            repositories,
1073            language_servers: language_servers
1074                .into_iter()
1075                .map(|language_server| LanguageServer {
1076                    server: proto::LanguageServer {
1077                        id: language_server.id as u64,
1078                        name: language_server.name,
1079                        worktree_id: language_server.worktree_id.map(|id| id as u64),
1080                    },
1081                    capabilities: language_server.capabilities,
1082                })
1083                .collect(),
1084            path_style,
1085        };
1086        Ok((project, replica_id as ReplicaId))
1087    }
1088
1089    /// Removes the given connection from the specified project.
1090    pub async fn leave_project(
1091        &self,
1092        project_id: ProjectId,
1093        connection: ConnectionId,
1094    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1095        self.project_transaction(project_id, |tx| async move {
1096            let result = project_collaborator::Entity::delete_many()
1097                .filter(
1098                    Condition::all()
1099                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1100                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1101                        .add(
1102                            project_collaborator::Column::ConnectionServerId
1103                                .eq(connection.owner_id as i32),
1104                        ),
1105                )
1106                .exec(&*tx)
1107                .await?;
1108            if result.rows_affected == 0 {
1109                Err(anyhow!("not a collaborator on this project"))?;
1110            }
1111
1112            let project = project::Entity::find_by_id(project_id)
1113                .one(&*tx)
1114                .await?
1115                .context("no such project")?;
1116            let collaborators = project
1117                .find_related(project_collaborator::Entity)
1118                .all(&*tx)
1119                .await?;
1120            let connection_ids: Vec<ConnectionId> = collaborators
1121                .into_iter()
1122                .map(|collaborator| collaborator.connection())
1123                .collect();
1124
1125            follower::Entity::delete_many()
1126                .filter(
1127                    Condition::any()
1128                        .add(
1129                            Condition::all()
1130                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1131                                .add(
1132                                    follower::Column::LeaderConnectionServerId
1133                                        .eq(connection.owner_id),
1134                                )
1135                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1136                        )
1137                        .add(
1138                            Condition::all()
1139                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1140                                .add(
1141                                    follower::Column::FollowerConnectionServerId
1142                                        .eq(connection.owner_id),
1143                                )
1144                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1145                        ),
1146                )
1147                .exec(&*tx)
1148                .await?;
1149
1150            let room = if let Some(room_id) = project.room_id {
1151                Some(self.get_room(room_id, &tx).await?)
1152            } else {
1153                None
1154            };
1155
1156            let left_project = LeftProject {
1157                id: project_id,
1158                should_unshare: connection == project.host_connection()?,
1159                connection_ids,
1160            };
1161            Ok((room, left_project))
1162        })
1163        .await
1164    }
1165
1166    pub async fn check_user_is_project_host(
1167        &self,
1168        project_id: ProjectId,
1169        connection_id: ConnectionId,
1170    ) -> Result<()> {
1171        self.project_transaction(project_id, |tx| async move {
1172            project::Entity::find()
1173                .filter(
1174                    Condition::all()
1175                        .add(project::Column::Id.eq(project_id))
1176                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1177                        .add(
1178                            project::Column::HostConnectionServerId
1179                                .eq(Some(connection_id.owner_id as i32)),
1180                        ),
1181                )
1182                .one(&*tx)
1183                .await?
1184                .context("failed to read project host")?;
1185
1186            Ok(())
1187        })
1188        .await
1189        .map(|guard| guard.into_inner())
1190    }
1191
1192    /// Returns the current project if the given user is authorized to access it with the specified capability.
1193    pub async fn access_project(
1194        &self,
1195        project_id: ProjectId,
1196        connection_id: ConnectionId,
1197        capability: Capability,
1198        tx: &DatabaseTransaction,
1199    ) -> Result<(project::Model, ChannelRole)> {
1200        let project = project::Entity::find_by_id(project_id)
1201            .one(tx)
1202            .await?
1203            .context("no such project")?;
1204
1205        let role_from_room = if let Some(room_id) = project.room_id {
1206            room_participant::Entity::find()
1207                .filter(room_participant::Column::RoomId.eq(room_id))
1208                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1209                .one(tx)
1210                .await?
1211                .and_then(|participant| participant.role)
1212        } else {
1213            None
1214        };
1215
1216        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1217
1218        match capability {
1219            Capability::ReadWrite => {
1220                if !role.can_edit_projects() {
1221                    return Err(anyhow!("not authorized to edit projects"))?;
1222                }
1223            }
1224            Capability::ReadOnly => {
1225                if !role.can_read_projects() {
1226                    return Err(anyhow!("not authorized to read projects"))?;
1227                }
1228            }
1229        }
1230
1231        Ok((project, role))
1232    }
1233
1234    /// Returns the host connection for a read-only request to join a shared project.
1235    pub async fn host_for_read_only_project_request(
1236        &self,
1237        project_id: ProjectId,
1238        connection_id: ConnectionId,
1239    ) -> Result<ConnectionId> {
1240        self.project_transaction(project_id, |tx| async move {
1241            let (project, _) = self
1242                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1243                .await?;
1244            project.host_connection()
1245        })
1246        .await
1247        .map(|guard| guard.into_inner())
1248    }
1249
1250    /// Returns the host connection for a request to join a shared project.
1251    pub async fn host_for_mutating_project_request(
1252        &self,
1253        project_id: ProjectId,
1254        connection_id: ConnectionId,
1255    ) -> Result<ConnectionId> {
1256        self.project_transaction(project_id, |tx| async move {
1257            let (project, _) = self
1258                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1259                .await?;
1260            project.host_connection()
1261        })
1262        .await
1263        .map(|guard| guard.into_inner())
1264    }
1265
1266    pub async fn connections_for_buffer_update(
1267        &self,
1268        project_id: ProjectId,
1269        connection_id: ConnectionId,
1270        capability: Capability,
1271    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1272        self.project_transaction(project_id, |tx| async move {
1273            // Authorize
1274            let (project, _) = self
1275                .access_project(project_id, connection_id, capability, &tx)
1276                .await?;
1277
1278            let host_connection_id = project.host_connection()?;
1279
1280            let collaborators = project_collaborator::Entity::find()
1281                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1282                .all(&*tx)
1283                .await?;
1284
1285            let guest_connection_ids = collaborators
1286                .into_iter()
1287                .filter_map(|collaborator| {
1288                    if collaborator.is_host {
1289                        None
1290                    } else {
1291                        Some(collaborator.connection())
1292                    }
1293                })
1294                .collect();
1295
1296            Ok((host_connection_id, guest_connection_ids))
1297        })
1298        .await
1299    }
1300
1301    /// Returns the connection IDs in the given project.
1302    ///
1303    /// The provided `connection_id` must also be a collaborator in the project,
1304    /// otherwise an error will be returned.
1305    pub async fn project_connection_ids(
1306        &self,
1307        project_id: ProjectId,
1308        connection_id: ConnectionId,
1309        exclude_dev_server: bool,
1310    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1311        self.project_transaction(project_id, |tx| async move {
1312            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1313                .await
1314        })
1315        .await
1316    }
1317
1318    async fn internal_project_connection_ids(
1319        &self,
1320        project_id: ProjectId,
1321        connection_id: ConnectionId,
1322        exclude_dev_server: bool,
1323        tx: &DatabaseTransaction,
1324    ) -> Result<HashSet<ConnectionId>> {
1325        let project = project::Entity::find_by_id(project_id)
1326            .one(tx)
1327            .await?
1328            .context("no such project")?;
1329
1330        let mut collaborators = project_collaborator::Entity::find()
1331            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1332            .stream(tx)
1333            .await?;
1334
1335        let mut connection_ids = HashSet::default();
1336        if let Some(host_connection) = project.host_connection().log_err()
1337            && !exclude_dev_server
1338        {
1339            connection_ids.insert(host_connection);
1340        }
1341
1342        while let Some(collaborator) = collaborators.next().await {
1343            let collaborator = collaborator?;
1344            connection_ids.insert(collaborator.connection());
1345        }
1346
1347        if connection_ids.contains(&connection_id)
1348            || Some(connection_id) == project.host_connection().ok()
1349        {
1350            Ok(connection_ids)
1351        } else {
1352            Err(anyhow!(
1353                "can only send project updates to a project you're in"
1354            ))?
1355        }
1356    }
1357
1358    async fn project_guest_connection_ids(
1359        &self,
1360        project_id: ProjectId,
1361        tx: &DatabaseTransaction,
1362    ) -> Result<Vec<ConnectionId>> {
1363        let mut collaborators = project_collaborator::Entity::find()
1364            .filter(
1365                project_collaborator::Column::ProjectId
1366                    .eq(project_id)
1367                    .and(project_collaborator::Column::IsHost.eq(false)),
1368            )
1369            .stream(tx)
1370            .await?;
1371
1372        let mut guest_connection_ids = Vec::new();
1373        while let Some(collaborator) = collaborators.next().await {
1374            let collaborator = collaborator?;
1375            guest_connection_ids.push(collaborator.connection());
1376        }
1377        Ok(guest_connection_ids)
1378    }
1379
1380    /// Returns the [`RoomId`] for the given project.
1381    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1382        self.transaction(|tx| async move {
1383            Ok(project::Entity::find_by_id(project_id)
1384                .one(&*tx)
1385                .await?
1386                .and_then(|project| project.room_id))
1387        })
1388        .await
1389    }
1390
1391    pub async fn check_room_participants(
1392        &self,
1393        room_id: RoomId,
1394        leader_id: ConnectionId,
1395        follower_id: ConnectionId,
1396    ) -> Result<()> {
1397        self.transaction(|tx| async move {
1398            use room_participant::Column;
1399
1400            let count = room_participant::Entity::find()
1401                .filter(
1402                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1403                        Condition::any()
1404                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1405                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1406                            ))
1407                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1408                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1409                            )),
1410                    ),
1411                )
1412                .count(&*tx)
1413                .await?;
1414
1415            if count < 2 {
1416                Err(anyhow!("not room participants"))?;
1417            }
1418
1419            Ok(())
1420        })
1421        .await
1422    }
1423
1424    /// Adds the given follower connection as a follower of the given leader connection.
1425    pub async fn follow(
1426        &self,
1427        room_id: RoomId,
1428        project_id: ProjectId,
1429        leader_connection: ConnectionId,
1430        follower_connection: ConnectionId,
1431    ) -> Result<TransactionGuard<proto::Room>> {
1432        self.room_transaction(room_id, |tx| async move {
1433            follower::ActiveModel {
1434                room_id: ActiveValue::set(room_id),
1435                project_id: ActiveValue::set(project_id),
1436                leader_connection_server_id: ActiveValue::set(ServerId(
1437                    leader_connection.owner_id as i32,
1438                )),
1439                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1440                follower_connection_server_id: ActiveValue::set(ServerId(
1441                    follower_connection.owner_id as i32,
1442                )),
1443                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1444                ..Default::default()
1445            }
1446            .insert(&*tx)
1447            .await?;
1448
1449            let room = self.get_room(room_id, &tx).await?;
1450            Ok(room)
1451        })
1452        .await
1453    }
1454
1455    /// Removes the given follower connection as a follower of the given leader connection.
1456    pub async fn unfollow(
1457        &self,
1458        room_id: RoomId,
1459        project_id: ProjectId,
1460        leader_connection: ConnectionId,
1461        follower_connection: ConnectionId,
1462    ) -> Result<TransactionGuard<proto::Room>> {
1463        self.room_transaction(room_id, |tx| async move {
1464            follower::Entity::delete_many()
1465                .filter(
1466                    Condition::all()
1467                        .add(follower::Column::RoomId.eq(room_id))
1468                        .add(follower::Column::ProjectId.eq(project_id))
1469                        .add(
1470                            follower::Column::LeaderConnectionServerId
1471                                .eq(leader_connection.owner_id),
1472                        )
1473                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1474                        .add(
1475                            follower::Column::FollowerConnectionServerId
1476                                .eq(follower_connection.owner_id),
1477                        )
1478                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1479                )
1480                .exec(&*tx)
1481                .await?;
1482
1483            let room = self.get_room(room_id, &tx).await?;
1484            Ok(room)
1485        })
1486        .await
1487    }
1488}