projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project {
  95                clock::ReplicaId::REMOTE_SERVER
  96            } else {
  97                clock::ReplicaId::LOCAL
  98            };
  99
 100            project_collaborator::ActiveModel {
 101                project_id: ActiveValue::set(project.id),
 102                connection_id: ActiveValue::set(connection.id as i32),
 103                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 104                user_id: ActiveValue::set(participant.user_id),
 105                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 106                is_host: ActiveValue::set(true),
 107                id: ActiveValue::NotSet,
 108                committer_name: ActiveValue::Set(None),
 109                committer_email: ActiveValue::Set(None),
 110            }
 111            .insert(&*tx)
 112            .await?;
 113
 114            let room = self.get_room(room_id, &tx).await?;
 115            Ok((project.id, room))
 116        })
 117        .await
 118    }
 119
 120    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 121        self.transaction(|tx| async move {
 122            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 123            Ok(())
 124        })
 125        .await
 126    }
 127
 128    /// Unshares the given project.
 129    pub async fn unshare_project(
 130        &self,
 131        project_id: ProjectId,
 132        connection: ConnectionId,
 133    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 134        self.project_transaction(project_id, |tx| async move {
 135            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 136            let project = project::Entity::find_by_id(project_id)
 137                .one(&*tx)
 138                .await?
 139                .context("project not found")?;
 140            let room = if let Some(room_id) = project.room_id {
 141                Some(self.get_room(room_id, &tx).await?)
 142            } else {
 143                None
 144            };
 145            if project.host_connection()? == connection {
 146                return Ok((true, room, guest_connection_ids));
 147            }
 148            Err(anyhow!("cannot unshare a project hosted by another user"))?
 149        })
 150        .await
 151    }
 152
 153    /// Updates the worktrees associated with the given project.
 154    pub async fn update_project(
 155        &self,
 156        project_id: ProjectId,
 157        connection: ConnectionId,
 158        worktrees: &[proto::WorktreeMetadata],
 159    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 160        self.project_transaction(project_id, |tx| async move {
 161            let project = project::Entity::find_by_id(project_id)
 162                .filter(
 163                    Condition::all()
 164                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 165                        .add(
 166                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 167                        ),
 168                )
 169                .one(&*tx)
 170                .await?
 171                .context("no such project")?;
 172
 173            self.update_project_worktrees(project.id, worktrees, &tx)
 174                .await?;
 175
 176            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 177
 178            let room = if let Some(room_id) = project.room_id {
 179                Some(self.get_room(room_id, &tx).await?)
 180            } else {
 181                None
 182            };
 183
 184            Ok((room, guest_connection_ids))
 185        })
 186        .await
 187    }
 188
 189    pub(in crate::db) async fn update_project_worktrees(
 190        &self,
 191        project_id: ProjectId,
 192        worktrees: &[proto::WorktreeMetadata],
 193        tx: &DatabaseTransaction,
 194    ) -> Result<()> {
 195        if !worktrees.is_empty() {
 196            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 197                id: ActiveValue::set(worktree.id as i64),
 198                project_id: ActiveValue::set(project_id),
 199                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 200                root_name: ActiveValue::set(worktree.root_name.clone()),
 201                visible: ActiveValue::set(worktree.visible),
 202                scan_id: ActiveValue::set(0),
 203                completed_scan_id: ActiveValue::set(0),
 204            }))
 205            .on_conflict(
 206                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 207                    .update_column(worktree::Column::RootName)
 208                    .to_owned(),
 209            )
 210            .exec(tx)
 211            .await?;
 212        }
 213
 214        worktree::Entity::delete_many()
 215            .filter(worktree::Column::ProjectId.eq(project_id).and(
 216                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 217            ))
 218            .exec(tx)
 219            .await?;
 220
 221        Ok(())
 222    }
 223
 224    pub async fn update_worktree(
 225        &self,
 226        update: &proto::UpdateWorktree,
 227        connection: ConnectionId,
 228    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 229        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 230            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 231        {
 232            return Err(anyhow!(
 233                "invalid worktree update. removed entries: {}, updated entries: {}",
 234                update.removed_entries.len(),
 235                update.updated_entries.len()
 236            ))?;
 237        }
 238
 239        let project_id = ProjectId::from_proto(update.project_id);
 240        let worktree_id = update.worktree_id as i64;
 241        self.project_transaction(project_id, |tx| async move {
 242            // Ensure the update comes from the host.
 243            let _project = project::Entity::find_by_id(project_id)
 244                .filter(
 245                    Condition::all()
 246                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 247                        .add(
 248                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 249                        ),
 250                )
 251                .one(&*tx)
 252                .await?
 253                .with_context(|| format!("no such project: {project_id}"))?;
 254
 255            // Update metadata.
 256            worktree::Entity::update(worktree::ActiveModel {
 257                id: ActiveValue::set(worktree_id),
 258                project_id: ActiveValue::set(project_id),
 259                root_name: ActiveValue::set(update.root_name.clone()),
 260                scan_id: ActiveValue::set(update.scan_id as i64),
 261                completed_scan_id: if update.is_last_update {
 262                    ActiveValue::set(update.scan_id as i64)
 263                } else {
 264                    ActiveValue::default()
 265                },
 266                abs_path: ActiveValue::set(update.abs_path.clone()),
 267                ..Default::default()
 268            })
 269            .exec(&*tx)
 270            .await?;
 271
 272            if !update.updated_entries.is_empty() {
 273                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 274                    let mtime = entry.mtime.clone().unwrap_or_default();
 275                    worktree_entry::ActiveModel {
 276                        project_id: ActiveValue::set(project_id),
 277                        worktree_id: ActiveValue::set(worktree_id),
 278                        id: ActiveValue::set(entry.id as i64),
 279                        is_dir: ActiveValue::set(entry.is_dir),
 280                        path: ActiveValue::set(entry.path.clone()),
 281                        inode: ActiveValue::set(entry.inode as i64),
 282                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 283                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 284                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 285                        is_ignored: ActiveValue::set(entry.is_ignored),
 286                        git_status: ActiveValue::set(None),
 287                        is_external: ActiveValue::set(entry.is_external),
 288                        is_deleted: ActiveValue::set(false),
 289                        is_hidden: ActiveValue::set(entry.is_hidden),
 290                        scan_id: ActiveValue::set(update.scan_id as i64),
 291                        is_fifo: ActiveValue::set(entry.is_fifo),
 292                    }
 293                }))
 294                .on_conflict(
 295                    OnConflict::columns([
 296                        worktree_entry::Column::ProjectId,
 297                        worktree_entry::Column::WorktreeId,
 298                        worktree_entry::Column::Id,
 299                    ])
 300                    .update_columns([
 301                        worktree_entry::Column::IsDir,
 302                        worktree_entry::Column::Path,
 303                        worktree_entry::Column::Inode,
 304                        worktree_entry::Column::MtimeSeconds,
 305                        worktree_entry::Column::MtimeNanos,
 306                        worktree_entry::Column::CanonicalPath,
 307                        worktree_entry::Column::IsIgnored,
 308                        worktree_entry::Column::IsHidden,
 309                        worktree_entry::Column::ScanId,
 310                    ])
 311                    .to_owned(),
 312                )
 313                .exec(&*tx)
 314                .await?;
 315            }
 316
 317            if !update.removed_entries.is_empty() {
 318                worktree_entry::Entity::update_many()
 319                    .filter(
 320                        worktree_entry::Column::ProjectId
 321                            .eq(project_id)
 322                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 323                            .and(
 324                                worktree_entry::Column::Id
 325                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 326                            ),
 327                    )
 328                    .set(worktree_entry::ActiveModel {
 329                        is_deleted: ActiveValue::Set(true),
 330                        scan_id: ActiveValue::Set(update.scan_id as i64),
 331                        ..Default::default()
 332                    })
 333                    .exec(&*tx)
 334                    .await?;
 335            }
 336
 337            // Backward-compatibility for old Zed clients.
 338            //
 339            // Remove this block when Zed 1.80 stable has been out for a week.
 340            {
 341                if !update.updated_repositories.is_empty() {
 342                    project_repository::Entity::insert_many(
 343                        update.updated_repositories.iter().map(|repository| {
 344                            project_repository::ActiveModel {
 345                                project_id: ActiveValue::set(project_id),
 346                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 347                                id: ActiveValue::set(repository.repository_id as i64),
 348                                scan_id: ActiveValue::set(update.scan_id as i64),
 349                                is_deleted: ActiveValue::set(false),
 350                                branch_summary: ActiveValue::Set(
 351                                    repository
 352                                        .branch_summary
 353                                        .as_ref()
 354                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 355                                ),
 356                                current_merge_conflicts: ActiveValue::Set(Some(
 357                                    serde_json::to_string(&repository.current_merge_conflicts)
 358                                        .unwrap(),
 359                                )),
 360                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 361                                abs_path: ActiveValue::set(String::new()),
 362                                entry_ids: ActiveValue::set("[]".into()),
 363                                head_commit_details: ActiveValue::set(None),
 364                                merge_message: ActiveValue::set(None),
 365                            }
 366                        }),
 367                    )
 368                    .on_conflict(
 369                        OnConflict::columns([
 370                            project_repository::Column::ProjectId,
 371                            project_repository::Column::Id,
 372                        ])
 373                        .update_columns([
 374                            project_repository::Column::ScanId,
 375                            project_repository::Column::BranchSummary,
 376                            project_repository::Column::CurrentMergeConflicts,
 377                        ])
 378                        .to_owned(),
 379                    )
 380                    .exec(&*tx)
 381                    .await?;
 382
 383                    let has_any_statuses = update
 384                        .updated_repositories
 385                        .iter()
 386                        .any(|repository| !repository.updated_statuses.is_empty());
 387
 388                    if has_any_statuses {
 389                        project_repository_statuses::Entity::insert_many(
 390                            update.updated_repositories.iter().flat_map(
 391                                |repository: &proto::RepositoryEntry| {
 392                                    repository.updated_statuses.iter().map(|status_entry| {
 393                                        let (repo_path, status_kind, first_status, second_status) =
 394                                            proto_status_to_db(status_entry.clone());
 395                                        project_repository_statuses::ActiveModel {
 396                                            project_id: ActiveValue::set(project_id),
 397                                            repository_id: ActiveValue::set(
 398                                                repository.repository_id as i64,
 399                                            ),
 400                                            scan_id: ActiveValue::set(update.scan_id as i64),
 401                                            is_deleted: ActiveValue::set(false),
 402                                            repo_path: ActiveValue::set(repo_path),
 403                                            status: ActiveValue::set(0),
 404                                            status_kind: ActiveValue::set(status_kind),
 405                                            first_status: ActiveValue::set(first_status),
 406                                            second_status: ActiveValue::set(second_status),
 407                                        }
 408                                    })
 409                                },
 410                            ),
 411                        )
 412                        .on_conflict(
 413                            OnConflict::columns([
 414                                project_repository_statuses::Column::ProjectId,
 415                                project_repository_statuses::Column::RepositoryId,
 416                                project_repository_statuses::Column::RepoPath,
 417                            ])
 418                            .update_columns([
 419                                project_repository_statuses::Column::ScanId,
 420                                project_repository_statuses::Column::StatusKind,
 421                                project_repository_statuses::Column::FirstStatus,
 422                                project_repository_statuses::Column::SecondStatus,
 423                            ])
 424                            .to_owned(),
 425                        )
 426                        .exec(&*tx)
 427                        .await?;
 428                    }
 429
 430                    for repo in &update.updated_repositories {
 431                        if !repo.removed_statuses.is_empty() {
 432                            project_repository_statuses::Entity::update_many()
 433                                .filter(
 434                                    project_repository_statuses::Column::ProjectId
 435                                        .eq(project_id)
 436                                        .and(
 437                                            project_repository_statuses::Column::RepositoryId
 438                                                .eq(repo.repository_id),
 439                                        )
 440                                        .and(
 441                                            project_repository_statuses::Column::RepoPath
 442                                                .is_in(repo.removed_statuses.iter()),
 443                                        ),
 444                                )
 445                                .set(project_repository_statuses::ActiveModel {
 446                                    is_deleted: ActiveValue::Set(true),
 447                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 448                                    ..Default::default()
 449                                })
 450                                .exec(&*tx)
 451                                .await?;
 452                        }
 453                    }
 454                }
 455
 456                if !update.removed_repositories.is_empty() {
 457                    project_repository::Entity::update_many()
 458                        .filter(
 459                            project_repository::Column::ProjectId
 460                                .eq(project_id)
 461                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 462                                .and(project_repository::Column::Id.is_in(
 463                                    update.removed_repositories.iter().map(|id| *id as i64),
 464                                )),
 465                        )
 466                        .set(project_repository::ActiveModel {
 467                            is_deleted: ActiveValue::Set(true),
 468                            scan_id: ActiveValue::Set(update.scan_id as i64),
 469                            ..Default::default()
 470                        })
 471                        .exec(&*tx)
 472                        .await?;
 473                }
 474            }
 475
 476            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 477            Ok(connection_ids)
 478        })
 479        .await
 480    }
 481
 482    pub async fn update_repository(
 483        &self,
 484        update: &proto::UpdateRepository,
 485        _connection: ConnectionId,
 486    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 487        let project_id = ProjectId::from_proto(update.project_id);
 488        let repository_id = update.id as i64;
 489        self.project_transaction(project_id, |tx| async move {
 490            project_repository::Entity::insert(project_repository::ActiveModel {
 491                project_id: ActiveValue::set(project_id),
 492                id: ActiveValue::set(repository_id),
 493                legacy_worktree_id: ActiveValue::set(None),
 494                abs_path: ActiveValue::set(update.abs_path.clone()),
 495                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 496                scan_id: ActiveValue::set(update.scan_id as i64),
 497                is_deleted: ActiveValue::set(false),
 498                branch_summary: ActiveValue::Set(
 499                    update
 500                        .branch_summary
 501                        .as_ref()
 502                        .map(|summary| serde_json::to_string(summary).unwrap()),
 503                ),
 504                head_commit_details: ActiveValue::Set(
 505                    update
 506                        .head_commit_details
 507                        .as_ref()
 508                        .map(|details| serde_json::to_string(details).unwrap()),
 509                ),
 510                current_merge_conflicts: ActiveValue::Set(Some(
 511                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 512                )),
 513                merge_message: ActiveValue::set(update.merge_message.clone()),
 514            })
 515            .on_conflict(
 516                OnConflict::columns([
 517                    project_repository::Column::ProjectId,
 518                    project_repository::Column::Id,
 519                ])
 520                .update_columns([
 521                    project_repository::Column::ScanId,
 522                    project_repository::Column::BranchSummary,
 523                    project_repository::Column::EntryIds,
 524                    project_repository::Column::AbsPath,
 525                    project_repository::Column::CurrentMergeConflicts,
 526                    project_repository::Column::HeadCommitDetails,
 527                    project_repository::Column::MergeMessage,
 528                ])
 529                .to_owned(),
 530            )
 531            .exec(&*tx)
 532            .await?;
 533
 534            let has_any_statuses = !update.updated_statuses.is_empty();
 535
 536            if has_any_statuses {
 537                project_repository_statuses::Entity::insert_many(
 538                    update.updated_statuses.iter().map(|status_entry| {
 539                        let (repo_path, status_kind, first_status, second_status) =
 540                            proto_status_to_db(status_entry.clone());
 541                        project_repository_statuses::ActiveModel {
 542                            project_id: ActiveValue::set(project_id),
 543                            repository_id: ActiveValue::set(repository_id),
 544                            scan_id: ActiveValue::set(update.scan_id as i64),
 545                            is_deleted: ActiveValue::set(false),
 546                            repo_path: ActiveValue::set(repo_path),
 547                            status: ActiveValue::set(0),
 548                            status_kind: ActiveValue::set(status_kind),
 549                            first_status: ActiveValue::set(first_status),
 550                            second_status: ActiveValue::set(second_status),
 551                        }
 552                    }),
 553                )
 554                .on_conflict(
 555                    OnConflict::columns([
 556                        project_repository_statuses::Column::ProjectId,
 557                        project_repository_statuses::Column::RepositoryId,
 558                        project_repository_statuses::Column::RepoPath,
 559                    ])
 560                    .update_columns([
 561                        project_repository_statuses::Column::ScanId,
 562                        project_repository_statuses::Column::StatusKind,
 563                        project_repository_statuses::Column::FirstStatus,
 564                        project_repository_statuses::Column::SecondStatus,
 565                    ])
 566                    .to_owned(),
 567                )
 568                .exec(&*tx)
 569                .await?;
 570            }
 571
 572            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 573
 574            if has_any_removed_statuses {
 575                project_repository_statuses::Entity::update_many()
 576                    .filter(
 577                        project_repository_statuses::Column::ProjectId
 578                            .eq(project_id)
 579                            .and(
 580                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 581                            )
 582                            .and(
 583                                project_repository_statuses::Column::RepoPath
 584                                    .is_in(update.removed_statuses.iter()),
 585                            ),
 586                    )
 587                    .set(project_repository_statuses::ActiveModel {
 588                        is_deleted: ActiveValue::Set(true),
 589                        scan_id: ActiveValue::Set(update.scan_id as i64),
 590                        ..Default::default()
 591                    })
 592                    .exec(&*tx)
 593                    .await?;
 594            }
 595
 596            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 597            Ok(connection_ids)
 598        })
 599        .await
 600    }
 601
 602    pub async fn remove_repository(
 603        &self,
 604        remove: &proto::RemoveRepository,
 605        _connection: ConnectionId,
 606    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 607        let project_id = ProjectId::from_proto(remove.project_id);
 608        let repository_id = remove.id as i64;
 609        self.project_transaction(project_id, |tx| async move {
 610            project_repository::Entity::update_many()
 611                .filter(
 612                    project_repository::Column::ProjectId
 613                        .eq(project_id)
 614                        .and(project_repository::Column::Id.eq(repository_id)),
 615                )
 616                .set(project_repository::ActiveModel {
 617                    is_deleted: ActiveValue::Set(true),
 618                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 619                    ..Default::default()
 620                })
 621                .exec(&*tx)
 622                .await?;
 623
 624            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 625            Ok(connection_ids)
 626        })
 627        .await
 628    }
 629
 630    /// Updates the diagnostic summary for the given connection.
 631    pub async fn update_diagnostic_summary(
 632        &self,
 633        update: &proto::UpdateDiagnosticSummary,
 634        connection: ConnectionId,
 635    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 636        let project_id = ProjectId::from_proto(update.project_id);
 637        let worktree_id = update.worktree_id as i64;
 638        self.project_transaction(project_id, |tx| async move {
 639            let summary = update.summary.as_ref().context("invalid summary")?;
 640
 641            // Ensure the update comes from the host.
 642            let project = project::Entity::find_by_id(project_id)
 643                .one(&*tx)
 644                .await?
 645                .context("no such project")?;
 646            if project.host_connection()? != connection {
 647                return Err(anyhow!("can't update a project hosted by someone else"))?;
 648            }
 649
 650            // Update summary.
 651            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 652                project_id: ActiveValue::set(project_id),
 653                worktree_id: ActiveValue::set(worktree_id),
 654                path: ActiveValue::set(summary.path.clone()),
 655                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 656                error_count: ActiveValue::set(summary.error_count as i32),
 657                warning_count: ActiveValue::set(summary.warning_count as i32),
 658            })
 659            .on_conflict(
 660                OnConflict::columns([
 661                    worktree_diagnostic_summary::Column::ProjectId,
 662                    worktree_diagnostic_summary::Column::WorktreeId,
 663                    worktree_diagnostic_summary::Column::Path,
 664                ])
 665                .update_columns([
 666                    worktree_diagnostic_summary::Column::LanguageServerId,
 667                    worktree_diagnostic_summary::Column::ErrorCount,
 668                    worktree_diagnostic_summary::Column::WarningCount,
 669                ])
 670                .to_owned(),
 671            )
 672            .exec(&*tx)
 673            .await?;
 674
 675            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 676            Ok(connection_ids)
 677        })
 678        .await
 679    }
 680
 681    /// Starts the language server for the given connection.
 682    pub async fn start_language_server(
 683        &self,
 684        update: &proto::StartLanguageServer,
 685        connection: ConnectionId,
 686    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 687        let project_id = ProjectId::from_proto(update.project_id);
 688        self.project_transaction(project_id, |tx| async move {
 689            let server = update.server.as_ref().context("invalid language server")?;
 690
 691            // Ensure the update comes from the host.
 692            let project = project::Entity::find_by_id(project_id)
 693                .one(&*tx)
 694                .await?
 695                .context("no such project")?;
 696            if project.host_connection()? != connection {
 697                return Err(anyhow!("can't update a project hosted by someone else"))?;
 698            }
 699
 700            // Add the newly-started language server.
 701            language_server::Entity::insert(language_server::ActiveModel {
 702                project_id: ActiveValue::set(project_id),
 703                id: ActiveValue::set(server.id as i64),
 704                name: ActiveValue::set(server.name.clone()),
 705                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 706                capabilities: ActiveValue::set(update.capabilities.clone()),
 707            })
 708            .on_conflict(
 709                OnConflict::columns([
 710                    language_server::Column::ProjectId,
 711                    language_server::Column::Id,
 712                ])
 713                .update_columns([
 714                    language_server::Column::Name,
 715                    language_server::Column::Capabilities,
 716                    language_server::Column::WorktreeId,
 717                ])
 718                .to_owned(),
 719            )
 720            .exec(&*tx)
 721            .await?;
 722
 723            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 724            Ok(connection_ids)
 725        })
 726        .await
 727    }
 728
 729    /// Updates the worktree settings for the given connection.
 730    pub async fn update_worktree_settings(
 731        &self,
 732        update: &proto::UpdateWorktreeSettings,
 733        connection: ConnectionId,
 734    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 735        let project_id = ProjectId::from_proto(update.project_id);
 736        let kind = match update.kind {
 737            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 738                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 739            None => proto::LocalSettingsKind::Settings,
 740        };
 741        let kind = LocalSettingsKind::from_proto(kind);
 742        self.project_transaction(project_id, |tx| async move {
 743            // Ensure the update comes from the host.
 744            let project = project::Entity::find_by_id(project_id)
 745                .one(&*tx)
 746                .await?
 747                .context("no such project")?;
 748            if project.host_connection()? != connection {
 749                return Err(anyhow!("can't update a project hosted by someone else"))?;
 750            }
 751
 752            if let Some(content) = &update.content {
 753                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 754                    project_id: ActiveValue::Set(project_id),
 755                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 756                    path: ActiveValue::Set(update.path.clone()),
 757                    content: ActiveValue::Set(content.clone()),
 758                    kind: ActiveValue::Set(kind),
 759                })
 760                .on_conflict(
 761                    OnConflict::columns([
 762                        worktree_settings_file::Column::ProjectId,
 763                        worktree_settings_file::Column::WorktreeId,
 764                        worktree_settings_file::Column::Path,
 765                    ])
 766                    .update_column(worktree_settings_file::Column::Content)
 767                    .to_owned(),
 768                )
 769                .exec(&*tx)
 770                .await?;
 771            } else {
 772                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 773                    project_id: ActiveValue::Set(project_id),
 774                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 775                    path: ActiveValue::Set(update.path.clone()),
 776                    ..Default::default()
 777                })
 778                .exec(&*tx)
 779                .await?;
 780            }
 781
 782            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 783            Ok(connection_ids)
 784        })
 785        .await
 786    }
 787
 788    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 789        self.transaction(|tx| async move {
 790            Ok(project::Entity::find_by_id(id)
 791                .one(&*tx)
 792                .await?
 793                .context("no such project")?)
 794        })
 795        .await
 796    }
 797
 798    /// Adds the given connection to the specified project
 799    /// in the current room.
 800    pub async fn join_project(
 801        &self,
 802        project_id: ProjectId,
 803        connection: ConnectionId,
 804        user_id: UserId,
 805        committer_name: Option<String>,
 806        committer_email: Option<String>,
 807    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 808        self.project_transaction(project_id, move |tx| {
 809            let committer_name = committer_name.clone();
 810            let committer_email = committer_email.clone();
 811            async move {
 812                let (project, role) = self
 813                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 814                    .await?;
 815                self.join_project_internal(
 816                    project,
 817                    user_id,
 818                    committer_name,
 819                    committer_email,
 820                    connection,
 821                    role,
 822                    &tx,
 823                )
 824                .await
 825            }
 826        })
 827        .await
 828    }
 829
 830    async fn join_project_internal(
 831        &self,
 832        project: project::Model,
 833        user_id: UserId,
 834        committer_name: Option<String>,
 835        committer_email: Option<String>,
 836        connection: ConnectionId,
 837        role: ChannelRole,
 838        tx: &DatabaseTransaction,
 839    ) -> Result<(Project, ReplicaId)> {
 840        let mut collaborators = project
 841            .find_related(project_collaborator::Entity)
 842            .all(tx)
 843            .await?;
 844        let replica_ids = collaborators
 845            .iter()
 846            .map(|c| c.replica_id)
 847            .collect::<HashSet<_>>();
 848        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
 849        while replica_ids.contains(&replica_id) {
 850            replica_id.0 += 1;
 851        }
 852        let new_collaborator = project_collaborator::ActiveModel {
 853            project_id: ActiveValue::set(project.id),
 854            connection_id: ActiveValue::set(connection.id as i32),
 855            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 856            user_id: ActiveValue::set(user_id),
 857            replica_id: ActiveValue::set(replica_id),
 858            is_host: ActiveValue::set(false),
 859            id: ActiveValue::NotSet,
 860            committer_name: ActiveValue::set(committer_name),
 861            committer_email: ActiveValue::set(committer_email),
 862        }
 863        .insert(tx)
 864        .await?;
 865        collaborators.push(new_collaborator);
 866
 867        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 868        let mut worktrees = db_worktrees
 869            .into_iter()
 870            .map(|db_worktree| {
 871                (
 872                    db_worktree.id as u64,
 873                    Worktree {
 874                        id: db_worktree.id as u64,
 875                        abs_path: db_worktree.abs_path,
 876                        root_name: db_worktree.root_name,
 877                        visible: db_worktree.visible,
 878                        entries: Default::default(),
 879                        diagnostic_summaries: Default::default(),
 880                        settings_files: Default::default(),
 881                        scan_id: db_worktree.scan_id as u64,
 882                        completed_scan_id: db_worktree.completed_scan_id as u64,
 883                        legacy_repository_entries: Default::default(),
 884                    },
 885                )
 886            })
 887            .collect::<BTreeMap<_, _>>();
 888
 889        // Populate worktree entries.
 890        {
 891            let mut db_entries = worktree_entry::Entity::find()
 892                .filter(
 893                    Condition::all()
 894                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 895                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 896                )
 897                .stream(tx)
 898                .await?;
 899            while let Some(db_entry) = db_entries.next().await {
 900                let db_entry = db_entry?;
 901                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 902                    worktree.entries.push(proto::Entry {
 903                        id: db_entry.id as u64,
 904                        is_dir: db_entry.is_dir,
 905                        path: db_entry.path,
 906                        inode: db_entry.inode as u64,
 907                        mtime: Some(proto::Timestamp {
 908                            seconds: db_entry.mtime_seconds as u64,
 909                            nanos: db_entry.mtime_nanos as u32,
 910                        }),
 911                        canonical_path: db_entry.canonical_path,
 912                        is_ignored: db_entry.is_ignored,
 913                        is_external: db_entry.is_external,
 914                        is_hidden: db_entry.is_hidden,
 915                        // This is only used in the summarization backlog, so if it's None,
 916                        // that just means we won't be able to detect when to resummarize
 917                        // based on total number of backlogged bytes - instead, we'd go
 918                        // on number of files only. That shouldn't be a huge deal in practice.
 919                        size: None,
 920                        is_fifo: db_entry.is_fifo,
 921                    });
 922                }
 923            }
 924        }
 925
 926        // Populate repository entries.
 927        let mut repositories = Vec::new();
 928        {
 929            let db_repository_entries = project_repository::Entity::find()
 930                .filter(
 931                    Condition::all()
 932                        .add(project_repository::Column::ProjectId.eq(project.id))
 933                        .add(project_repository::Column::IsDeleted.eq(false)),
 934                )
 935                .all(tx)
 936                .await?;
 937            for db_repository_entry in db_repository_entries {
 938                let mut repository_statuses = project_repository_statuses::Entity::find()
 939                    .filter(
 940                        Condition::all()
 941                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 942                            .add(
 943                                project_repository_statuses::Column::RepositoryId
 944                                    .eq(db_repository_entry.id),
 945                            )
 946                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 947                    )
 948                    .stream(tx)
 949                    .await?;
 950                let mut updated_statuses = Vec::new();
 951                while let Some(status_entry) = repository_statuses.next().await {
 952                    let status_entry = status_entry?;
 953                    updated_statuses.push(db_status_to_proto(status_entry)?);
 954                }
 955
 956                let current_merge_conflicts = db_repository_entry
 957                    .current_merge_conflicts
 958                    .as_ref()
 959                    .map(|conflicts| serde_json::from_str(conflicts))
 960                    .transpose()?
 961                    .unwrap_or_default();
 962
 963                let branch_summary = db_repository_entry
 964                    .branch_summary
 965                    .as_ref()
 966                    .map(|branch_summary| serde_json::from_str(branch_summary))
 967                    .transpose()?
 968                    .unwrap_or_default();
 969
 970                let head_commit_details = db_repository_entry
 971                    .head_commit_details
 972                    .as_ref()
 973                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 974                    .transpose()?
 975                    .unwrap_or_default();
 976
 977                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 978                    .context("failed to deserialize repository's entry ids")?;
 979
 980                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 981                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 982                        worktree.legacy_repository_entries.insert(
 983                            db_repository_entry.id as u64,
 984                            proto::RepositoryEntry {
 985                                repository_id: db_repository_entry.id as u64,
 986                                updated_statuses,
 987                                removed_statuses: Vec::new(),
 988                                current_merge_conflicts,
 989                                branch_summary,
 990                            },
 991                        );
 992                    }
 993                } else {
 994                    repositories.push(proto::UpdateRepository {
 995                        project_id: db_repository_entry.project_id.0 as u64,
 996                        id: db_repository_entry.id as u64,
 997                        abs_path: db_repository_entry.abs_path,
 998                        entry_ids,
 999                        updated_statuses,
1000                        removed_statuses: Vec::new(),
1001                        current_merge_conflicts,
1002                        branch_summary,
1003                        head_commit_details,
1004                        scan_id: db_repository_entry.scan_id as u64,
1005                        is_last_update: true,
1006                        merge_message: db_repository_entry.merge_message,
1007                        stash_entries: Vec::new(),
1008                    });
1009                }
1010            }
1011        }
1012
1013        // Populate worktree diagnostic summaries.
1014        {
1015            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1016                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1017                .stream(tx)
1018                .await?;
1019            while let Some(db_summary) = db_summaries.next().await {
1020                let db_summary = db_summary?;
1021                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1022                    worktree
1023                        .diagnostic_summaries
1024                        .push(proto::DiagnosticSummary {
1025                            path: db_summary.path,
1026                            language_server_id: db_summary.language_server_id as u64,
1027                            error_count: db_summary.error_count as u32,
1028                            warning_count: db_summary.warning_count as u32,
1029                        });
1030                }
1031            }
1032        }
1033
1034        // Populate worktree settings files
1035        {
1036            let mut db_settings_files = worktree_settings_file::Entity::find()
1037                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1038                .stream(tx)
1039                .await?;
1040            while let Some(db_settings_file) = db_settings_files.next().await {
1041                let db_settings_file = db_settings_file?;
1042                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1043                    worktree.settings_files.push(WorktreeSettingsFile {
1044                        path: db_settings_file.path,
1045                        content: db_settings_file.content,
1046                        kind: db_settings_file.kind,
1047                    });
1048                }
1049            }
1050        }
1051
1052        // Populate language servers.
1053        let language_servers = project
1054            .find_related(language_server::Entity)
1055            .all(tx)
1056            .await?;
1057
1058        let path_style = if project.windows_paths {
1059            PathStyle::Windows
1060        } else {
1061            PathStyle::Posix
1062        };
1063
1064        let project = Project {
1065            id: project.id,
1066            role,
1067            collaborators: collaborators
1068                .into_iter()
1069                .map(|collaborator| ProjectCollaborator {
1070                    connection_id: collaborator.connection(),
1071                    user_id: collaborator.user_id,
1072                    replica_id: collaborator.replica_id,
1073                    is_host: collaborator.is_host,
1074                    committer_name: collaborator.committer_name,
1075                    committer_email: collaborator.committer_email,
1076                })
1077                .collect(),
1078            worktrees,
1079            repositories,
1080            language_servers: language_servers
1081                .into_iter()
1082                .map(|language_server| LanguageServer {
1083                    server: proto::LanguageServer {
1084                        id: language_server.id as u64,
1085                        name: language_server.name,
1086                        worktree_id: language_server.worktree_id.map(|id| id as u64),
1087                    },
1088                    capabilities: language_server.capabilities,
1089                })
1090                .collect(),
1091            path_style,
1092        };
1093        Ok((project, replica_id as ReplicaId))
1094    }
1095
1096    /// Removes the given connection from the specified project.
1097    pub async fn leave_project(
1098        &self,
1099        project_id: ProjectId,
1100        connection: ConnectionId,
1101    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1102        self.project_transaction(project_id, |tx| async move {
1103            let result = project_collaborator::Entity::delete_many()
1104                .filter(
1105                    Condition::all()
1106                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1107                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1108                        .add(
1109                            project_collaborator::Column::ConnectionServerId
1110                                .eq(connection.owner_id as i32),
1111                        ),
1112                )
1113                .exec(&*tx)
1114                .await?;
1115            if result.rows_affected == 0 {
1116                Err(anyhow!("not a collaborator on this project"))?;
1117            }
1118
1119            let project = project::Entity::find_by_id(project_id)
1120                .one(&*tx)
1121                .await?
1122                .context("no such project")?;
1123            let collaborators = project
1124                .find_related(project_collaborator::Entity)
1125                .all(&*tx)
1126                .await?;
1127            let connection_ids: Vec<ConnectionId> = collaborators
1128                .into_iter()
1129                .map(|collaborator| collaborator.connection())
1130                .collect();
1131
1132            follower::Entity::delete_many()
1133                .filter(
1134                    Condition::any()
1135                        .add(
1136                            Condition::all()
1137                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1138                                .add(
1139                                    follower::Column::LeaderConnectionServerId
1140                                        .eq(connection.owner_id),
1141                                )
1142                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1143                        )
1144                        .add(
1145                            Condition::all()
1146                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1147                                .add(
1148                                    follower::Column::FollowerConnectionServerId
1149                                        .eq(connection.owner_id),
1150                                )
1151                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1152                        ),
1153                )
1154                .exec(&*tx)
1155                .await?;
1156
1157            let room = if let Some(room_id) = project.room_id {
1158                Some(self.get_room(room_id, &tx).await?)
1159            } else {
1160                None
1161            };
1162
1163            let left_project = LeftProject {
1164                id: project_id,
1165                should_unshare: connection == project.host_connection()?,
1166                connection_ids,
1167            };
1168            Ok((room, left_project))
1169        })
1170        .await
1171    }
1172
1173    pub async fn check_user_is_project_host(
1174        &self,
1175        project_id: ProjectId,
1176        connection_id: ConnectionId,
1177    ) -> Result<()> {
1178        self.project_transaction(project_id, |tx| async move {
1179            project::Entity::find()
1180                .filter(
1181                    Condition::all()
1182                        .add(project::Column::Id.eq(project_id))
1183                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1184                        .add(
1185                            project::Column::HostConnectionServerId
1186                                .eq(Some(connection_id.owner_id as i32)),
1187                        ),
1188                )
1189                .one(&*tx)
1190                .await?
1191                .context("failed to read project host")?;
1192
1193            Ok(())
1194        })
1195        .await
1196        .map(|guard| guard.into_inner())
1197    }
1198
1199    /// Returns the current project if the given user is authorized to access it with the specified capability.
1200    pub async fn access_project(
1201        &self,
1202        project_id: ProjectId,
1203        connection_id: ConnectionId,
1204        capability: Capability,
1205        tx: &DatabaseTransaction,
1206    ) -> Result<(project::Model, ChannelRole)> {
1207        let project = project::Entity::find_by_id(project_id)
1208            .one(tx)
1209            .await?
1210            .context("no such project")?;
1211
1212        let role_from_room = if let Some(room_id) = project.room_id {
1213            room_participant::Entity::find()
1214                .filter(room_participant::Column::RoomId.eq(room_id))
1215                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1216                .one(tx)
1217                .await?
1218                .and_then(|participant| participant.role)
1219        } else {
1220            None
1221        };
1222
1223        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1224
1225        match capability {
1226            Capability::ReadWrite => {
1227                if !role.can_edit_projects() {
1228                    return Err(anyhow!("not authorized to edit projects"))?;
1229                }
1230            }
1231            Capability::ReadOnly => {
1232                if !role.can_read_projects() {
1233                    return Err(anyhow!("not authorized to read projects"))?;
1234                }
1235            }
1236        }
1237
1238        Ok((project, role))
1239    }
1240
1241    /// Returns the host connection for a read-only request to join a shared project.
1242    pub async fn host_for_read_only_project_request(
1243        &self,
1244        project_id: ProjectId,
1245        connection_id: ConnectionId,
1246    ) -> Result<ConnectionId> {
1247        self.project_transaction(project_id, |tx| async move {
1248            let (project, _) = self
1249                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1250                .await?;
1251            project.host_connection()
1252        })
1253        .await
1254        .map(|guard| guard.into_inner())
1255    }
1256
1257    /// Returns the host connection for a request to join a shared project.
1258    pub async fn host_for_mutating_project_request(
1259        &self,
1260        project_id: ProjectId,
1261        connection_id: ConnectionId,
1262    ) -> Result<ConnectionId> {
1263        self.project_transaction(project_id, |tx| async move {
1264            let (project, _) = self
1265                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1266                .await?;
1267            project.host_connection()
1268        })
1269        .await
1270        .map(|guard| guard.into_inner())
1271    }
1272
1273    pub async fn connections_for_buffer_update(
1274        &self,
1275        project_id: ProjectId,
1276        connection_id: ConnectionId,
1277        capability: Capability,
1278    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1279        self.project_transaction(project_id, |tx| async move {
1280            // Authorize
1281            let (project, _) = self
1282                .access_project(project_id, connection_id, capability, &tx)
1283                .await?;
1284
1285            let host_connection_id = project.host_connection()?;
1286
1287            let collaborators = project_collaborator::Entity::find()
1288                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1289                .all(&*tx)
1290                .await?;
1291
1292            let guest_connection_ids = collaborators
1293                .into_iter()
1294                .filter_map(|collaborator| {
1295                    if collaborator.is_host {
1296                        None
1297                    } else {
1298                        Some(collaborator.connection())
1299                    }
1300                })
1301                .collect();
1302
1303            Ok((host_connection_id, guest_connection_ids))
1304        })
1305        .await
1306    }
1307
1308    /// Returns the connection IDs in the given project.
1309    ///
1310    /// The provided `connection_id` must also be a collaborator in the project,
1311    /// otherwise an error will be returned.
1312    pub async fn project_connection_ids(
1313        &self,
1314        project_id: ProjectId,
1315        connection_id: ConnectionId,
1316        exclude_dev_server: bool,
1317    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1318        self.project_transaction(project_id, |tx| async move {
1319            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1320                .await
1321        })
1322        .await
1323    }
1324
1325    async fn internal_project_connection_ids(
1326        &self,
1327        project_id: ProjectId,
1328        connection_id: ConnectionId,
1329        exclude_dev_server: bool,
1330        tx: &DatabaseTransaction,
1331    ) -> Result<HashSet<ConnectionId>> {
1332        let project = project::Entity::find_by_id(project_id)
1333            .one(tx)
1334            .await?
1335            .context("no such project")?;
1336
1337        let mut collaborators = project_collaborator::Entity::find()
1338            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1339            .stream(tx)
1340            .await?;
1341
1342        let mut connection_ids = HashSet::default();
1343        if let Some(host_connection) = project.host_connection().log_err()
1344            && !exclude_dev_server
1345        {
1346            connection_ids.insert(host_connection);
1347        }
1348
1349        while let Some(collaborator) = collaborators.next().await {
1350            let collaborator = collaborator?;
1351            connection_ids.insert(collaborator.connection());
1352        }
1353
1354        if connection_ids.contains(&connection_id)
1355            || Some(connection_id) == project.host_connection().ok()
1356        {
1357            Ok(connection_ids)
1358        } else {
1359            Err(anyhow!(
1360                "can only send project updates to a project you're in"
1361            ))?
1362        }
1363    }
1364
1365    async fn project_guest_connection_ids(
1366        &self,
1367        project_id: ProjectId,
1368        tx: &DatabaseTransaction,
1369    ) -> Result<Vec<ConnectionId>> {
1370        let mut collaborators = project_collaborator::Entity::find()
1371            .filter(
1372                project_collaborator::Column::ProjectId
1373                    .eq(project_id)
1374                    .and(project_collaborator::Column::IsHost.eq(false)),
1375            )
1376            .stream(tx)
1377            .await?;
1378
1379        let mut guest_connection_ids = Vec::new();
1380        while let Some(collaborator) = collaborators.next().await {
1381            let collaborator = collaborator?;
1382            guest_connection_ids.push(collaborator.connection());
1383        }
1384        Ok(guest_connection_ids)
1385    }
1386
1387    /// Returns the [`RoomId`] for the given project.
1388    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1389        self.transaction(|tx| async move {
1390            Ok(project::Entity::find_by_id(project_id)
1391                .one(&*tx)
1392                .await?
1393                .and_then(|project| project.room_id))
1394        })
1395        .await
1396    }
1397
1398    pub async fn check_room_participants(
1399        &self,
1400        room_id: RoomId,
1401        leader_id: ConnectionId,
1402        follower_id: ConnectionId,
1403    ) -> Result<()> {
1404        self.transaction(|tx| async move {
1405            use room_participant::Column;
1406
1407            let count = room_participant::Entity::find()
1408                .filter(
1409                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1410                        Condition::any()
1411                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1412                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1413                            ))
1414                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1415                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1416                            )),
1417                    ),
1418                )
1419                .count(&*tx)
1420                .await?;
1421
1422            if count < 2 {
1423                Err(anyhow!("not room participants"))?;
1424            }
1425
1426            Ok(())
1427        })
1428        .await
1429    }
1430
1431    /// Adds the given follower connection as a follower of the given leader connection.
1432    pub async fn follow(
1433        &self,
1434        room_id: RoomId,
1435        project_id: ProjectId,
1436        leader_connection: ConnectionId,
1437        follower_connection: ConnectionId,
1438    ) -> Result<TransactionGuard<proto::Room>> {
1439        self.room_transaction(room_id, |tx| async move {
1440            follower::ActiveModel {
1441                room_id: ActiveValue::set(room_id),
1442                project_id: ActiveValue::set(project_id),
1443                leader_connection_server_id: ActiveValue::set(ServerId(
1444                    leader_connection.owner_id as i32,
1445                )),
1446                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1447                follower_connection_server_id: ActiveValue::set(ServerId(
1448                    follower_connection.owner_id as i32,
1449                )),
1450                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1451                ..Default::default()
1452            }
1453            .insert(&*tx)
1454            .await?;
1455
1456            let room = self.get_room(room_id, &tx).await?;
1457            Ok(room)
1458        })
1459        .await
1460    }
1461
1462    /// Removes the given follower connection as a follower of the given leader connection.
1463    pub async fn unfollow(
1464        &self,
1465        room_id: RoomId,
1466        project_id: ProjectId,
1467        leader_connection: ConnectionId,
1468        follower_connection: ConnectionId,
1469    ) -> Result<TransactionGuard<proto::Room>> {
1470        self.room_transaction(room_id, |tx| async move {
1471            follower::Entity::delete_many()
1472                .filter(
1473                    Condition::all()
1474                        .add(follower::Column::RoomId.eq(room_id))
1475                        .add(follower::Column::ProjectId.eq(project_id))
1476                        .add(
1477                            follower::Column::LeaderConnectionServerId
1478                                .eq(leader_connection.owner_id),
1479                        )
1480                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1481                        .add(
1482                            follower::Column::FollowerConnectionServerId
1483                                .eq(follower_connection.owner_id),
1484                        )
1485                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1486                )
1487                .exec(&*tx)
1488                .await?;
1489
1490            let room = self.get_room(room_id, &tx).await?;
1491            Ok(room)
1492        })
1493        .await
1494    }
1495}