projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project {
  95                clock::ReplicaId::REMOTE_SERVER
  96            } else {
  97                clock::ReplicaId::LOCAL
  98            };
  99
 100            project_collaborator::ActiveModel {
 101                project_id: ActiveValue::set(project.id),
 102                connection_id: ActiveValue::set(connection.id as i32),
 103                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 104                user_id: ActiveValue::set(participant.user_id),
 105                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 106                is_host: ActiveValue::set(true),
 107                id: ActiveValue::NotSet,
 108                committer_name: ActiveValue::Set(None),
 109                committer_email: ActiveValue::Set(None),
 110            }
 111            .insert(&*tx)
 112            .await?;
 113
 114            let room = self.get_room(room_id, &tx).await?;
 115            Ok((project.id, room))
 116        })
 117        .await
 118    }
 119
 120    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 121        self.transaction(|tx| async move {
 122            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 123            Ok(())
 124        })
 125        .await
 126    }
 127
 128    /// Unshares the given project.
 129    pub async fn unshare_project(
 130        &self,
 131        project_id: ProjectId,
 132        connection: ConnectionId,
 133    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 134        self.project_transaction(project_id, |tx| async move {
 135            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 136            let project = project::Entity::find_by_id(project_id)
 137                .one(&*tx)
 138                .await?
 139                .context("project not found")?;
 140            let room = if let Some(room_id) = project.room_id {
 141                Some(self.get_room(room_id, &tx).await?)
 142            } else {
 143                None
 144            };
 145            if project.host_connection()? == connection {
 146                return Ok((true, room, guest_connection_ids));
 147            }
 148            Err(anyhow!("cannot unshare a project hosted by another user"))?
 149        })
 150        .await
 151    }
 152
 153    /// Updates the worktrees associated with the given project.
 154    pub async fn update_project(
 155        &self,
 156        project_id: ProjectId,
 157        connection: ConnectionId,
 158        worktrees: &[proto::WorktreeMetadata],
 159    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 160        self.project_transaction(project_id, |tx| async move {
 161            let project = project::Entity::find_by_id(project_id)
 162                .filter(
 163                    Condition::all()
 164                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 165                        .add(
 166                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 167                        ),
 168                )
 169                .one(&*tx)
 170                .await?
 171                .context("no such project")?;
 172
 173            self.update_project_worktrees(project.id, worktrees, &tx)
 174                .await?;
 175
 176            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 177
 178            let room = if let Some(room_id) = project.room_id {
 179                Some(self.get_room(room_id, &tx).await?)
 180            } else {
 181                None
 182            };
 183
 184            Ok((room, guest_connection_ids))
 185        })
 186        .await
 187    }
 188
 189    pub(in crate::db) async fn update_project_worktrees(
 190        &self,
 191        project_id: ProjectId,
 192        worktrees: &[proto::WorktreeMetadata],
 193        tx: &DatabaseTransaction,
 194    ) -> Result<()> {
 195        if !worktrees.is_empty() {
 196            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 197                id: ActiveValue::set(worktree.id as i64),
 198                project_id: ActiveValue::set(project_id),
 199                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 200                root_name: ActiveValue::set(worktree.root_name.clone()),
 201                visible: ActiveValue::set(worktree.visible),
 202                scan_id: ActiveValue::set(0),
 203                completed_scan_id: ActiveValue::set(0),
 204            }))
 205            .on_conflict(
 206                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 207                    .update_column(worktree::Column::RootName)
 208                    .to_owned(),
 209            )
 210            .exec(tx)
 211            .await?;
 212        }
 213
 214        worktree::Entity::delete_many()
 215            .filter(worktree::Column::ProjectId.eq(project_id).and(
 216                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 217            ))
 218            .exec(tx)
 219            .await?;
 220
 221        Ok(())
 222    }
 223
 224    pub async fn update_worktree(
 225        &self,
 226        update: &proto::UpdateWorktree,
 227        connection: ConnectionId,
 228    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 229        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 230            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 231        {
 232            return Err(anyhow!(
 233                "invalid worktree update. removed entries: {}, updated entries: {}",
 234                update.removed_entries.len(),
 235                update.updated_entries.len()
 236            ))?;
 237        }
 238
 239        let project_id = ProjectId::from_proto(update.project_id);
 240        let worktree_id = update.worktree_id as i64;
 241        self.project_transaction(project_id, |tx| async move {
 242            // Ensure the update comes from the host.
 243            let _project = project::Entity::find_by_id(project_id)
 244                .filter(
 245                    Condition::all()
 246                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 247                        .add(
 248                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 249                        ),
 250                )
 251                .one(&*tx)
 252                .await?
 253                .with_context(|| format!("no such project: {project_id}"))?;
 254
 255            // Update metadata.
 256            worktree::Entity::update(worktree::ActiveModel {
 257                id: ActiveValue::set(worktree_id),
 258                project_id: ActiveValue::set(project_id),
 259                root_name: ActiveValue::set(update.root_name.clone()),
 260                scan_id: ActiveValue::set(update.scan_id as i64),
 261                completed_scan_id: if update.is_last_update {
 262                    ActiveValue::set(update.scan_id as i64)
 263                } else {
 264                    ActiveValue::default()
 265                },
 266                abs_path: ActiveValue::set(update.abs_path.clone()),
 267                ..Default::default()
 268            })
 269            .exec(&*tx)
 270            .await?;
 271
 272            if !update.updated_entries.is_empty() {
 273                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 274                    let mtime = entry.mtime.clone().unwrap_or_default();
 275                    worktree_entry::ActiveModel {
 276                        project_id: ActiveValue::set(project_id),
 277                        worktree_id: ActiveValue::set(worktree_id),
 278                        id: ActiveValue::set(entry.id as i64),
 279                        is_dir: ActiveValue::set(entry.is_dir),
 280                        path: ActiveValue::set(entry.path.clone()),
 281                        inode: ActiveValue::set(entry.inode as i64),
 282                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 283                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 284                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 285                        is_ignored: ActiveValue::set(entry.is_ignored),
 286                        git_status: ActiveValue::set(None),
 287                        is_external: ActiveValue::set(entry.is_external),
 288                        is_deleted: ActiveValue::set(false),
 289                        is_hidden: ActiveValue::set(entry.is_hidden),
 290                        scan_id: ActiveValue::set(update.scan_id as i64),
 291                        is_fifo: ActiveValue::set(entry.is_fifo),
 292                    }
 293                }))
 294                .on_conflict(
 295                    OnConflict::columns([
 296                        worktree_entry::Column::ProjectId,
 297                        worktree_entry::Column::WorktreeId,
 298                        worktree_entry::Column::Id,
 299                    ])
 300                    .update_columns([
 301                        worktree_entry::Column::IsDir,
 302                        worktree_entry::Column::Path,
 303                        worktree_entry::Column::Inode,
 304                        worktree_entry::Column::MtimeSeconds,
 305                        worktree_entry::Column::MtimeNanos,
 306                        worktree_entry::Column::CanonicalPath,
 307                        worktree_entry::Column::IsIgnored,
 308                        worktree_entry::Column::IsHidden,
 309                        worktree_entry::Column::ScanId,
 310                    ])
 311                    .to_owned(),
 312                )
 313                .exec(&*tx)
 314                .await?;
 315            }
 316
 317            if !update.removed_entries.is_empty() {
 318                worktree_entry::Entity::update_many()
 319                    .filter(
 320                        worktree_entry::Column::ProjectId
 321                            .eq(project_id)
 322                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 323                            .and(
 324                                worktree_entry::Column::Id
 325                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 326                            ),
 327                    )
 328                    .set(worktree_entry::ActiveModel {
 329                        is_deleted: ActiveValue::Set(true),
 330                        scan_id: ActiveValue::Set(update.scan_id as i64),
 331                        ..Default::default()
 332                    })
 333                    .exec(&*tx)
 334                    .await?;
 335            }
 336
 337            // Backward-compatibility for old Zed clients.
 338            //
 339            // Remove this block when Zed 1.80 stable has been out for a week.
 340            {
 341                if !update.updated_repositories.is_empty() {
 342                    project_repository::Entity::insert_many(
 343                        update.updated_repositories.iter().map(|repository| {
 344                            project_repository::ActiveModel {
 345                                project_id: ActiveValue::set(project_id),
 346                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 347                                id: ActiveValue::set(repository.repository_id as i64),
 348                                scan_id: ActiveValue::set(update.scan_id as i64),
 349                                is_deleted: ActiveValue::set(false),
 350                                branch_summary: ActiveValue::Set(
 351                                    repository
 352                                        .branch_summary
 353                                        .as_ref()
 354                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 355                                ),
 356                                current_merge_conflicts: ActiveValue::Set(Some(
 357                                    serde_json::to_string(&repository.current_merge_conflicts)
 358                                        .unwrap(),
 359                                )),
 360                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 361                                abs_path: ActiveValue::set(String::new()),
 362                                entry_ids: ActiveValue::set("[]".into()),
 363                                head_commit_details: ActiveValue::set(None),
 364                                merge_message: ActiveValue::set(None),
 365                            }
 366                        }),
 367                    )
 368                    .on_conflict(
 369                        OnConflict::columns([
 370                            project_repository::Column::ProjectId,
 371                            project_repository::Column::Id,
 372                        ])
 373                        .update_columns([
 374                            project_repository::Column::ScanId,
 375                            project_repository::Column::BranchSummary,
 376                            project_repository::Column::CurrentMergeConflicts,
 377                        ])
 378                        .to_owned(),
 379                    )
 380                    .exec(&*tx)
 381                    .await?;
 382
 383                    let has_any_statuses = update
 384                        .updated_repositories
 385                        .iter()
 386                        .any(|repository| !repository.updated_statuses.is_empty());
 387
 388                    if has_any_statuses {
 389                        project_repository_statuses::Entity::insert_many(
 390                            update.updated_repositories.iter().flat_map(
 391                                |repository: &proto::RepositoryEntry| {
 392                                    repository.updated_statuses.iter().map(|status_entry| {
 393                                        let (repo_path, status_kind, first_status, second_status) =
 394                                            proto_status_to_db(status_entry.clone());
 395                                        project_repository_statuses::ActiveModel {
 396                                            project_id: ActiveValue::set(project_id),
 397                                            repository_id: ActiveValue::set(
 398                                                repository.repository_id as i64,
 399                                            ),
 400                                            scan_id: ActiveValue::set(update.scan_id as i64),
 401                                            is_deleted: ActiveValue::set(false),
 402                                            repo_path: ActiveValue::set(repo_path),
 403                                            status: ActiveValue::set(0),
 404                                            status_kind: ActiveValue::set(status_kind),
 405                                            first_status: ActiveValue::set(first_status),
 406                                            second_status: ActiveValue::set(second_status),
 407                                        }
 408                                    })
 409                                },
 410                            ),
 411                        )
 412                        .on_conflict(
 413                            OnConflict::columns([
 414                                project_repository_statuses::Column::ProjectId,
 415                                project_repository_statuses::Column::RepositoryId,
 416                                project_repository_statuses::Column::RepoPath,
 417                            ])
 418                            .update_columns([
 419                                project_repository_statuses::Column::ScanId,
 420                                project_repository_statuses::Column::StatusKind,
 421                                project_repository_statuses::Column::FirstStatus,
 422                                project_repository_statuses::Column::SecondStatus,
 423                            ])
 424                            .to_owned(),
 425                        )
 426                        .exec(&*tx)
 427                        .await?;
 428                    }
 429
 430                    for repo in &update.updated_repositories {
 431                        if !repo.removed_statuses.is_empty() {
 432                            project_repository_statuses::Entity::update_many()
 433                                .filter(
 434                                    project_repository_statuses::Column::ProjectId
 435                                        .eq(project_id)
 436                                        .and(
 437                                            project_repository_statuses::Column::RepositoryId
 438                                                .eq(repo.repository_id),
 439                                        )
 440                                        .and(
 441                                            project_repository_statuses::Column::RepoPath
 442                                                .is_in(repo.removed_statuses.iter()),
 443                                        ),
 444                                )
 445                                .set(project_repository_statuses::ActiveModel {
 446                                    is_deleted: ActiveValue::Set(true),
 447                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 448                                    ..Default::default()
 449                                })
 450                                .exec(&*tx)
 451                                .await?;
 452                        }
 453                    }
 454                }
 455
 456                if !update.removed_repositories.is_empty() {
 457                    project_repository::Entity::update_many()
 458                        .filter(
 459                            project_repository::Column::ProjectId
 460                                .eq(project_id)
 461                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 462                                .and(project_repository::Column::Id.is_in(
 463                                    update.removed_repositories.iter().map(|id| *id as i64),
 464                                )),
 465                        )
 466                        .set(project_repository::ActiveModel {
 467                            is_deleted: ActiveValue::Set(true),
 468                            scan_id: ActiveValue::Set(update.scan_id as i64),
 469                            ..Default::default()
 470                        })
 471                        .exec(&*tx)
 472                        .await?;
 473                }
 474            }
 475
 476            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 477            Ok(connection_ids)
 478        })
 479        .await
 480    }
 481
 482    pub async fn update_repository(
 483        &self,
 484        update: &proto::UpdateRepository,
 485        _connection: ConnectionId,
 486    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 487        let project_id = ProjectId::from_proto(update.project_id);
 488        let repository_id = update.id as i64;
 489        self.project_transaction(project_id, |tx| async move {
 490            project_repository::Entity::insert(project_repository::ActiveModel {
 491                project_id: ActiveValue::set(project_id),
 492                id: ActiveValue::set(repository_id),
 493                legacy_worktree_id: ActiveValue::set(None),
 494                abs_path: ActiveValue::set(update.abs_path.clone()),
 495                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 496                scan_id: ActiveValue::set(update.scan_id as i64),
 497                is_deleted: ActiveValue::set(false),
 498                branch_summary: ActiveValue::Set(
 499                    update
 500                        .branch_summary
 501                        .as_ref()
 502                        .map(|summary| serde_json::to_string(summary).unwrap()),
 503                ),
 504                head_commit_details: ActiveValue::Set(
 505                    update
 506                        .head_commit_details
 507                        .as_ref()
 508                        .map(|details| serde_json::to_string(details).unwrap()),
 509                ),
 510                current_merge_conflicts: ActiveValue::Set(Some(
 511                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 512                )),
 513                merge_message: ActiveValue::set(update.merge_message.clone()),
 514            })
 515            .on_conflict(
 516                OnConflict::columns([
 517                    project_repository::Column::ProjectId,
 518                    project_repository::Column::Id,
 519                ])
 520                .update_columns([
 521                    project_repository::Column::ScanId,
 522                    project_repository::Column::BranchSummary,
 523                    project_repository::Column::EntryIds,
 524                    project_repository::Column::AbsPath,
 525                    project_repository::Column::CurrentMergeConflicts,
 526                    project_repository::Column::HeadCommitDetails,
 527                    project_repository::Column::MergeMessage,
 528                ])
 529                .to_owned(),
 530            )
 531            .exec(&*tx)
 532            .await?;
 533
 534            let has_any_statuses = !update.updated_statuses.is_empty();
 535
 536            if has_any_statuses {
 537                project_repository_statuses::Entity::insert_many(
 538                    update.updated_statuses.iter().map(|status_entry| {
 539                        let (repo_path, status_kind, first_status, second_status) =
 540                            proto_status_to_db(status_entry.clone());
 541                        project_repository_statuses::ActiveModel {
 542                            project_id: ActiveValue::set(project_id),
 543                            repository_id: ActiveValue::set(repository_id),
 544                            scan_id: ActiveValue::set(update.scan_id as i64),
 545                            is_deleted: ActiveValue::set(false),
 546                            repo_path: ActiveValue::set(repo_path),
 547                            status: ActiveValue::set(0),
 548                            status_kind: ActiveValue::set(status_kind),
 549                            first_status: ActiveValue::set(first_status),
 550                            second_status: ActiveValue::set(second_status),
 551                        }
 552                    }),
 553                )
 554                .on_conflict(
 555                    OnConflict::columns([
 556                        project_repository_statuses::Column::ProjectId,
 557                        project_repository_statuses::Column::RepositoryId,
 558                        project_repository_statuses::Column::RepoPath,
 559                    ])
 560                    .update_columns([
 561                        project_repository_statuses::Column::ScanId,
 562                        project_repository_statuses::Column::StatusKind,
 563                        project_repository_statuses::Column::FirstStatus,
 564                        project_repository_statuses::Column::SecondStatus,
 565                    ])
 566                    .to_owned(),
 567                )
 568                .exec(&*tx)
 569                .await?;
 570            }
 571
 572            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 573
 574            if has_any_removed_statuses {
 575                project_repository_statuses::Entity::update_many()
 576                    .filter(
 577                        project_repository_statuses::Column::ProjectId
 578                            .eq(project_id)
 579                            .and(
 580                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 581                            )
 582                            .and(
 583                                project_repository_statuses::Column::RepoPath
 584                                    .is_in(update.removed_statuses.iter()),
 585                            ),
 586                    )
 587                    .set(project_repository_statuses::ActiveModel {
 588                        is_deleted: ActiveValue::Set(true),
 589                        scan_id: ActiveValue::Set(update.scan_id as i64),
 590                        ..Default::default()
 591                    })
 592                    .exec(&*tx)
 593                    .await?;
 594            }
 595
 596            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 597            Ok(connection_ids)
 598        })
 599        .await
 600    }
 601
 602    pub async fn remove_repository(
 603        &self,
 604        remove: &proto::RemoveRepository,
 605        _connection: ConnectionId,
 606    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 607        let project_id = ProjectId::from_proto(remove.project_id);
 608        let repository_id = remove.id as i64;
 609        self.project_transaction(project_id, |tx| async move {
 610            project_repository::Entity::update_many()
 611                .filter(
 612                    project_repository::Column::ProjectId
 613                        .eq(project_id)
 614                        .and(project_repository::Column::Id.eq(repository_id)),
 615                )
 616                .set(project_repository::ActiveModel {
 617                    is_deleted: ActiveValue::Set(true),
 618                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 619                    ..Default::default()
 620                })
 621                .exec(&*tx)
 622                .await?;
 623
 624            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 625            Ok(connection_ids)
 626        })
 627        .await
 628    }
 629
 630    /// Updates the diagnostic summary for the given connection.
 631    pub async fn update_diagnostic_summary(
 632        &self,
 633        update: &proto::UpdateDiagnosticSummary,
 634        connection: ConnectionId,
 635    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 636        let project_id = ProjectId::from_proto(update.project_id);
 637        let worktree_id = update.worktree_id as i64;
 638        self.project_transaction(project_id, |tx| async move {
 639            let summary = update.summary.as_ref().context("invalid summary")?;
 640
 641            // Ensure the update comes from the host.
 642            let project = project::Entity::find_by_id(project_id)
 643                .one(&*tx)
 644                .await?
 645                .context("no such project")?;
 646            if project.host_connection()? != connection {
 647                return Err(anyhow!("can't update a project hosted by someone else"))?;
 648            }
 649
 650            // Update summary.
 651            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 652                project_id: ActiveValue::set(project_id),
 653                worktree_id: ActiveValue::set(worktree_id),
 654                path: ActiveValue::set(summary.path.clone()),
 655                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 656                error_count: ActiveValue::set(summary.error_count as i32),
 657                warning_count: ActiveValue::set(summary.warning_count as i32),
 658            })
 659            .on_conflict(
 660                OnConflict::columns([
 661                    worktree_diagnostic_summary::Column::ProjectId,
 662                    worktree_diagnostic_summary::Column::WorktreeId,
 663                    worktree_diagnostic_summary::Column::Path,
 664                ])
 665                .update_columns([
 666                    worktree_diagnostic_summary::Column::LanguageServerId,
 667                    worktree_diagnostic_summary::Column::ErrorCount,
 668                    worktree_diagnostic_summary::Column::WarningCount,
 669                ])
 670                .to_owned(),
 671            )
 672            .exec(&*tx)
 673            .await?;
 674
 675            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 676            Ok(connection_ids)
 677        })
 678        .await
 679    }
 680
 681    /// Starts the language server for the given connection.
 682    pub async fn start_language_server(
 683        &self,
 684        update: &proto::StartLanguageServer,
 685        connection: ConnectionId,
 686    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 687        let project_id = ProjectId::from_proto(update.project_id);
 688        self.project_transaction(project_id, |tx| async move {
 689            let server = update.server.as_ref().context("invalid language server")?;
 690
 691            // Ensure the update comes from the host.
 692            let project = project::Entity::find_by_id(project_id)
 693                .one(&*tx)
 694                .await?
 695                .context("no such project")?;
 696            if project.host_connection()? != connection {
 697                return Err(anyhow!("can't update a project hosted by someone else"))?;
 698            }
 699
 700            // Add the newly-started language server.
 701            language_server::Entity::insert(language_server::ActiveModel {
 702                project_id: ActiveValue::set(project_id),
 703                id: ActiveValue::set(server.id as i64),
 704                name: ActiveValue::set(server.name.clone()),
 705                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 706                capabilities: ActiveValue::set(update.capabilities.clone()),
 707            })
 708            .on_conflict(
 709                OnConflict::columns([
 710                    language_server::Column::ProjectId,
 711                    language_server::Column::Id,
 712                ])
 713                .update_columns([
 714                    language_server::Column::Name,
 715                    language_server::Column::Capabilities,
 716                    language_server::Column::WorktreeId,
 717                ])
 718                .to_owned(),
 719            )
 720            .exec(&*tx)
 721            .await?;
 722
 723            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 724            Ok(connection_ids)
 725        })
 726        .await
 727    }
 728
 729    /// Updates the worktree settings for the given connection.
 730    pub async fn update_worktree_settings(
 731        &self,
 732        update: &proto::UpdateWorktreeSettings,
 733        connection: ConnectionId,
 734    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 735        let project_id = ProjectId::from_proto(update.project_id);
 736        let kind = match update.kind {
 737            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 738                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 739            None => proto::LocalSettingsKind::Settings,
 740        };
 741        let kind = LocalSettingsKind::from_proto(kind);
 742        self.project_transaction(project_id, |tx| async move {
 743            // Ensure the update comes from the host.
 744            let project = project::Entity::find_by_id(project_id)
 745                .one(&*tx)
 746                .await?
 747                .context("no such project")?;
 748            if project.host_connection()? != connection {
 749                return Err(anyhow!("can't update a project hosted by someone else"))?;
 750            }
 751
 752            if let Some(content) = &update.content {
 753                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 754                    project_id: ActiveValue::Set(project_id),
 755                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 756                    path: ActiveValue::Set(update.path.clone()),
 757                    content: ActiveValue::Set(content.clone()),
 758                    kind: ActiveValue::Set(kind),
 759                })
 760                .on_conflict(
 761                    OnConflict::columns([
 762                        worktree_settings_file::Column::ProjectId,
 763                        worktree_settings_file::Column::WorktreeId,
 764                        worktree_settings_file::Column::Path,
 765                    ])
 766                    .update_column(worktree_settings_file::Column::Content)
 767                    .to_owned(),
 768                )
 769                .exec(&*tx)
 770                .await?;
 771            } else {
 772                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 773                    project_id: ActiveValue::Set(project_id),
 774                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 775                    path: ActiveValue::Set(update.path.clone()),
 776                    ..Default::default()
 777                })
 778                .exec(&*tx)
 779                .await?;
 780            }
 781
 782            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 783            Ok(connection_ids)
 784        })
 785        .await
 786    }
 787
 788    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 789        self.transaction(|tx| async move {
 790            Ok(project::Entity::find_by_id(id)
 791                .one(&*tx)
 792                .await?
 793                .context("no such project")?)
 794        })
 795        .await
 796    }
 797
 798    /// Adds the given connection to the specified project
 799    /// in the current room.
 800    pub async fn join_project(
 801        &self,
 802        project_id: ProjectId,
 803        connection: ConnectionId,
 804        user_id: UserId,
 805        committer_name: Option<String>,
 806        committer_email: Option<String>,
 807    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 808        self.project_transaction(project_id, move |tx| {
 809            let committer_name = committer_name.clone();
 810            let committer_email = committer_email.clone();
 811            async move {
 812                let (project, role) = self
 813                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 814                    .await?;
 815                self.join_project_internal(
 816                    project,
 817                    user_id,
 818                    committer_name,
 819                    committer_email,
 820                    connection,
 821                    role,
 822                    &tx,
 823                )
 824                .await
 825            }
 826        })
 827        .await
 828    }
 829
 830    async fn join_project_internal(
 831        &self,
 832        project: project::Model,
 833        user_id: UserId,
 834        committer_name: Option<String>,
 835        committer_email: Option<String>,
 836        connection: ConnectionId,
 837        role: ChannelRole,
 838        tx: &DatabaseTransaction,
 839    ) -> Result<(Project, ReplicaId)> {
 840        let mut collaborators = project
 841            .find_related(project_collaborator::Entity)
 842            .all(tx)
 843            .await?;
 844        let replica_ids = collaborators
 845            .iter()
 846            .map(|c| c.replica_id)
 847            .collect::<HashSet<_>>();
 848        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
 849        while replica_ids.contains(&replica_id) {
 850            replica_id.0 += 1;
 851        }
 852        let new_collaborator = project_collaborator::ActiveModel {
 853            project_id: ActiveValue::set(project.id),
 854            connection_id: ActiveValue::set(connection.id as i32),
 855            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 856            user_id: ActiveValue::set(user_id),
 857            replica_id: ActiveValue::set(replica_id),
 858            is_host: ActiveValue::set(false),
 859            id: ActiveValue::NotSet,
 860            committer_name: ActiveValue::set(committer_name),
 861            committer_email: ActiveValue::set(committer_email),
 862        }
 863        .insert(tx)
 864        .await?;
 865        collaborators.push(new_collaborator);
 866
 867        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 868        let mut worktrees = db_worktrees
 869            .into_iter()
 870            .map(|db_worktree| {
 871                (
 872                    db_worktree.id as u64,
 873                    Worktree {
 874                        id: db_worktree.id as u64,
 875                        abs_path: db_worktree.abs_path,
 876                        root_name: db_worktree.root_name,
 877                        visible: db_worktree.visible,
 878                        entries: Default::default(),
 879                        diagnostic_summaries: Default::default(),
 880                        settings_files: Default::default(),
 881                        scan_id: db_worktree.scan_id as u64,
 882                        completed_scan_id: db_worktree.completed_scan_id as u64,
 883                        legacy_repository_entries: Default::default(),
 884                    },
 885                )
 886            })
 887            .collect::<BTreeMap<_, _>>();
 888
 889        // Populate worktree entries.
 890        {
 891            let mut db_entries = worktree_entry::Entity::find()
 892                .filter(
 893                    Condition::all()
 894                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 895                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 896                )
 897                .stream(tx)
 898                .await?;
 899            while let Some(db_entry) = db_entries.next().await {
 900                let db_entry = db_entry?;
 901                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 902                    worktree.entries.push(proto::Entry {
 903                        id: db_entry.id as u64,
 904                        is_dir: db_entry.is_dir,
 905                        path: db_entry.path,
 906                        inode: db_entry.inode as u64,
 907                        mtime: Some(proto::Timestamp {
 908                            seconds: db_entry.mtime_seconds as u64,
 909                            nanos: db_entry.mtime_nanos as u32,
 910                        }),
 911                        canonical_path: db_entry.canonical_path,
 912                        is_ignored: db_entry.is_ignored,
 913                        is_external: db_entry.is_external,
 914                        is_hidden: db_entry.is_hidden,
 915                        // This is only used in the summarization backlog, so if it's None,
 916                        // that just means we won't be able to detect when to resummarize
 917                        // based on total number of backlogged bytes - instead, we'd go
 918                        // on number of files only. That shouldn't be a huge deal in practice.
 919                        size: None,
 920                        is_fifo: db_entry.is_fifo,
 921                    });
 922                }
 923            }
 924        }
 925
 926        // Populate repository entries.
 927        let mut repositories = Vec::new();
 928        {
 929            let db_repository_entries = project_repository::Entity::find()
 930                .filter(
 931                    Condition::all()
 932                        .add(project_repository::Column::ProjectId.eq(project.id))
 933                        .add(project_repository::Column::IsDeleted.eq(false)),
 934                )
 935                .all(tx)
 936                .await?;
 937            for db_repository_entry in db_repository_entries {
 938                let mut repository_statuses = project_repository_statuses::Entity::find()
 939                    .filter(
 940                        Condition::all()
 941                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 942                            .add(
 943                                project_repository_statuses::Column::RepositoryId
 944                                    .eq(db_repository_entry.id),
 945                            )
 946                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 947                    )
 948                    .stream(tx)
 949                    .await?;
 950                let mut updated_statuses = Vec::new();
 951                while let Some(status_entry) = repository_statuses.next().await {
 952                    let status_entry = status_entry?;
 953                    updated_statuses.push(db_status_to_proto(status_entry)?);
 954                }
 955
 956                let current_merge_conflicts = db_repository_entry
 957                    .current_merge_conflicts
 958                    .as_ref()
 959                    .map(|conflicts| serde_json::from_str(conflicts))
 960                    .transpose()?
 961                    .unwrap_or_default();
 962
 963                let branch_summary = db_repository_entry
 964                    .branch_summary
 965                    .as_ref()
 966                    .map(|branch_summary| serde_json::from_str(branch_summary))
 967                    .transpose()?
 968                    .unwrap_or_default();
 969
 970                let head_commit_details = db_repository_entry
 971                    .head_commit_details
 972                    .as_ref()
 973                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 974                    .transpose()?
 975                    .unwrap_or_default();
 976
 977                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 978                    .context("failed to deserialize repository's entry ids")?;
 979
 980                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 981                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 982                        worktree.legacy_repository_entries.insert(
 983                            db_repository_entry.id as u64,
 984                            proto::RepositoryEntry {
 985                                repository_id: db_repository_entry.id as u64,
 986                                updated_statuses,
 987                                removed_statuses: Vec::new(),
 988                                current_merge_conflicts,
 989                                branch_summary,
 990                            },
 991                        );
 992                    }
 993                } else {
 994                    repositories.push(proto::UpdateRepository {
 995                        project_id: db_repository_entry.project_id.0 as u64,
 996                        id: db_repository_entry.id as u64,
 997                        abs_path: db_repository_entry.abs_path,
 998                        entry_ids,
 999                        updated_statuses,
1000                        removed_statuses: Vec::new(),
1001                        current_merge_conflicts,
1002                        branch_summary,
1003                        head_commit_details,
1004                        scan_id: db_repository_entry.scan_id as u64,
1005                        is_last_update: true,
1006                        merge_message: db_repository_entry.merge_message,
1007                        stash_entries: Vec::new(),
1008                        renamed_paths: Default::default(),
1009                    });
1010                }
1011            }
1012        }
1013
1014        // Populate worktree diagnostic summaries.
1015        {
1016            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1017                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1018                .stream(tx)
1019                .await?;
1020            while let Some(db_summary) = db_summaries.next().await {
1021                let db_summary = db_summary?;
1022                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1023                    worktree
1024                        .diagnostic_summaries
1025                        .push(proto::DiagnosticSummary {
1026                            path: db_summary.path,
1027                            language_server_id: db_summary.language_server_id as u64,
1028                            error_count: db_summary.error_count as u32,
1029                            warning_count: db_summary.warning_count as u32,
1030                        });
1031                }
1032            }
1033        }
1034
1035        // Populate worktree settings files
1036        {
1037            let mut db_settings_files = worktree_settings_file::Entity::find()
1038                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1039                .stream(tx)
1040                .await?;
1041            while let Some(db_settings_file) = db_settings_files.next().await {
1042                let db_settings_file = db_settings_file?;
1043                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1044                    worktree.settings_files.push(WorktreeSettingsFile {
1045                        path: db_settings_file.path,
1046                        content: db_settings_file.content,
1047                        kind: db_settings_file.kind,
1048                    });
1049                }
1050            }
1051        }
1052
1053        // Populate language servers.
1054        let language_servers = project
1055            .find_related(language_server::Entity)
1056            .all(tx)
1057            .await?;
1058
1059        let path_style = if project.windows_paths {
1060            PathStyle::Windows
1061        } else {
1062            PathStyle::Posix
1063        };
1064
1065        let project = Project {
1066            id: project.id,
1067            role,
1068            collaborators: collaborators
1069                .into_iter()
1070                .map(|collaborator| ProjectCollaborator {
1071                    connection_id: collaborator.connection(),
1072                    user_id: collaborator.user_id,
1073                    replica_id: collaborator.replica_id,
1074                    is_host: collaborator.is_host,
1075                    committer_name: collaborator.committer_name,
1076                    committer_email: collaborator.committer_email,
1077                })
1078                .collect(),
1079            worktrees,
1080            repositories,
1081            language_servers: language_servers
1082                .into_iter()
1083                .map(|language_server| LanguageServer {
1084                    server: proto::LanguageServer {
1085                        id: language_server.id as u64,
1086                        name: language_server.name,
1087                        worktree_id: language_server.worktree_id.map(|id| id as u64),
1088                    },
1089                    capabilities: language_server.capabilities,
1090                })
1091                .collect(),
1092            path_style,
1093        };
1094        Ok((project, replica_id as ReplicaId))
1095    }
1096
1097    /// Removes the given connection from the specified project.
1098    pub async fn leave_project(
1099        &self,
1100        project_id: ProjectId,
1101        connection: ConnectionId,
1102    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1103        self.project_transaction(project_id, |tx| async move {
1104            let result = project_collaborator::Entity::delete_many()
1105                .filter(
1106                    Condition::all()
1107                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1108                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1109                        .add(
1110                            project_collaborator::Column::ConnectionServerId
1111                                .eq(connection.owner_id as i32),
1112                        ),
1113                )
1114                .exec(&*tx)
1115                .await?;
1116            if result.rows_affected == 0 {
1117                Err(anyhow!("not a collaborator on this project"))?;
1118            }
1119
1120            let project = project::Entity::find_by_id(project_id)
1121                .one(&*tx)
1122                .await?
1123                .context("no such project")?;
1124            let collaborators = project
1125                .find_related(project_collaborator::Entity)
1126                .all(&*tx)
1127                .await?;
1128            let connection_ids: Vec<ConnectionId> = collaborators
1129                .into_iter()
1130                .map(|collaborator| collaborator.connection())
1131                .collect();
1132
1133            follower::Entity::delete_many()
1134                .filter(
1135                    Condition::any()
1136                        .add(
1137                            Condition::all()
1138                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1139                                .add(
1140                                    follower::Column::LeaderConnectionServerId
1141                                        .eq(connection.owner_id),
1142                                )
1143                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1144                        )
1145                        .add(
1146                            Condition::all()
1147                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1148                                .add(
1149                                    follower::Column::FollowerConnectionServerId
1150                                        .eq(connection.owner_id),
1151                                )
1152                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1153                        ),
1154                )
1155                .exec(&*tx)
1156                .await?;
1157
1158            let room = if let Some(room_id) = project.room_id {
1159                Some(self.get_room(room_id, &tx).await?)
1160            } else {
1161                None
1162            };
1163
1164            let left_project = LeftProject {
1165                id: project_id,
1166                should_unshare: connection == project.host_connection()?,
1167                connection_ids,
1168            };
1169            Ok((room, left_project))
1170        })
1171        .await
1172    }
1173
1174    pub async fn check_user_is_project_host(
1175        &self,
1176        project_id: ProjectId,
1177        connection_id: ConnectionId,
1178    ) -> Result<()> {
1179        self.project_transaction(project_id, |tx| async move {
1180            project::Entity::find()
1181                .filter(
1182                    Condition::all()
1183                        .add(project::Column::Id.eq(project_id))
1184                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1185                        .add(
1186                            project::Column::HostConnectionServerId
1187                                .eq(Some(connection_id.owner_id as i32)),
1188                        ),
1189                )
1190                .one(&*tx)
1191                .await?
1192                .context("failed to read project host")?;
1193
1194            Ok(())
1195        })
1196        .await
1197        .map(|guard| guard.into_inner())
1198    }
1199
1200    /// Returns the current project if the given user is authorized to access it with the specified capability.
1201    pub async fn access_project(
1202        &self,
1203        project_id: ProjectId,
1204        connection_id: ConnectionId,
1205        capability: Capability,
1206        tx: &DatabaseTransaction,
1207    ) -> Result<(project::Model, ChannelRole)> {
1208        let project = project::Entity::find_by_id(project_id)
1209            .one(tx)
1210            .await?
1211            .context("no such project")?;
1212
1213        let role_from_room = if let Some(room_id) = project.room_id {
1214            room_participant::Entity::find()
1215                .filter(room_participant::Column::RoomId.eq(room_id))
1216                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1217                .one(tx)
1218                .await?
1219                .and_then(|participant| participant.role)
1220        } else {
1221            None
1222        };
1223
1224        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1225
1226        match capability {
1227            Capability::ReadWrite => {
1228                if !role.can_edit_projects() {
1229                    return Err(anyhow!("not authorized to edit projects"))?;
1230                }
1231            }
1232            Capability::ReadOnly => {
1233                if !role.can_read_projects() {
1234                    return Err(anyhow!("not authorized to read projects"))?;
1235                }
1236            }
1237        }
1238
1239        Ok((project, role))
1240    }
1241
1242    /// Returns the host connection for a read-only request to join a shared project.
1243    pub async fn host_for_read_only_project_request(
1244        &self,
1245        project_id: ProjectId,
1246        connection_id: ConnectionId,
1247    ) -> Result<ConnectionId> {
1248        self.project_transaction(project_id, |tx| async move {
1249            let (project, _) = self
1250                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1251                .await?;
1252            project.host_connection()
1253        })
1254        .await
1255        .map(|guard| guard.into_inner())
1256    }
1257
1258    /// Returns the host connection for a request to join a shared project.
1259    pub async fn host_for_mutating_project_request(
1260        &self,
1261        project_id: ProjectId,
1262        connection_id: ConnectionId,
1263    ) -> Result<ConnectionId> {
1264        self.project_transaction(project_id, |tx| async move {
1265            let (project, _) = self
1266                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1267                .await?;
1268            project.host_connection()
1269        })
1270        .await
1271        .map(|guard| guard.into_inner())
1272    }
1273
1274    pub async fn connections_for_buffer_update(
1275        &self,
1276        project_id: ProjectId,
1277        connection_id: ConnectionId,
1278        capability: Capability,
1279    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1280        self.project_transaction(project_id, |tx| async move {
1281            // Authorize
1282            let (project, _) = self
1283                .access_project(project_id, connection_id, capability, &tx)
1284                .await?;
1285
1286            let host_connection_id = project.host_connection()?;
1287
1288            let collaborators = project_collaborator::Entity::find()
1289                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1290                .all(&*tx)
1291                .await?;
1292
1293            let guest_connection_ids = collaborators
1294                .into_iter()
1295                .filter_map(|collaborator| {
1296                    if collaborator.is_host {
1297                        None
1298                    } else {
1299                        Some(collaborator.connection())
1300                    }
1301                })
1302                .collect();
1303
1304            Ok((host_connection_id, guest_connection_ids))
1305        })
1306        .await
1307    }
1308
1309    /// Returns the connection IDs in the given project.
1310    ///
1311    /// The provided `connection_id` must also be a collaborator in the project,
1312    /// otherwise an error will be returned.
1313    pub async fn project_connection_ids(
1314        &self,
1315        project_id: ProjectId,
1316        connection_id: ConnectionId,
1317        exclude_dev_server: bool,
1318    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1319        self.project_transaction(project_id, |tx| async move {
1320            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1321                .await
1322        })
1323        .await
1324    }
1325
1326    async fn internal_project_connection_ids(
1327        &self,
1328        project_id: ProjectId,
1329        connection_id: ConnectionId,
1330        exclude_dev_server: bool,
1331        tx: &DatabaseTransaction,
1332    ) -> Result<HashSet<ConnectionId>> {
1333        let project = project::Entity::find_by_id(project_id)
1334            .one(tx)
1335            .await?
1336            .context("no such project")?;
1337
1338        let mut collaborators = project_collaborator::Entity::find()
1339            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1340            .stream(tx)
1341            .await?;
1342
1343        let mut connection_ids = HashSet::default();
1344        if let Some(host_connection) = project.host_connection().log_err()
1345            && !exclude_dev_server
1346        {
1347            connection_ids.insert(host_connection);
1348        }
1349
1350        while let Some(collaborator) = collaborators.next().await {
1351            let collaborator = collaborator?;
1352            connection_ids.insert(collaborator.connection());
1353        }
1354
1355        if connection_ids.contains(&connection_id)
1356            || Some(connection_id) == project.host_connection().ok()
1357        {
1358            Ok(connection_ids)
1359        } else {
1360            Err(anyhow!(
1361                "can only send project updates to a project you're in"
1362            ))?
1363        }
1364    }
1365
1366    async fn project_guest_connection_ids(
1367        &self,
1368        project_id: ProjectId,
1369        tx: &DatabaseTransaction,
1370    ) -> Result<Vec<ConnectionId>> {
1371        let mut collaborators = project_collaborator::Entity::find()
1372            .filter(
1373                project_collaborator::Column::ProjectId
1374                    .eq(project_id)
1375                    .and(project_collaborator::Column::IsHost.eq(false)),
1376            )
1377            .stream(tx)
1378            .await?;
1379
1380        let mut guest_connection_ids = Vec::new();
1381        while let Some(collaborator) = collaborators.next().await {
1382            let collaborator = collaborator?;
1383            guest_connection_ids.push(collaborator.connection());
1384        }
1385        Ok(guest_connection_ids)
1386    }
1387
1388    /// Returns the [`RoomId`] for the given project.
1389    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1390        self.transaction(|tx| async move {
1391            Ok(project::Entity::find_by_id(project_id)
1392                .one(&*tx)
1393                .await?
1394                .and_then(|project| project.room_id))
1395        })
1396        .await
1397    }
1398
1399    pub async fn check_room_participants(
1400        &self,
1401        room_id: RoomId,
1402        leader_id: ConnectionId,
1403        follower_id: ConnectionId,
1404    ) -> Result<()> {
1405        self.transaction(|tx| async move {
1406            use room_participant::Column;
1407
1408            let count = room_participant::Entity::find()
1409                .filter(
1410                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1411                        Condition::any()
1412                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1413                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1414                            ))
1415                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1416                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1417                            )),
1418                    ),
1419                )
1420                .count(&*tx)
1421                .await?;
1422
1423            if count < 2 {
1424                Err(anyhow!("not room participants"))?;
1425            }
1426
1427            Ok(())
1428        })
1429        .await
1430    }
1431
1432    /// Adds the given follower connection as a follower of the given leader connection.
1433    pub async fn follow(
1434        &self,
1435        room_id: RoomId,
1436        project_id: ProjectId,
1437        leader_connection: ConnectionId,
1438        follower_connection: ConnectionId,
1439    ) -> Result<TransactionGuard<proto::Room>> {
1440        self.room_transaction(room_id, |tx| async move {
1441            follower::ActiveModel {
1442                room_id: ActiveValue::set(room_id),
1443                project_id: ActiveValue::set(project_id),
1444                leader_connection_server_id: ActiveValue::set(ServerId(
1445                    leader_connection.owner_id as i32,
1446                )),
1447                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1448                follower_connection_server_id: ActiveValue::set(ServerId(
1449                    follower_connection.owner_id as i32,
1450                )),
1451                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1452                ..Default::default()
1453            }
1454            .insert(&*tx)
1455            .await?;
1456
1457            let room = self.get_room(room_id, &tx).await?;
1458            Ok(room)
1459        })
1460        .await
1461    }
1462
1463    /// Removes the given follower connection as a follower of the given leader connection.
1464    pub async fn unfollow(
1465        &self,
1466        room_id: RoomId,
1467        project_id: ProjectId,
1468        leader_connection: ConnectionId,
1469        follower_connection: ConnectionId,
1470    ) -> Result<TransactionGuard<proto::Room>> {
1471        self.room_transaction(room_id, |tx| async move {
1472            follower::Entity::delete_many()
1473                .filter(
1474                    Condition::all()
1475                        .add(follower::Column::RoomId.eq(room_id))
1476                        .add(follower::Column::ProjectId.eq(project_id))
1477                        .add(
1478                            follower::Column::LeaderConnectionServerId
1479                                .eq(leader_connection.owner_id),
1480                        )
1481                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1482                        .add(
1483                            follower::Column::FollowerConnectionServerId
1484                                .eq(follower_connection.owner_id),
1485                        )
1486                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1487                )
1488                .exec(&*tx)
1489                .await?;
1490
1491            let room = self.get_room(room_id, &tx).await?;
1492            Ok(room)
1493        })
1494        .await
1495    }
1496}