1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project { 1 } else { 0 };
  95
  96            project_collaborator::ActiveModel {
  97                project_id: ActiveValue::set(project.id),
  98                connection_id: ActiveValue::set(connection.id as i32),
  99                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 100                user_id: ActiveValue::set(participant.user_id),
 101                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 102                is_host: ActiveValue::set(true),
 103                id: ActiveValue::NotSet,
 104                committer_name: ActiveValue::Set(None),
 105                committer_email: ActiveValue::Set(None),
 106            }
 107            .insert(&*tx)
 108            .await?;
 109
 110            let room = self.get_room(room_id, &tx).await?;
 111            Ok((project.id, room))
 112        })
 113        .await
 114    }
 115
 116    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 117        self.transaction(|tx| async move {
 118            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 119            Ok(())
 120        })
 121        .await
 122    }
 123
 124    /// Unshares the given project.
 125    pub async fn unshare_project(
 126        &self,
 127        project_id: ProjectId,
 128        connection: ConnectionId,
 129    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 130        self.project_transaction(project_id, |tx| async move {
 131            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 132            let project = project::Entity::find_by_id(project_id)
 133                .one(&*tx)
 134                .await?
 135                .context("project not found")?;
 136            let room = if let Some(room_id) = project.room_id {
 137                Some(self.get_room(room_id, &tx).await?)
 138            } else {
 139                None
 140            };
 141            if project.host_connection()? == connection {
 142                return Ok((true, room, guest_connection_ids));
 143            }
 144            Err(anyhow!("cannot unshare a project hosted by another user"))?
 145        })
 146        .await
 147    }
 148
 149    /// Updates the worktrees associated with the given project.
 150    pub async fn update_project(
 151        &self,
 152        project_id: ProjectId,
 153        connection: ConnectionId,
 154        worktrees: &[proto::WorktreeMetadata],
 155    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 156        self.project_transaction(project_id, |tx| async move {
 157            let project = project::Entity::find_by_id(project_id)
 158                .filter(
 159                    Condition::all()
 160                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 161                        .add(
 162                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 163                        ),
 164                )
 165                .one(&*tx)
 166                .await?
 167                .context("no such project")?;
 168
 169            self.update_project_worktrees(project.id, worktrees, &tx)
 170                .await?;
 171
 172            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 173
 174            let room = if let Some(room_id) = project.room_id {
 175                Some(self.get_room(room_id, &tx).await?)
 176            } else {
 177                None
 178            };
 179
 180            Ok((room, guest_connection_ids))
 181        })
 182        .await
 183    }
 184
 185    pub(in crate::db) async fn update_project_worktrees(
 186        &self,
 187        project_id: ProjectId,
 188        worktrees: &[proto::WorktreeMetadata],
 189        tx: &DatabaseTransaction,
 190    ) -> Result<()> {
 191        if !worktrees.is_empty() {
 192            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 193                id: ActiveValue::set(worktree.id as i64),
 194                project_id: ActiveValue::set(project_id),
 195                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 196                root_name: ActiveValue::set(worktree.root_name.clone()),
 197                visible: ActiveValue::set(worktree.visible),
 198                scan_id: ActiveValue::set(0),
 199                completed_scan_id: ActiveValue::set(0),
 200            }))
 201            .on_conflict(
 202                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 203                    .update_column(worktree::Column::RootName)
 204                    .to_owned(),
 205            )
 206            .exec(tx)
 207            .await?;
 208        }
 209
 210        worktree::Entity::delete_many()
 211            .filter(worktree::Column::ProjectId.eq(project_id).and(
 212                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 213            ))
 214            .exec(tx)
 215            .await?;
 216
 217        Ok(())
 218    }
 219
 220    pub async fn update_worktree(
 221        &self,
 222        update: &proto::UpdateWorktree,
 223        connection: ConnectionId,
 224    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 225        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 226            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 227        {
 228            return Err(anyhow!(
 229                "invalid worktree update. removed entries: {}, updated entries: {}",
 230                update.removed_entries.len(),
 231                update.updated_entries.len()
 232            ))?;
 233        }
 234
 235        let project_id = ProjectId::from_proto(update.project_id);
 236        let worktree_id = update.worktree_id as i64;
 237        self.project_transaction(project_id, |tx| async move {
 238            // Ensure the update comes from the host.
 239            let _project = project::Entity::find_by_id(project_id)
 240                .filter(
 241                    Condition::all()
 242                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 243                        .add(
 244                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 245                        ),
 246                )
 247                .one(&*tx)
 248                .await?
 249                .with_context(|| format!("no such project: {project_id}"))?;
 250
 251            // Update metadata.
 252            worktree::Entity::update(worktree::ActiveModel {
 253                id: ActiveValue::set(worktree_id),
 254                project_id: ActiveValue::set(project_id),
 255                root_name: ActiveValue::set(update.root_name.clone()),
 256                scan_id: ActiveValue::set(update.scan_id as i64),
 257                completed_scan_id: if update.is_last_update {
 258                    ActiveValue::set(update.scan_id as i64)
 259                } else {
 260                    ActiveValue::default()
 261                },
 262                abs_path: ActiveValue::set(update.abs_path.clone()),
 263                ..Default::default()
 264            })
 265            .exec(&*tx)
 266            .await?;
 267
 268            if !update.updated_entries.is_empty() {
 269                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 270                    let mtime = entry.mtime.clone().unwrap_or_default();
 271                    worktree_entry::ActiveModel {
 272                        project_id: ActiveValue::set(project_id),
 273                        worktree_id: ActiveValue::set(worktree_id),
 274                        id: ActiveValue::set(entry.id as i64),
 275                        is_dir: ActiveValue::set(entry.is_dir),
 276                        path: ActiveValue::set(entry.path.clone()),
 277                        inode: ActiveValue::set(entry.inode as i64),
 278                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 279                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 280                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 281                        is_ignored: ActiveValue::set(entry.is_ignored),
 282                        git_status: ActiveValue::set(None),
 283                        is_external: ActiveValue::set(entry.is_external),
 284                        is_deleted: ActiveValue::set(false),
 285                        is_hidden: ActiveValue::set(entry.is_hidden),
 286                        scan_id: ActiveValue::set(update.scan_id as i64),
 287                        is_fifo: ActiveValue::set(entry.is_fifo),
 288                    }
 289                }))
 290                .on_conflict(
 291                    OnConflict::columns([
 292                        worktree_entry::Column::ProjectId,
 293                        worktree_entry::Column::WorktreeId,
 294                        worktree_entry::Column::Id,
 295                    ])
 296                    .update_columns([
 297                        worktree_entry::Column::IsDir,
 298                        worktree_entry::Column::Path,
 299                        worktree_entry::Column::Inode,
 300                        worktree_entry::Column::MtimeSeconds,
 301                        worktree_entry::Column::MtimeNanos,
 302                        worktree_entry::Column::CanonicalPath,
 303                        worktree_entry::Column::IsIgnored,
 304                        worktree_entry::Column::IsHidden,
 305                        worktree_entry::Column::ScanId,
 306                    ])
 307                    .to_owned(),
 308                )
 309                .exec(&*tx)
 310                .await?;
 311            }
 312
 313            if !update.removed_entries.is_empty() {
 314                worktree_entry::Entity::update_many()
 315                    .filter(
 316                        worktree_entry::Column::ProjectId
 317                            .eq(project_id)
 318                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 319                            .and(
 320                                worktree_entry::Column::Id
 321                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 322                            ),
 323                    )
 324                    .set(worktree_entry::ActiveModel {
 325                        is_deleted: ActiveValue::Set(true),
 326                        scan_id: ActiveValue::Set(update.scan_id as i64),
 327                        ..Default::default()
 328                    })
 329                    .exec(&*tx)
 330                    .await?;
 331            }
 332
 333            // Backward-compatibility for old Zed clients.
 334            //
 335            // Remove this block when Zed 1.80 stable has been out for a week.
 336            {
 337                if !update.updated_repositories.is_empty() {
 338                    project_repository::Entity::insert_many(
 339                        update.updated_repositories.iter().map(|repository| {
 340                            project_repository::ActiveModel {
 341                                project_id: ActiveValue::set(project_id),
 342                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 343                                id: ActiveValue::set(repository.repository_id as i64),
 344                                scan_id: ActiveValue::set(update.scan_id as i64),
 345                                is_deleted: ActiveValue::set(false),
 346                                branch_summary: ActiveValue::Set(
 347                                    repository
 348                                        .branch_summary
 349                                        .as_ref()
 350                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 351                                ),
 352                                current_merge_conflicts: ActiveValue::Set(Some(
 353                                    serde_json::to_string(&repository.current_merge_conflicts)
 354                                        .unwrap(),
 355                                )),
 356                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 357                                abs_path: ActiveValue::set(String::new()),
 358                                entry_ids: ActiveValue::set("[]".into()),
 359                                head_commit_details: ActiveValue::set(None),
 360                                merge_message: ActiveValue::set(None),
 361                            }
 362                        }),
 363                    )
 364                    .on_conflict(
 365                        OnConflict::columns([
 366                            project_repository::Column::ProjectId,
 367                            project_repository::Column::Id,
 368                        ])
 369                        .update_columns([
 370                            project_repository::Column::ScanId,
 371                            project_repository::Column::BranchSummary,
 372                            project_repository::Column::CurrentMergeConflicts,
 373                        ])
 374                        .to_owned(),
 375                    )
 376                    .exec(&*tx)
 377                    .await?;
 378
 379                    let has_any_statuses = update
 380                        .updated_repositories
 381                        .iter()
 382                        .any(|repository| !repository.updated_statuses.is_empty());
 383
 384                    if has_any_statuses {
 385                        project_repository_statuses::Entity::insert_many(
 386                            update.updated_repositories.iter().flat_map(
 387                                |repository: &proto::RepositoryEntry| {
 388                                    repository.updated_statuses.iter().map(|status_entry| {
 389                                        let (repo_path, status_kind, first_status, second_status) =
 390                                            proto_status_to_db(status_entry.clone());
 391                                        project_repository_statuses::ActiveModel {
 392                                            project_id: ActiveValue::set(project_id),
 393                                            repository_id: ActiveValue::set(
 394                                                repository.repository_id as i64,
 395                                            ),
 396                                            scan_id: ActiveValue::set(update.scan_id as i64),
 397                                            is_deleted: ActiveValue::set(false),
 398                                            repo_path: ActiveValue::set(repo_path),
 399                                            status: ActiveValue::set(0),
 400                                            status_kind: ActiveValue::set(status_kind),
 401                                            first_status: ActiveValue::set(first_status),
 402                                            second_status: ActiveValue::set(second_status),
 403                                        }
 404                                    })
 405                                },
 406                            ),
 407                        )
 408                        .on_conflict(
 409                            OnConflict::columns([
 410                                project_repository_statuses::Column::ProjectId,
 411                                project_repository_statuses::Column::RepositoryId,
 412                                project_repository_statuses::Column::RepoPath,
 413                            ])
 414                            .update_columns([
 415                                project_repository_statuses::Column::ScanId,
 416                                project_repository_statuses::Column::StatusKind,
 417                                project_repository_statuses::Column::FirstStatus,
 418                                project_repository_statuses::Column::SecondStatus,
 419                            ])
 420                            .to_owned(),
 421                        )
 422                        .exec(&*tx)
 423                        .await?;
 424                    }
 425
 426                    for repo in &update.updated_repositories {
 427                        if !repo.removed_statuses.is_empty() {
 428                            project_repository_statuses::Entity::update_many()
 429                                .filter(
 430                                    project_repository_statuses::Column::ProjectId
 431                                        .eq(project_id)
 432                                        .and(
 433                                            project_repository_statuses::Column::RepositoryId
 434                                                .eq(repo.repository_id),
 435                                        )
 436                                        .and(
 437                                            project_repository_statuses::Column::RepoPath
 438                                                .is_in(repo.removed_statuses.iter()),
 439                                        ),
 440                                )
 441                                .set(project_repository_statuses::ActiveModel {
 442                                    is_deleted: ActiveValue::Set(true),
 443                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 444                                    ..Default::default()
 445                                })
 446                                .exec(&*tx)
 447                                .await?;
 448                        }
 449                    }
 450                }
 451
 452                if !update.removed_repositories.is_empty() {
 453                    project_repository::Entity::update_many()
 454                        .filter(
 455                            project_repository::Column::ProjectId
 456                                .eq(project_id)
 457                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 458                                .and(project_repository::Column::Id.is_in(
 459                                    update.removed_repositories.iter().map(|id| *id as i64),
 460                                )),
 461                        )
 462                        .set(project_repository::ActiveModel {
 463                            is_deleted: ActiveValue::Set(true),
 464                            scan_id: ActiveValue::Set(update.scan_id as i64),
 465                            ..Default::default()
 466                        })
 467                        .exec(&*tx)
 468                        .await?;
 469                }
 470            }
 471
 472            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 473            Ok(connection_ids)
 474        })
 475        .await
 476    }
 477
 478    pub async fn update_repository(
 479        &self,
 480        update: &proto::UpdateRepository,
 481        _connection: ConnectionId,
 482    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 483        let project_id = ProjectId::from_proto(update.project_id);
 484        let repository_id = update.id as i64;
 485        self.project_transaction(project_id, |tx| async move {
 486            project_repository::Entity::insert(project_repository::ActiveModel {
 487                project_id: ActiveValue::set(project_id),
 488                id: ActiveValue::set(repository_id),
 489                legacy_worktree_id: ActiveValue::set(None),
 490                abs_path: ActiveValue::set(update.abs_path.clone()),
 491                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 492                scan_id: ActiveValue::set(update.scan_id as i64),
 493                is_deleted: ActiveValue::set(false),
 494                branch_summary: ActiveValue::Set(
 495                    update
 496                        .branch_summary
 497                        .as_ref()
 498                        .map(|summary| serde_json::to_string(summary).unwrap()),
 499                ),
 500                head_commit_details: ActiveValue::Set(
 501                    update
 502                        .head_commit_details
 503                        .as_ref()
 504                        .map(|details| serde_json::to_string(details).unwrap()),
 505                ),
 506                current_merge_conflicts: ActiveValue::Set(Some(
 507                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 508                )),
 509                merge_message: ActiveValue::set(update.merge_message.clone()),
 510            })
 511            .on_conflict(
 512                OnConflict::columns([
 513                    project_repository::Column::ProjectId,
 514                    project_repository::Column::Id,
 515                ])
 516                .update_columns([
 517                    project_repository::Column::ScanId,
 518                    project_repository::Column::BranchSummary,
 519                    project_repository::Column::EntryIds,
 520                    project_repository::Column::AbsPath,
 521                    project_repository::Column::CurrentMergeConflicts,
 522                    project_repository::Column::HeadCommitDetails,
 523                    project_repository::Column::MergeMessage,
 524                ])
 525                .to_owned(),
 526            )
 527            .exec(&*tx)
 528            .await?;
 529
 530            let has_any_statuses = !update.updated_statuses.is_empty();
 531
 532            if has_any_statuses {
 533                project_repository_statuses::Entity::insert_many(
 534                    update.updated_statuses.iter().map(|status_entry| {
 535                        let (repo_path, status_kind, first_status, second_status) =
 536                            proto_status_to_db(status_entry.clone());
 537                        project_repository_statuses::ActiveModel {
 538                            project_id: ActiveValue::set(project_id),
 539                            repository_id: ActiveValue::set(repository_id),
 540                            scan_id: ActiveValue::set(update.scan_id as i64),
 541                            is_deleted: ActiveValue::set(false),
 542                            repo_path: ActiveValue::set(repo_path),
 543                            status: ActiveValue::set(0),
 544                            status_kind: ActiveValue::set(status_kind),
 545                            first_status: ActiveValue::set(first_status),
 546                            second_status: ActiveValue::set(second_status),
 547                        }
 548                    }),
 549                )
 550                .on_conflict(
 551                    OnConflict::columns([
 552                        project_repository_statuses::Column::ProjectId,
 553                        project_repository_statuses::Column::RepositoryId,
 554                        project_repository_statuses::Column::RepoPath,
 555                    ])
 556                    .update_columns([
 557                        project_repository_statuses::Column::ScanId,
 558                        project_repository_statuses::Column::StatusKind,
 559                        project_repository_statuses::Column::FirstStatus,
 560                        project_repository_statuses::Column::SecondStatus,
 561                    ])
 562                    .to_owned(),
 563                )
 564                .exec(&*tx)
 565                .await?;
 566            }
 567
 568            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 569
 570            if has_any_removed_statuses {
 571                project_repository_statuses::Entity::update_many()
 572                    .filter(
 573                        project_repository_statuses::Column::ProjectId
 574                            .eq(project_id)
 575                            .and(
 576                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 577                            )
 578                            .and(
 579                                project_repository_statuses::Column::RepoPath
 580                                    .is_in(update.removed_statuses.iter()),
 581                            ),
 582                    )
 583                    .set(project_repository_statuses::ActiveModel {
 584                        is_deleted: ActiveValue::Set(true),
 585                        scan_id: ActiveValue::Set(update.scan_id as i64),
 586                        ..Default::default()
 587                    })
 588                    .exec(&*tx)
 589                    .await?;
 590            }
 591
 592            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 593            Ok(connection_ids)
 594        })
 595        .await
 596    }
 597
 598    pub async fn remove_repository(
 599        &self,
 600        remove: &proto::RemoveRepository,
 601        _connection: ConnectionId,
 602    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 603        let project_id = ProjectId::from_proto(remove.project_id);
 604        let repository_id = remove.id as i64;
 605        self.project_transaction(project_id, |tx| async move {
 606            project_repository::Entity::update_many()
 607                .filter(
 608                    project_repository::Column::ProjectId
 609                        .eq(project_id)
 610                        .and(project_repository::Column::Id.eq(repository_id)),
 611                )
 612                .set(project_repository::ActiveModel {
 613                    is_deleted: ActiveValue::Set(true),
 614                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 615                    ..Default::default()
 616                })
 617                .exec(&*tx)
 618                .await?;
 619
 620            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 621            Ok(connection_ids)
 622        })
 623        .await
 624    }
 625
 626    /// Updates the diagnostic summary for the given connection.
 627    pub async fn update_diagnostic_summary(
 628        &self,
 629        update: &proto::UpdateDiagnosticSummary,
 630        connection: ConnectionId,
 631    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 632        let project_id = ProjectId::from_proto(update.project_id);
 633        let worktree_id = update.worktree_id as i64;
 634        self.project_transaction(project_id, |tx| async move {
 635            let summary = update.summary.as_ref().context("invalid summary")?;
 636
 637            // Ensure the update comes from the host.
 638            let project = project::Entity::find_by_id(project_id)
 639                .one(&*tx)
 640                .await?
 641                .context("no such project")?;
 642            if project.host_connection()? != connection {
 643                return Err(anyhow!("can't update a project hosted by someone else"))?;
 644            }
 645
 646            // Update summary.
 647            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 648                project_id: ActiveValue::set(project_id),
 649                worktree_id: ActiveValue::set(worktree_id),
 650                path: ActiveValue::set(summary.path.clone()),
 651                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 652                error_count: ActiveValue::set(summary.error_count as i32),
 653                warning_count: ActiveValue::set(summary.warning_count as i32),
 654            })
 655            .on_conflict(
 656                OnConflict::columns([
 657                    worktree_diagnostic_summary::Column::ProjectId,
 658                    worktree_diagnostic_summary::Column::WorktreeId,
 659                    worktree_diagnostic_summary::Column::Path,
 660                ])
 661                .update_columns([
 662                    worktree_diagnostic_summary::Column::LanguageServerId,
 663                    worktree_diagnostic_summary::Column::ErrorCount,
 664                    worktree_diagnostic_summary::Column::WarningCount,
 665                ])
 666                .to_owned(),
 667            )
 668            .exec(&*tx)
 669            .await?;
 670
 671            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 672            Ok(connection_ids)
 673        })
 674        .await
 675    }
 676
 677    /// Starts the language server for the given connection.
 678    pub async fn start_language_server(
 679        &self,
 680        update: &proto::StartLanguageServer,
 681        connection: ConnectionId,
 682    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 683        let project_id = ProjectId::from_proto(update.project_id);
 684        self.project_transaction(project_id, |tx| async move {
 685            let server = update.server.as_ref().context("invalid language server")?;
 686
 687            // Ensure the update comes from the host.
 688            let project = project::Entity::find_by_id(project_id)
 689                .one(&*tx)
 690                .await?
 691                .context("no such project")?;
 692            if project.host_connection()? != connection {
 693                return Err(anyhow!("can't update a project hosted by someone else"))?;
 694            }
 695
 696            // Add the newly-started language server.
 697            language_server::Entity::insert(language_server::ActiveModel {
 698                project_id: ActiveValue::set(project_id),
 699                id: ActiveValue::set(server.id as i64),
 700                name: ActiveValue::set(server.name.clone()),
 701                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 702                capabilities: ActiveValue::set(update.capabilities.clone()),
 703            })
 704            .on_conflict(
 705                OnConflict::columns([
 706                    language_server::Column::ProjectId,
 707                    language_server::Column::Id,
 708                ])
 709                .update_columns([
 710                    language_server::Column::Name,
 711                    language_server::Column::Capabilities,
 712                    language_server::Column::WorktreeId,
 713                ])
 714                .to_owned(),
 715            )
 716            .exec(&*tx)
 717            .await?;
 718
 719            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 720            Ok(connection_ids)
 721        })
 722        .await
 723    }
 724
 725    /// Updates the worktree settings for the given connection.
 726    pub async fn update_worktree_settings(
 727        &self,
 728        update: &proto::UpdateWorktreeSettings,
 729        connection: ConnectionId,
 730    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 731        let project_id = ProjectId::from_proto(update.project_id);
 732        let kind = match update.kind {
 733            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 734                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 735            None => proto::LocalSettingsKind::Settings,
 736        };
 737        let kind = LocalSettingsKind::from_proto(kind);
 738        self.project_transaction(project_id, |tx| async move {
 739            // Ensure the update comes from the host.
 740            let project = project::Entity::find_by_id(project_id)
 741                .one(&*tx)
 742                .await?
 743                .context("no such project")?;
 744            if project.host_connection()? != connection {
 745                return Err(anyhow!("can't update a project hosted by someone else"))?;
 746            }
 747
 748            if let Some(content) = &update.content {
 749                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 750                    project_id: ActiveValue::Set(project_id),
 751                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 752                    path: ActiveValue::Set(update.path.clone()),
 753                    content: ActiveValue::Set(content.clone()),
 754                    kind: ActiveValue::Set(kind),
 755                })
 756                .on_conflict(
 757                    OnConflict::columns([
 758                        worktree_settings_file::Column::ProjectId,
 759                        worktree_settings_file::Column::WorktreeId,
 760                        worktree_settings_file::Column::Path,
 761                    ])
 762                    .update_column(worktree_settings_file::Column::Content)
 763                    .to_owned(),
 764                )
 765                .exec(&*tx)
 766                .await?;
 767            } else {
 768                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 769                    project_id: ActiveValue::Set(project_id),
 770                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 771                    path: ActiveValue::Set(update.path.clone()),
 772                    ..Default::default()
 773                })
 774                .exec(&*tx)
 775                .await?;
 776            }
 777
 778            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 779            Ok(connection_ids)
 780        })
 781        .await
 782    }
 783
 784    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 785        self.transaction(|tx| async move {
 786            Ok(project::Entity::find_by_id(id)
 787                .one(&*tx)
 788                .await?
 789                .context("no such project")?)
 790        })
 791        .await
 792    }
 793
 794    /// Adds the given connection to the specified project
 795    /// in the current room.
 796    pub async fn join_project(
 797        &self,
 798        project_id: ProjectId,
 799        connection: ConnectionId,
 800        user_id: UserId,
 801        committer_name: Option<String>,
 802        committer_email: Option<String>,
 803    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 804        self.project_transaction(project_id, move |tx| {
 805            let committer_name = committer_name.clone();
 806            let committer_email = committer_email.clone();
 807            async move {
 808                let (project, role) = self
 809                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 810                    .await?;
 811                self.join_project_internal(
 812                    project,
 813                    user_id,
 814                    committer_name,
 815                    committer_email,
 816                    connection,
 817                    role,
 818                    &tx,
 819                )
 820                .await
 821            }
 822        })
 823        .await
 824    }
 825
 826    async fn join_project_internal(
 827        &self,
 828        project: project::Model,
 829        user_id: UserId,
 830        committer_name: Option<String>,
 831        committer_email: Option<String>,
 832        connection: ConnectionId,
 833        role: ChannelRole,
 834        tx: &DatabaseTransaction,
 835    ) -> Result<(Project, ReplicaId)> {
 836        let mut collaborators = project
 837            .find_related(project_collaborator::Entity)
 838            .all(tx)
 839            .await?;
 840        let replica_ids = collaborators
 841            .iter()
 842            .map(|c| c.replica_id)
 843            .collect::<HashSet<_>>();
 844        let mut replica_id = ReplicaId(1);
 845        while replica_ids.contains(&replica_id) {
 846            replica_id.0 += 1;
 847        }
 848        let new_collaborator = project_collaborator::ActiveModel {
 849            project_id: ActiveValue::set(project.id),
 850            connection_id: ActiveValue::set(connection.id as i32),
 851            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 852            user_id: ActiveValue::set(user_id),
 853            replica_id: ActiveValue::set(replica_id),
 854            is_host: ActiveValue::set(false),
 855            id: ActiveValue::NotSet,
 856            committer_name: ActiveValue::set(committer_name),
 857            committer_email: ActiveValue::set(committer_email),
 858        }
 859        .insert(tx)
 860        .await?;
 861        collaborators.push(new_collaborator);
 862
 863        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 864        let mut worktrees = db_worktrees
 865            .into_iter()
 866            .map(|db_worktree| {
 867                (
 868                    db_worktree.id as u64,
 869                    Worktree {
 870                        id: db_worktree.id as u64,
 871                        abs_path: db_worktree.abs_path,
 872                        root_name: db_worktree.root_name,
 873                        visible: db_worktree.visible,
 874                        entries: Default::default(),
 875                        diagnostic_summaries: Default::default(),
 876                        settings_files: Default::default(),
 877                        scan_id: db_worktree.scan_id as u64,
 878                        completed_scan_id: db_worktree.completed_scan_id as u64,
 879                        legacy_repository_entries: Default::default(),
 880                    },
 881                )
 882            })
 883            .collect::<BTreeMap<_, _>>();
 884
 885        // Populate worktree entries.
 886        {
 887            let mut db_entries = worktree_entry::Entity::find()
 888                .filter(
 889                    Condition::all()
 890                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 891                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 892                )
 893                .stream(tx)
 894                .await?;
 895            while let Some(db_entry) = db_entries.next().await {
 896                let db_entry = db_entry?;
 897                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 898                    worktree.entries.push(proto::Entry {
 899                        id: db_entry.id as u64,
 900                        is_dir: db_entry.is_dir,
 901                        path: db_entry.path,
 902                        inode: db_entry.inode as u64,
 903                        mtime: Some(proto::Timestamp {
 904                            seconds: db_entry.mtime_seconds as u64,
 905                            nanos: db_entry.mtime_nanos as u32,
 906                        }),
 907                        canonical_path: db_entry.canonical_path,
 908                        is_ignored: db_entry.is_ignored,
 909                        is_external: db_entry.is_external,
 910                        is_hidden: db_entry.is_hidden,
 911                        // This is only used in the summarization backlog, so if it's None,
 912                        // that just means we won't be able to detect when to resummarize
 913                        // based on total number of backlogged bytes - instead, we'd go
 914                        // on number of files only. That shouldn't be a huge deal in practice.
 915                        size: None,
 916                        is_fifo: db_entry.is_fifo,
 917                    });
 918                }
 919            }
 920        }
 921
 922        // Populate repository entries.
 923        let mut repositories = Vec::new();
 924        {
 925            let db_repository_entries = project_repository::Entity::find()
 926                .filter(
 927                    Condition::all()
 928                        .add(project_repository::Column::ProjectId.eq(project.id))
 929                        .add(project_repository::Column::IsDeleted.eq(false)),
 930                )
 931                .all(tx)
 932                .await?;
 933            for db_repository_entry in db_repository_entries {
 934                let mut repository_statuses = project_repository_statuses::Entity::find()
 935                    .filter(
 936                        Condition::all()
 937                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 938                            .add(
 939                                project_repository_statuses::Column::RepositoryId
 940                                    .eq(db_repository_entry.id),
 941                            )
 942                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 943                    )
 944                    .stream(tx)
 945                    .await?;
 946                let mut updated_statuses = Vec::new();
 947                while let Some(status_entry) = repository_statuses.next().await {
 948                    let status_entry = status_entry?;
 949                    updated_statuses.push(db_status_to_proto(status_entry)?);
 950                }
 951
 952                let current_merge_conflicts = db_repository_entry
 953                    .current_merge_conflicts
 954                    .as_ref()
 955                    .map(|conflicts| serde_json::from_str(conflicts))
 956                    .transpose()?
 957                    .unwrap_or_default();
 958
 959                let branch_summary = db_repository_entry
 960                    .branch_summary
 961                    .as_ref()
 962                    .map(|branch_summary| serde_json::from_str(branch_summary))
 963                    .transpose()?
 964                    .unwrap_or_default();
 965
 966                let head_commit_details = db_repository_entry
 967                    .head_commit_details
 968                    .as_ref()
 969                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 970                    .transpose()?
 971                    .unwrap_or_default();
 972
 973                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 974                    .context("failed to deserialize repository's entry ids")?;
 975
 976                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 977                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 978                        worktree.legacy_repository_entries.insert(
 979                            db_repository_entry.id as u64,
 980                            proto::RepositoryEntry {
 981                                repository_id: db_repository_entry.id as u64,
 982                                updated_statuses,
 983                                removed_statuses: Vec::new(),
 984                                current_merge_conflicts,
 985                                branch_summary,
 986                            },
 987                        );
 988                    }
 989                } else {
 990                    repositories.push(proto::UpdateRepository {
 991                        project_id: db_repository_entry.project_id.0 as u64,
 992                        id: db_repository_entry.id as u64,
 993                        abs_path: db_repository_entry.abs_path,
 994                        entry_ids,
 995                        updated_statuses,
 996                        removed_statuses: Vec::new(),
 997                        current_merge_conflicts,
 998                        branch_summary,
 999                        head_commit_details,
1000                        scan_id: db_repository_entry.scan_id as u64,
1001                        is_last_update: true,
1002                        merge_message: db_repository_entry.merge_message,
1003                        stash_entries: Vec::new(),
1004                    });
1005                }
1006            }
1007        }
1008
1009        // Populate worktree diagnostic summaries.
1010        {
1011            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1012                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1013                .stream(tx)
1014                .await?;
1015            while let Some(db_summary) = db_summaries.next().await {
1016                let db_summary = db_summary?;
1017                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1018                    worktree
1019                        .diagnostic_summaries
1020                        .push(proto::DiagnosticSummary {
1021                            path: db_summary.path,
1022                            language_server_id: db_summary.language_server_id as u64,
1023                            error_count: db_summary.error_count as u32,
1024                            warning_count: db_summary.warning_count as u32,
1025                        });
1026                }
1027            }
1028        }
1029
1030        // Populate worktree settings files
1031        {
1032            let mut db_settings_files = worktree_settings_file::Entity::find()
1033                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1034                .stream(tx)
1035                .await?;
1036            while let Some(db_settings_file) = db_settings_files.next().await {
1037                let db_settings_file = db_settings_file?;
1038                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1039                    worktree.settings_files.push(WorktreeSettingsFile {
1040                        path: db_settings_file.path,
1041                        content: db_settings_file.content,
1042                        kind: db_settings_file.kind,
1043                    });
1044                }
1045            }
1046        }
1047
1048        // Populate language servers.
1049        let language_servers = project
1050            .find_related(language_server::Entity)
1051            .all(tx)
1052            .await?;
1053
1054        let path_style = if project.windows_paths {
1055            PathStyle::Windows
1056        } else {
1057            PathStyle::Posix
1058        };
1059
1060        let project = Project {
1061            id: project.id,
1062            role,
1063            collaborators: collaborators
1064                .into_iter()
1065                .map(|collaborator| ProjectCollaborator {
1066                    connection_id: collaborator.connection(),
1067                    user_id: collaborator.user_id,
1068                    replica_id: collaborator.replica_id,
1069                    is_host: collaborator.is_host,
1070                    committer_name: collaborator.committer_name,
1071                    committer_email: collaborator.committer_email,
1072                })
1073                .collect(),
1074            worktrees,
1075            repositories,
1076            language_servers: language_servers
1077                .into_iter()
1078                .map(|language_server| LanguageServer {
1079                    server: proto::LanguageServer {
1080                        id: language_server.id as u64,
1081                        name: language_server.name,
1082                        worktree_id: language_server.worktree_id.map(|id| id as u64),
1083                    },
1084                    capabilities: language_server.capabilities,
1085                })
1086                .collect(),
1087            path_style,
1088        };
1089        Ok((project, replica_id as ReplicaId))
1090    }
1091
1092    /// Removes the given connection from the specified project.
1093    pub async fn leave_project(
1094        &self,
1095        project_id: ProjectId,
1096        connection: ConnectionId,
1097    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1098        self.project_transaction(project_id, |tx| async move {
1099            let result = project_collaborator::Entity::delete_many()
1100                .filter(
1101                    Condition::all()
1102                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1103                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1104                        .add(
1105                            project_collaborator::Column::ConnectionServerId
1106                                .eq(connection.owner_id as i32),
1107                        ),
1108                )
1109                .exec(&*tx)
1110                .await?;
1111            if result.rows_affected == 0 {
1112                Err(anyhow!("not a collaborator on this project"))?;
1113            }
1114
1115            let project = project::Entity::find_by_id(project_id)
1116                .one(&*tx)
1117                .await?
1118                .context("no such project")?;
1119            let collaborators = project
1120                .find_related(project_collaborator::Entity)
1121                .all(&*tx)
1122                .await?;
1123            let connection_ids: Vec<ConnectionId> = collaborators
1124                .into_iter()
1125                .map(|collaborator| collaborator.connection())
1126                .collect();
1127
1128            follower::Entity::delete_many()
1129                .filter(
1130                    Condition::any()
1131                        .add(
1132                            Condition::all()
1133                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1134                                .add(
1135                                    follower::Column::LeaderConnectionServerId
1136                                        .eq(connection.owner_id),
1137                                )
1138                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1139                        )
1140                        .add(
1141                            Condition::all()
1142                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1143                                .add(
1144                                    follower::Column::FollowerConnectionServerId
1145                                        .eq(connection.owner_id),
1146                                )
1147                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1148                        ),
1149                )
1150                .exec(&*tx)
1151                .await?;
1152
1153            let room = if let Some(room_id) = project.room_id {
1154                Some(self.get_room(room_id, &tx).await?)
1155            } else {
1156                None
1157            };
1158
1159            let left_project = LeftProject {
1160                id: project_id,
1161                should_unshare: connection == project.host_connection()?,
1162                connection_ids,
1163            };
1164            Ok((room, left_project))
1165        })
1166        .await
1167    }
1168
1169    pub async fn check_user_is_project_host(
1170        &self,
1171        project_id: ProjectId,
1172        connection_id: ConnectionId,
1173    ) -> Result<()> {
1174        self.project_transaction(project_id, |tx| async move {
1175            project::Entity::find()
1176                .filter(
1177                    Condition::all()
1178                        .add(project::Column::Id.eq(project_id))
1179                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1180                        .add(
1181                            project::Column::HostConnectionServerId
1182                                .eq(Some(connection_id.owner_id as i32)),
1183                        ),
1184                )
1185                .one(&*tx)
1186                .await?
1187                .context("failed to read project host")?;
1188
1189            Ok(())
1190        })
1191        .await
1192        .map(|guard| guard.into_inner())
1193    }
1194
1195    /// Returns the current project if the given user is authorized to access it with the specified capability.
1196    pub async fn access_project(
1197        &self,
1198        project_id: ProjectId,
1199        connection_id: ConnectionId,
1200        capability: Capability,
1201        tx: &DatabaseTransaction,
1202    ) -> Result<(project::Model, ChannelRole)> {
1203        let project = project::Entity::find_by_id(project_id)
1204            .one(tx)
1205            .await?
1206            .context("no such project")?;
1207
1208        let role_from_room = if let Some(room_id) = project.room_id {
1209            room_participant::Entity::find()
1210                .filter(room_participant::Column::RoomId.eq(room_id))
1211                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1212                .one(tx)
1213                .await?
1214                .and_then(|participant| participant.role)
1215        } else {
1216            None
1217        };
1218
1219        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1220
1221        match capability {
1222            Capability::ReadWrite => {
1223                if !role.can_edit_projects() {
1224                    return Err(anyhow!("not authorized to edit projects"))?;
1225                }
1226            }
1227            Capability::ReadOnly => {
1228                if !role.can_read_projects() {
1229                    return Err(anyhow!("not authorized to read projects"))?;
1230                }
1231            }
1232        }
1233
1234        Ok((project, role))
1235    }
1236
1237    /// Returns the host connection for a read-only request to join a shared project.
1238    pub async fn host_for_read_only_project_request(
1239        &self,
1240        project_id: ProjectId,
1241        connection_id: ConnectionId,
1242    ) -> Result<ConnectionId> {
1243        self.project_transaction(project_id, |tx| async move {
1244            let (project, _) = self
1245                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1246                .await?;
1247            project.host_connection()
1248        })
1249        .await
1250        .map(|guard| guard.into_inner())
1251    }
1252
1253    /// Returns the host connection for a request to join a shared project.
1254    pub async fn host_for_mutating_project_request(
1255        &self,
1256        project_id: ProjectId,
1257        connection_id: ConnectionId,
1258    ) -> Result<ConnectionId> {
1259        self.project_transaction(project_id, |tx| async move {
1260            let (project, _) = self
1261                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1262                .await?;
1263            project.host_connection()
1264        })
1265        .await
1266        .map(|guard| guard.into_inner())
1267    }
1268
1269    pub async fn connections_for_buffer_update(
1270        &self,
1271        project_id: ProjectId,
1272        connection_id: ConnectionId,
1273        capability: Capability,
1274    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1275        self.project_transaction(project_id, |tx| async move {
1276            // Authorize
1277            let (project, _) = self
1278                .access_project(project_id, connection_id, capability, &tx)
1279                .await?;
1280
1281            let host_connection_id = project.host_connection()?;
1282
1283            let collaborators = project_collaborator::Entity::find()
1284                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1285                .all(&*tx)
1286                .await?;
1287
1288            let guest_connection_ids = collaborators
1289                .into_iter()
1290                .filter_map(|collaborator| {
1291                    if collaborator.is_host {
1292                        None
1293                    } else {
1294                        Some(collaborator.connection())
1295                    }
1296                })
1297                .collect();
1298
1299            Ok((host_connection_id, guest_connection_ids))
1300        })
1301        .await
1302    }
1303
1304    /// Returns the connection IDs in the given project.
1305    ///
1306    /// The provided `connection_id` must also be a collaborator in the project,
1307    /// otherwise an error will be returned.
1308    pub async fn project_connection_ids(
1309        &self,
1310        project_id: ProjectId,
1311        connection_id: ConnectionId,
1312        exclude_dev_server: bool,
1313    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1314        self.project_transaction(project_id, |tx| async move {
1315            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1316                .await
1317        })
1318        .await
1319    }
1320
1321    async fn internal_project_connection_ids(
1322        &self,
1323        project_id: ProjectId,
1324        connection_id: ConnectionId,
1325        exclude_dev_server: bool,
1326        tx: &DatabaseTransaction,
1327    ) -> Result<HashSet<ConnectionId>> {
1328        let project = project::Entity::find_by_id(project_id)
1329            .one(tx)
1330            .await?
1331            .context("no such project")?;
1332
1333        let mut collaborators = project_collaborator::Entity::find()
1334            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1335            .stream(tx)
1336            .await?;
1337
1338        let mut connection_ids = HashSet::default();
1339        if let Some(host_connection) = project.host_connection().log_err()
1340            && !exclude_dev_server
1341        {
1342            connection_ids.insert(host_connection);
1343        }
1344
1345        while let Some(collaborator) = collaborators.next().await {
1346            let collaborator = collaborator?;
1347            connection_ids.insert(collaborator.connection());
1348        }
1349
1350        if connection_ids.contains(&connection_id)
1351            || Some(connection_id) == project.host_connection().ok()
1352        {
1353            Ok(connection_ids)
1354        } else {
1355            Err(anyhow!(
1356                "can only send project updates to a project you're in"
1357            ))?
1358        }
1359    }
1360
1361    async fn project_guest_connection_ids(
1362        &self,
1363        project_id: ProjectId,
1364        tx: &DatabaseTransaction,
1365    ) -> Result<Vec<ConnectionId>> {
1366        let mut collaborators = project_collaborator::Entity::find()
1367            .filter(
1368                project_collaborator::Column::ProjectId
1369                    .eq(project_id)
1370                    .and(project_collaborator::Column::IsHost.eq(false)),
1371            )
1372            .stream(tx)
1373            .await?;
1374
1375        let mut guest_connection_ids = Vec::new();
1376        while let Some(collaborator) = collaborators.next().await {
1377            let collaborator = collaborator?;
1378            guest_connection_ids.push(collaborator.connection());
1379        }
1380        Ok(guest_connection_ids)
1381    }
1382
1383    /// Returns the [`RoomId`] for the given project.
1384    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1385        self.transaction(|tx| async move {
1386            Ok(project::Entity::find_by_id(project_id)
1387                .one(&*tx)
1388                .await?
1389                .and_then(|project| project.room_id))
1390        })
1391        .await
1392    }
1393
1394    pub async fn check_room_participants(
1395        &self,
1396        room_id: RoomId,
1397        leader_id: ConnectionId,
1398        follower_id: ConnectionId,
1399    ) -> Result<()> {
1400        self.transaction(|tx| async move {
1401            use room_participant::Column;
1402
1403            let count = room_participant::Entity::find()
1404                .filter(
1405                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1406                        Condition::any()
1407                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1408                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1409                            ))
1410                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1411                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1412                            )),
1413                    ),
1414                )
1415                .count(&*tx)
1416                .await?;
1417
1418            if count < 2 {
1419                Err(anyhow!("not room participants"))?;
1420            }
1421
1422            Ok(())
1423        })
1424        .await
1425    }
1426
1427    /// Adds the given follower connection as a follower of the given leader connection.
1428    pub async fn follow(
1429        &self,
1430        room_id: RoomId,
1431        project_id: ProjectId,
1432        leader_connection: ConnectionId,
1433        follower_connection: ConnectionId,
1434    ) -> Result<TransactionGuard<proto::Room>> {
1435        self.room_transaction(room_id, |tx| async move {
1436            follower::ActiveModel {
1437                room_id: ActiveValue::set(room_id),
1438                project_id: ActiveValue::set(project_id),
1439                leader_connection_server_id: ActiveValue::set(ServerId(
1440                    leader_connection.owner_id as i32,
1441                )),
1442                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1443                follower_connection_server_id: ActiveValue::set(ServerId(
1444                    follower_connection.owner_id as i32,
1445                )),
1446                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1447                ..Default::default()
1448            }
1449            .insert(&*tx)
1450            .await?;
1451
1452            let room = self.get_room(room_id, &tx).await?;
1453            Ok(room)
1454        })
1455        .await
1456    }
1457
1458    /// Removes the given follower connection as a follower of the given leader connection.
1459    pub async fn unfollow(
1460        &self,
1461        room_id: RoomId,
1462        project_id: ProjectId,
1463        leader_connection: ConnectionId,
1464        follower_connection: ConnectionId,
1465    ) -> Result<TransactionGuard<proto::Room>> {
1466        self.room_transaction(room_id, |tx| async move {
1467            follower::Entity::delete_many()
1468                .filter(
1469                    Condition::all()
1470                        .add(follower::Column::RoomId.eq(room_id))
1471                        .add(follower::Column::ProjectId.eq(project_id))
1472                        .add(
1473                            follower::Column::LeaderConnectionServerId
1474                                .eq(leader_connection.owner_id),
1475                        )
1476                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1477                        .add(
1478                            follower::Column::FollowerConnectionServerId
1479                                .eq(follower_connection.owner_id),
1480                        )
1481                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1482                )
1483                .exec(&*tx)
1484                .await?;
1485
1486            let room = self.get_room(room_id, &tx).await?;
1487            Ok(room)
1488        })
1489        .await
1490    }
1491}