projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project {
  95                clock::ReplicaId::REMOTE_SERVER
  96            } else {
  97                clock::ReplicaId::LOCAL
  98            };
  99
 100            project_collaborator::ActiveModel {
 101                project_id: ActiveValue::set(project.id),
 102                connection_id: ActiveValue::set(connection.id as i32),
 103                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 104                user_id: ActiveValue::set(participant.user_id),
 105                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 106                is_host: ActiveValue::set(true),
 107                id: ActiveValue::NotSet,
 108                committer_name: ActiveValue::Set(None),
 109                committer_email: ActiveValue::Set(None),
 110            }
 111            .insert(&*tx)
 112            .await?;
 113
 114            let room = self.get_room(room_id, &tx).await?;
 115            Ok((project.id, room))
 116        })
 117        .await
 118    }
 119
 120    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 121        self.transaction(|tx| async move {
 122            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 123            Ok(())
 124        })
 125        .await
 126    }
 127
 128    /// Unshares the given project.
 129    pub async fn unshare_project(
 130        &self,
 131        project_id: ProjectId,
 132        connection: ConnectionId,
 133    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 134        self.project_transaction(project_id, |tx| async move {
 135            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 136            let project = project::Entity::find_by_id(project_id)
 137                .one(&*tx)
 138                .await?
 139                .context("project not found")?;
 140            let room = if let Some(room_id) = project.room_id {
 141                Some(self.get_room(room_id, &tx).await?)
 142            } else {
 143                None
 144            };
 145            if project.host_connection()? == connection {
 146                return Ok((true, room, guest_connection_ids));
 147            }
 148            Err(anyhow!("cannot unshare a project hosted by another user"))?
 149        })
 150        .await
 151    }
 152
 153    /// Updates the worktrees associated with the given project.
 154    pub async fn update_project(
 155        &self,
 156        project_id: ProjectId,
 157        connection: ConnectionId,
 158        worktrees: &[proto::WorktreeMetadata],
 159    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 160        self.project_transaction(project_id, |tx| async move {
 161            let project = project::Entity::find_by_id(project_id)
 162                .filter(
 163                    Condition::all()
 164                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 165                        .add(
 166                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 167                        ),
 168                )
 169                .one(&*tx)
 170                .await?
 171                .context("no such project")?;
 172
 173            self.update_project_worktrees(project.id, worktrees, &tx)
 174                .await?;
 175
 176            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 177
 178            let room = if let Some(room_id) = project.room_id {
 179                Some(self.get_room(room_id, &tx).await?)
 180            } else {
 181                None
 182            };
 183
 184            Ok((room, guest_connection_ids))
 185        })
 186        .await
 187    }
 188
 189    pub(in crate::db) async fn update_project_worktrees(
 190        &self,
 191        project_id: ProjectId,
 192        worktrees: &[proto::WorktreeMetadata],
 193        tx: &DatabaseTransaction,
 194    ) -> Result<()> {
 195        if !worktrees.is_empty() {
 196            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 197                id: ActiveValue::set(worktree.id as i64),
 198                project_id: ActiveValue::set(project_id),
 199                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 200                root_name: ActiveValue::set(worktree.root_name.clone()),
 201                visible: ActiveValue::set(worktree.visible),
 202                scan_id: ActiveValue::set(0),
 203                completed_scan_id: ActiveValue::set(0),
 204            }))
 205            .on_conflict(
 206                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 207                    .update_column(worktree::Column::RootName)
 208                    .to_owned(),
 209            )
 210            .exec(tx)
 211            .await?;
 212        }
 213
 214        worktree::Entity::delete_many()
 215            .filter(worktree::Column::ProjectId.eq(project_id).and(
 216                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 217            ))
 218            .exec(tx)
 219            .await?;
 220
 221        Ok(())
 222    }
 223
 224    pub async fn update_worktree(
 225        &self,
 226        update: &proto::UpdateWorktree,
 227        connection: ConnectionId,
 228    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 229        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 230            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 231        {
 232            return Err(anyhow!(
 233                "invalid worktree update. removed entries: {}, updated entries: {}",
 234                update.removed_entries.len(),
 235                update.updated_entries.len()
 236            ))?;
 237        }
 238
 239        let project_id = ProjectId::from_proto(update.project_id);
 240        let worktree_id = update.worktree_id as i64;
 241        self.project_transaction(project_id, |tx| async move {
 242            // Ensure the update comes from the host.
 243            let _project = project::Entity::find_by_id(project_id)
 244                .filter(
 245                    Condition::all()
 246                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 247                        .add(
 248                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 249                        ),
 250                )
 251                .one(&*tx)
 252                .await?
 253                .with_context(|| format!("no such project: {project_id}"))?;
 254
 255            // Update metadata.
 256            worktree::Entity::update(worktree::ActiveModel {
 257                id: ActiveValue::set(worktree_id),
 258                project_id: ActiveValue::set(project_id),
 259                root_name: ActiveValue::set(update.root_name.clone()),
 260                scan_id: ActiveValue::set(update.scan_id as i64),
 261                completed_scan_id: if update.is_last_update {
 262                    ActiveValue::set(update.scan_id as i64)
 263                } else {
 264                    ActiveValue::default()
 265                },
 266                abs_path: ActiveValue::set(update.abs_path.clone()),
 267                ..Default::default()
 268            })
 269            .exec(&*tx)
 270            .await?;
 271
 272            if !update.updated_entries.is_empty() {
 273                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 274                    let mtime = entry.mtime.clone().unwrap_or_default();
 275                    worktree_entry::ActiveModel {
 276                        project_id: ActiveValue::set(project_id),
 277                        worktree_id: ActiveValue::set(worktree_id),
 278                        id: ActiveValue::set(entry.id as i64),
 279                        is_dir: ActiveValue::set(entry.is_dir),
 280                        path: ActiveValue::set(entry.path.clone()),
 281                        inode: ActiveValue::set(entry.inode as i64),
 282                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 283                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 284                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 285                        is_ignored: ActiveValue::set(entry.is_ignored),
 286                        git_status: ActiveValue::set(None),
 287                        is_external: ActiveValue::set(entry.is_external),
 288                        is_deleted: ActiveValue::set(false),
 289                        is_hidden: ActiveValue::set(entry.is_hidden),
 290                        scan_id: ActiveValue::set(update.scan_id as i64),
 291                        is_fifo: ActiveValue::set(entry.is_fifo),
 292                    }
 293                }))
 294                .on_conflict(
 295                    OnConflict::columns([
 296                        worktree_entry::Column::ProjectId,
 297                        worktree_entry::Column::WorktreeId,
 298                        worktree_entry::Column::Id,
 299                    ])
 300                    .update_columns([
 301                        worktree_entry::Column::IsDir,
 302                        worktree_entry::Column::Path,
 303                        worktree_entry::Column::Inode,
 304                        worktree_entry::Column::MtimeSeconds,
 305                        worktree_entry::Column::MtimeNanos,
 306                        worktree_entry::Column::CanonicalPath,
 307                        worktree_entry::Column::IsIgnored,
 308                        worktree_entry::Column::IsHidden,
 309                        worktree_entry::Column::ScanId,
 310                    ])
 311                    .to_owned(),
 312                )
 313                .exec(&*tx)
 314                .await?;
 315            }
 316
 317            if !update.removed_entries.is_empty() {
 318                worktree_entry::Entity::update_many()
 319                    .filter(
 320                        worktree_entry::Column::ProjectId
 321                            .eq(project_id)
 322                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 323                            .and(
 324                                worktree_entry::Column::Id
 325                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 326                            ),
 327                    )
 328                    .set(worktree_entry::ActiveModel {
 329                        is_deleted: ActiveValue::Set(true),
 330                        scan_id: ActiveValue::Set(update.scan_id as i64),
 331                        ..Default::default()
 332                    })
 333                    .exec(&*tx)
 334                    .await?;
 335            }
 336
 337            // Backward-compatibility for old Zed clients.
 338            //
 339            // Remove this block when Zed 1.80 stable has been out for a week.
 340            {
 341                if !update.updated_repositories.is_empty() {
 342                    project_repository::Entity::insert_many(
 343                        update.updated_repositories.iter().map(|repository| {
 344                            project_repository::ActiveModel {
 345                                project_id: ActiveValue::set(project_id),
 346                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 347                                id: ActiveValue::set(repository.repository_id as i64),
 348                                scan_id: ActiveValue::set(update.scan_id as i64),
 349                                is_deleted: ActiveValue::set(false),
 350                                branch_summary: ActiveValue::Set(
 351                                    repository
 352                                        .branch_summary
 353                                        .as_ref()
 354                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 355                                ),
 356                                current_merge_conflicts: ActiveValue::Set(Some(
 357                                    serde_json::to_string(&repository.current_merge_conflicts)
 358                                        .unwrap(),
 359                                )),
 360                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 361                                abs_path: ActiveValue::set(String::new()),
 362                                entry_ids: ActiveValue::set("[]".into()),
 363                                head_commit_details: ActiveValue::set(None),
 364                                merge_message: ActiveValue::set(None),
 365                                remote_upstream_url: ActiveValue::set(None),
 366                                remote_origin_url: ActiveValue::set(None),
 367                            }
 368                        }),
 369                    )
 370                    .on_conflict(
 371                        OnConflict::columns([
 372                            project_repository::Column::ProjectId,
 373                            project_repository::Column::Id,
 374                        ])
 375                        .update_columns([
 376                            project_repository::Column::ScanId,
 377                            project_repository::Column::BranchSummary,
 378                            project_repository::Column::CurrentMergeConflicts,
 379                        ])
 380                        .to_owned(),
 381                    )
 382                    .exec(&*tx)
 383                    .await?;
 384
 385                    let has_any_statuses = update
 386                        .updated_repositories
 387                        .iter()
 388                        .any(|repository| !repository.updated_statuses.is_empty());
 389
 390                    if has_any_statuses {
 391                        project_repository_statuses::Entity::insert_many(
 392                            update.updated_repositories.iter().flat_map(
 393                                |repository: &proto::RepositoryEntry| {
 394                                    repository.updated_statuses.iter().map(|status_entry| {
 395                                        let (repo_path, status_kind, first_status, second_status) =
 396                                            proto_status_to_db(status_entry.clone());
 397                                        project_repository_statuses::ActiveModel {
 398                                            project_id: ActiveValue::set(project_id),
 399                                            repository_id: ActiveValue::set(
 400                                                repository.repository_id as i64,
 401                                            ),
 402                                            scan_id: ActiveValue::set(update.scan_id as i64),
 403                                            is_deleted: ActiveValue::set(false),
 404                                            repo_path: ActiveValue::set(repo_path),
 405                                            status: ActiveValue::set(0),
 406                                            status_kind: ActiveValue::set(status_kind),
 407                                            first_status: ActiveValue::set(first_status),
 408                                            second_status: ActiveValue::set(second_status),
 409                                        }
 410                                    })
 411                                },
 412                            ),
 413                        )
 414                        .on_conflict(
 415                            OnConflict::columns([
 416                                project_repository_statuses::Column::ProjectId,
 417                                project_repository_statuses::Column::RepositoryId,
 418                                project_repository_statuses::Column::RepoPath,
 419                            ])
 420                            .update_columns([
 421                                project_repository_statuses::Column::ScanId,
 422                                project_repository_statuses::Column::StatusKind,
 423                                project_repository_statuses::Column::FirstStatus,
 424                                project_repository_statuses::Column::SecondStatus,
 425                            ])
 426                            .to_owned(),
 427                        )
 428                        .exec(&*tx)
 429                        .await?;
 430                    }
 431
 432                    for repo in &update.updated_repositories {
 433                        if !repo.removed_statuses.is_empty() {
 434                            project_repository_statuses::Entity::update_many()
 435                                .filter(
 436                                    project_repository_statuses::Column::ProjectId
 437                                        .eq(project_id)
 438                                        .and(
 439                                            project_repository_statuses::Column::RepositoryId
 440                                                .eq(repo.repository_id),
 441                                        )
 442                                        .and(
 443                                            project_repository_statuses::Column::RepoPath
 444                                                .is_in(repo.removed_statuses.iter()),
 445                                        ),
 446                                )
 447                                .set(project_repository_statuses::ActiveModel {
 448                                    is_deleted: ActiveValue::Set(true),
 449                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 450                                    ..Default::default()
 451                                })
 452                                .exec(&*tx)
 453                                .await?;
 454                        }
 455                    }
 456                }
 457
 458                if !update.removed_repositories.is_empty() {
 459                    project_repository::Entity::update_many()
 460                        .filter(
 461                            project_repository::Column::ProjectId
 462                                .eq(project_id)
 463                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 464                                .and(project_repository::Column::Id.is_in(
 465                                    update.removed_repositories.iter().map(|id| *id as i64),
 466                                )),
 467                        )
 468                        .set(project_repository::ActiveModel {
 469                            is_deleted: ActiveValue::Set(true),
 470                            scan_id: ActiveValue::Set(update.scan_id as i64),
 471                            ..Default::default()
 472                        })
 473                        .exec(&*tx)
 474                        .await?;
 475                }
 476            }
 477
 478            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 479            Ok(connection_ids)
 480        })
 481        .await
 482    }
 483
 484    pub async fn update_repository(
 485        &self,
 486        update: &proto::UpdateRepository,
 487        _connection: ConnectionId,
 488    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 489        let project_id = ProjectId::from_proto(update.project_id);
 490        let repository_id = update.id as i64;
 491        self.project_transaction(project_id, |tx| async move {
 492            project_repository::Entity::insert(project_repository::ActiveModel {
 493                project_id: ActiveValue::set(project_id),
 494                id: ActiveValue::set(repository_id),
 495                legacy_worktree_id: ActiveValue::set(None),
 496                abs_path: ActiveValue::set(update.abs_path.clone()),
 497                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 498                scan_id: ActiveValue::set(update.scan_id as i64),
 499                is_deleted: ActiveValue::set(false),
 500                branch_summary: ActiveValue::Set(
 501                    update
 502                        .branch_summary
 503                        .as_ref()
 504                        .map(|summary| serde_json::to_string(summary).unwrap()),
 505                ),
 506                head_commit_details: ActiveValue::Set(
 507                    update
 508                        .head_commit_details
 509                        .as_ref()
 510                        .map(|details| serde_json::to_string(details).unwrap()),
 511                ),
 512                current_merge_conflicts: ActiveValue::Set(Some(
 513                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 514                )),
 515                merge_message: ActiveValue::set(update.merge_message.clone()),
 516                remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
 517                remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
 518            })
 519            .on_conflict(
 520                OnConflict::columns([
 521                    project_repository::Column::ProjectId,
 522                    project_repository::Column::Id,
 523                ])
 524                .update_columns([
 525                    project_repository::Column::ScanId,
 526                    project_repository::Column::BranchSummary,
 527                    project_repository::Column::EntryIds,
 528                    project_repository::Column::AbsPath,
 529                    project_repository::Column::CurrentMergeConflicts,
 530                    project_repository::Column::HeadCommitDetails,
 531                    project_repository::Column::MergeMessage,
 532                ])
 533                .to_owned(),
 534            )
 535            .exec(&*tx)
 536            .await?;
 537
 538            let has_any_statuses = !update.updated_statuses.is_empty();
 539
 540            if has_any_statuses {
 541                project_repository_statuses::Entity::insert_many(
 542                    update.updated_statuses.iter().map(|status_entry| {
 543                        let (repo_path, status_kind, first_status, second_status) =
 544                            proto_status_to_db(status_entry.clone());
 545                        project_repository_statuses::ActiveModel {
 546                            project_id: ActiveValue::set(project_id),
 547                            repository_id: ActiveValue::set(repository_id),
 548                            scan_id: ActiveValue::set(update.scan_id as i64),
 549                            is_deleted: ActiveValue::set(false),
 550                            repo_path: ActiveValue::set(repo_path),
 551                            status: ActiveValue::set(0),
 552                            status_kind: ActiveValue::set(status_kind),
 553                            first_status: ActiveValue::set(first_status),
 554                            second_status: ActiveValue::set(second_status),
 555                        }
 556                    }),
 557                )
 558                .on_conflict(
 559                    OnConflict::columns([
 560                        project_repository_statuses::Column::ProjectId,
 561                        project_repository_statuses::Column::RepositoryId,
 562                        project_repository_statuses::Column::RepoPath,
 563                    ])
 564                    .update_columns([
 565                        project_repository_statuses::Column::ScanId,
 566                        project_repository_statuses::Column::StatusKind,
 567                        project_repository_statuses::Column::FirstStatus,
 568                        project_repository_statuses::Column::SecondStatus,
 569                    ])
 570                    .to_owned(),
 571                )
 572                .exec(&*tx)
 573                .await?;
 574            }
 575
 576            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 577
 578            if has_any_removed_statuses {
 579                project_repository_statuses::Entity::update_many()
 580                    .filter(
 581                        project_repository_statuses::Column::ProjectId
 582                            .eq(project_id)
 583                            .and(
 584                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 585                            )
 586                            .and(
 587                                project_repository_statuses::Column::RepoPath
 588                                    .is_in(update.removed_statuses.iter()),
 589                            ),
 590                    )
 591                    .set(project_repository_statuses::ActiveModel {
 592                        is_deleted: ActiveValue::Set(true),
 593                        scan_id: ActiveValue::Set(update.scan_id as i64),
 594                        ..Default::default()
 595                    })
 596                    .exec(&*tx)
 597                    .await?;
 598            }
 599
 600            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 601            Ok(connection_ids)
 602        })
 603        .await
 604    }
 605
 606    pub async fn remove_repository(
 607        &self,
 608        remove: &proto::RemoveRepository,
 609        _connection: ConnectionId,
 610    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 611        let project_id = ProjectId::from_proto(remove.project_id);
 612        let repository_id = remove.id as i64;
 613        self.project_transaction(project_id, |tx| async move {
 614            project_repository::Entity::update_many()
 615                .filter(
 616                    project_repository::Column::ProjectId
 617                        .eq(project_id)
 618                        .and(project_repository::Column::Id.eq(repository_id)),
 619                )
 620                .set(project_repository::ActiveModel {
 621                    is_deleted: ActiveValue::Set(true),
 622                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 623                    ..Default::default()
 624                })
 625                .exec(&*tx)
 626                .await?;
 627
 628            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 629            Ok(connection_ids)
 630        })
 631        .await
 632    }
 633
 634    /// Updates the diagnostic summary for the given connection.
 635    pub async fn update_diagnostic_summary(
 636        &self,
 637        update: &proto::UpdateDiagnosticSummary,
 638        connection: ConnectionId,
 639    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 640        let project_id = ProjectId::from_proto(update.project_id);
 641        let worktree_id = update.worktree_id as i64;
 642        self.project_transaction(project_id, |tx| async move {
 643            let summary = update.summary.as_ref().context("invalid summary")?;
 644
 645            // Ensure the update comes from the host.
 646            let project = project::Entity::find_by_id(project_id)
 647                .one(&*tx)
 648                .await?
 649                .context("no such project")?;
 650            if project.host_connection()? != connection {
 651                return Err(anyhow!("can't update a project hosted by someone else"))?;
 652            }
 653
 654            // Update summary.
 655            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 656                project_id: ActiveValue::set(project_id),
 657                worktree_id: ActiveValue::set(worktree_id),
 658                path: ActiveValue::set(summary.path.clone()),
 659                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 660                error_count: ActiveValue::set(summary.error_count as i32),
 661                warning_count: ActiveValue::set(summary.warning_count as i32),
 662            })
 663            .on_conflict(
 664                OnConflict::columns([
 665                    worktree_diagnostic_summary::Column::ProjectId,
 666                    worktree_diagnostic_summary::Column::WorktreeId,
 667                    worktree_diagnostic_summary::Column::Path,
 668                ])
 669                .update_columns([
 670                    worktree_diagnostic_summary::Column::LanguageServerId,
 671                    worktree_diagnostic_summary::Column::ErrorCount,
 672                    worktree_diagnostic_summary::Column::WarningCount,
 673                ])
 674                .to_owned(),
 675            )
 676            .exec(&*tx)
 677            .await?;
 678
 679            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 680            Ok(connection_ids)
 681        })
 682        .await
 683    }
 684
 685    /// Starts the language server for the given connection.
 686    pub async fn start_language_server(
 687        &self,
 688        update: &proto::StartLanguageServer,
 689        connection: ConnectionId,
 690    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 691        let project_id = ProjectId::from_proto(update.project_id);
 692        self.project_transaction(project_id, |tx| async move {
 693            let server = update.server.as_ref().context("invalid language server")?;
 694
 695            // Ensure the update comes from the host.
 696            let project = project::Entity::find_by_id(project_id)
 697                .one(&*tx)
 698                .await?
 699                .context("no such project")?;
 700            if project.host_connection()? != connection {
 701                return Err(anyhow!("can't update a project hosted by someone else"))?;
 702            }
 703
 704            // Add the newly-started language server.
 705            language_server::Entity::insert(language_server::ActiveModel {
 706                project_id: ActiveValue::set(project_id),
 707                id: ActiveValue::set(server.id as i64),
 708                name: ActiveValue::set(server.name.clone()),
 709                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 710                capabilities: ActiveValue::set(update.capabilities.clone()),
 711            })
 712            .on_conflict(
 713                OnConflict::columns([
 714                    language_server::Column::ProjectId,
 715                    language_server::Column::Id,
 716                ])
 717                .update_columns([
 718                    language_server::Column::Name,
 719                    language_server::Column::Capabilities,
 720                    language_server::Column::WorktreeId,
 721                ])
 722                .to_owned(),
 723            )
 724            .exec(&*tx)
 725            .await?;
 726
 727            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 728            Ok(connection_ids)
 729        })
 730        .await
 731    }
 732
 733    /// Updates the worktree settings for the given connection.
 734    pub async fn update_worktree_settings(
 735        &self,
 736        update: &proto::UpdateWorktreeSettings,
 737        connection: ConnectionId,
 738    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 739        let project_id = ProjectId::from_proto(update.project_id);
 740        let kind = match update.kind {
 741            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 742                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 743            None => proto::LocalSettingsKind::Settings,
 744        };
 745        let kind = LocalSettingsKind::from_proto(kind);
 746        self.project_transaction(project_id, |tx| async move {
 747            // Ensure the update comes from the host.
 748            let project = project::Entity::find_by_id(project_id)
 749                .one(&*tx)
 750                .await?
 751                .context("no such project")?;
 752            if project.host_connection()? != connection {
 753                return Err(anyhow!("can't update a project hosted by someone else"))?;
 754            }
 755
 756            if let Some(content) = &update.content {
 757                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 758                    project_id: ActiveValue::Set(project_id),
 759                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 760                    path: ActiveValue::Set(update.path.clone()),
 761                    content: ActiveValue::Set(content.clone()),
 762                    kind: ActiveValue::Set(kind),
 763                    outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
 764                })
 765                .on_conflict(
 766                    OnConflict::columns([
 767                        worktree_settings_file::Column::ProjectId,
 768                        worktree_settings_file::Column::WorktreeId,
 769                        worktree_settings_file::Column::Path,
 770                    ])
 771                    .update_columns([
 772                        worktree_settings_file::Column::Content,
 773                        worktree_settings_file::Column::OutsideWorktree,
 774                    ])
 775                    .to_owned(),
 776                )
 777                .exec(&*tx)
 778                .await?;
 779            } else {
 780                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 781                    project_id: ActiveValue::Set(project_id),
 782                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 783                    path: ActiveValue::Set(update.path.clone()),
 784                    ..Default::default()
 785                })
 786                .exec(&*tx)
 787                .await?;
 788            }
 789
 790            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 791            Ok(connection_ids)
 792        })
 793        .await
 794    }
 795
 796    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 797        self.transaction(|tx| async move {
 798            Ok(project::Entity::find_by_id(id)
 799                .one(&*tx)
 800                .await?
 801                .context("no such project")?)
 802        })
 803        .await
 804    }
 805
 806    /// Adds the given connection to the specified project
 807    /// in the current room.
 808    pub async fn join_project(
 809        &self,
 810        project_id: ProjectId,
 811        connection: ConnectionId,
 812        user_id: UserId,
 813        committer_name: Option<String>,
 814        committer_email: Option<String>,
 815    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 816        self.project_transaction(project_id, move |tx| {
 817            let committer_name = committer_name.clone();
 818            let committer_email = committer_email.clone();
 819            async move {
 820                let (project, role) = self
 821                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 822                    .await?;
 823                self.join_project_internal(
 824                    project,
 825                    user_id,
 826                    committer_name,
 827                    committer_email,
 828                    connection,
 829                    role,
 830                    &tx,
 831                )
 832                .await
 833            }
 834        })
 835        .await
 836    }
 837
 838    async fn join_project_internal(
 839        &self,
 840        project: project::Model,
 841        user_id: UserId,
 842        committer_name: Option<String>,
 843        committer_email: Option<String>,
 844        connection: ConnectionId,
 845        role: ChannelRole,
 846        tx: &DatabaseTransaction,
 847    ) -> Result<(Project, ReplicaId)> {
 848        let mut collaborators = project
 849            .find_related(project_collaborator::Entity)
 850            .all(tx)
 851            .await?;
 852        let replica_ids = collaborators
 853            .iter()
 854            .map(|c| c.replica_id)
 855            .collect::<HashSet<_>>();
 856        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
 857        while replica_ids.contains(&replica_id) {
 858            replica_id.0 += 1;
 859        }
 860        let new_collaborator = project_collaborator::ActiveModel {
 861            project_id: ActiveValue::set(project.id),
 862            connection_id: ActiveValue::set(connection.id as i32),
 863            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 864            user_id: ActiveValue::set(user_id),
 865            replica_id: ActiveValue::set(replica_id),
 866            is_host: ActiveValue::set(false),
 867            id: ActiveValue::NotSet,
 868            committer_name: ActiveValue::set(committer_name),
 869            committer_email: ActiveValue::set(committer_email),
 870        }
 871        .insert(tx)
 872        .await?;
 873        collaborators.push(new_collaborator);
 874
 875        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 876        let mut worktrees = db_worktrees
 877            .into_iter()
 878            .map(|db_worktree| {
 879                (
 880                    db_worktree.id as u64,
 881                    Worktree {
 882                        id: db_worktree.id as u64,
 883                        abs_path: db_worktree.abs_path,
 884                        root_name: db_worktree.root_name,
 885                        visible: db_worktree.visible,
 886                        entries: Default::default(),
 887                        diagnostic_summaries: Default::default(),
 888                        settings_files: Default::default(),
 889                        scan_id: db_worktree.scan_id as u64,
 890                        completed_scan_id: db_worktree.completed_scan_id as u64,
 891                        legacy_repository_entries: Default::default(),
 892                    },
 893                )
 894            })
 895            .collect::<BTreeMap<_, _>>();
 896
 897        // Populate worktree entries.
 898        {
 899            let mut db_entries = worktree_entry::Entity::find()
 900                .filter(
 901                    Condition::all()
 902                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 903                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 904                )
 905                .stream(tx)
 906                .await?;
 907            while let Some(db_entry) = db_entries.next().await {
 908                let db_entry = db_entry?;
 909                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 910                    worktree.entries.push(proto::Entry {
 911                        id: db_entry.id as u64,
 912                        is_dir: db_entry.is_dir,
 913                        path: db_entry.path,
 914                        inode: db_entry.inode as u64,
 915                        mtime: Some(proto::Timestamp {
 916                            seconds: db_entry.mtime_seconds as u64,
 917                            nanos: db_entry.mtime_nanos as u32,
 918                        }),
 919                        canonical_path: db_entry.canonical_path,
 920                        is_ignored: db_entry.is_ignored,
 921                        is_external: db_entry.is_external,
 922                        is_hidden: db_entry.is_hidden,
 923                        // This is only used in the summarization backlog, so if it's None,
 924                        // that just means we won't be able to detect when to resummarize
 925                        // based on total number of backlogged bytes - instead, we'd go
 926                        // on number of files only. That shouldn't be a huge deal in practice.
 927                        size: None,
 928                        is_fifo: db_entry.is_fifo,
 929                    });
 930                }
 931            }
 932        }
 933
 934        // Populate repository entries.
 935        let mut repositories = Vec::new();
 936        {
 937            let db_repository_entries = project_repository::Entity::find()
 938                .filter(
 939                    Condition::all()
 940                        .add(project_repository::Column::ProjectId.eq(project.id))
 941                        .add(project_repository::Column::IsDeleted.eq(false)),
 942                )
 943                .all(tx)
 944                .await?;
 945            for db_repository_entry in db_repository_entries {
 946                let mut repository_statuses = project_repository_statuses::Entity::find()
 947                    .filter(
 948                        Condition::all()
 949                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 950                            .add(
 951                                project_repository_statuses::Column::RepositoryId
 952                                    .eq(db_repository_entry.id),
 953                            )
 954                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 955                    )
 956                    .stream(tx)
 957                    .await?;
 958                let mut updated_statuses = Vec::new();
 959                while let Some(status_entry) = repository_statuses.next().await {
 960                    let status_entry = status_entry?;
 961                    updated_statuses.push(db_status_to_proto(status_entry)?);
 962                }
 963
 964                let current_merge_conflicts = db_repository_entry
 965                    .current_merge_conflicts
 966                    .as_ref()
 967                    .map(|conflicts| serde_json::from_str(conflicts))
 968                    .transpose()?
 969                    .unwrap_or_default();
 970
 971                let branch_summary = db_repository_entry
 972                    .branch_summary
 973                    .as_ref()
 974                    .map(|branch_summary| serde_json::from_str(branch_summary))
 975                    .transpose()?
 976                    .unwrap_or_default();
 977
 978                let head_commit_details = db_repository_entry
 979                    .head_commit_details
 980                    .as_ref()
 981                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 982                    .transpose()?
 983                    .unwrap_or_default();
 984
 985                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 986                    .context("failed to deserialize repository's entry ids")?;
 987
 988                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 989                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 990                        worktree.legacy_repository_entries.insert(
 991                            db_repository_entry.id as u64,
 992                            proto::RepositoryEntry {
 993                                repository_id: db_repository_entry.id as u64,
 994                                updated_statuses,
 995                                removed_statuses: Vec::new(),
 996                                current_merge_conflicts,
 997                                branch_summary,
 998                            },
 999                        );
1000                    }
1001                } else {
1002                    repositories.push(proto::UpdateRepository {
1003                        project_id: db_repository_entry.project_id.0 as u64,
1004                        id: db_repository_entry.id as u64,
1005                        abs_path: db_repository_entry.abs_path,
1006                        entry_ids,
1007                        updated_statuses,
1008                        removed_statuses: Vec::new(),
1009                        current_merge_conflicts,
1010                        branch_summary,
1011                        head_commit_details,
1012                        scan_id: db_repository_entry.scan_id as u64,
1013                        is_last_update: true,
1014                        merge_message: db_repository_entry.merge_message,
1015                        stash_entries: Vec::new(),
1016                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
1017                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
1018                    });
1019                }
1020            }
1021        }
1022
1023        // Populate worktree diagnostic summaries.
1024        {
1025            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1026                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1027                .stream(tx)
1028                .await?;
1029            while let Some(db_summary) = db_summaries.next().await {
1030                let db_summary = db_summary?;
1031                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1032                    worktree
1033                        .diagnostic_summaries
1034                        .push(proto::DiagnosticSummary {
1035                            path: db_summary.path,
1036                            language_server_id: db_summary.language_server_id as u64,
1037                            error_count: db_summary.error_count as u32,
1038                            warning_count: db_summary.warning_count as u32,
1039                        });
1040                }
1041            }
1042        }
1043
1044        // Populate worktree settings files
1045        {
1046            let mut db_settings_files = worktree_settings_file::Entity::find()
1047                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1048                .stream(tx)
1049                .await?;
1050            while let Some(db_settings_file) = db_settings_files.next().await {
1051                let db_settings_file = db_settings_file?;
1052                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1053                    worktree.settings_files.push(WorktreeSettingsFile {
1054                        path: db_settings_file.path,
1055                        content: db_settings_file.content,
1056                        kind: db_settings_file.kind,
1057                        outside_worktree: db_settings_file.outside_worktree,
1058                    });
1059                }
1060            }
1061        }
1062
1063        // Populate language servers.
1064        let language_servers = project
1065            .find_related(language_server::Entity)
1066            .all(tx)
1067            .await?;
1068
1069        let path_style = if project.windows_paths {
1070            PathStyle::Windows
1071        } else {
1072            PathStyle::Posix
1073        };
1074
1075        let project = Project {
1076            id: project.id,
1077            role,
1078            collaborators: collaborators
1079                .into_iter()
1080                .map(|collaborator| ProjectCollaborator {
1081                    connection_id: collaborator.connection(),
1082                    user_id: collaborator.user_id,
1083                    replica_id: collaborator.replica_id,
1084                    is_host: collaborator.is_host,
1085                    committer_name: collaborator.committer_name,
1086                    committer_email: collaborator.committer_email,
1087                })
1088                .collect(),
1089            worktrees,
1090            repositories,
1091            language_servers: language_servers
1092                .into_iter()
1093                .map(|language_server| LanguageServer {
1094                    server: proto::LanguageServer {
1095                        id: language_server.id as u64,
1096                        name: language_server.name,
1097                        worktree_id: language_server.worktree_id.map(|id| id as u64),
1098                    },
1099                    capabilities: language_server.capabilities,
1100                })
1101                .collect(),
1102            path_style,
1103        };
1104        Ok((project, replica_id as ReplicaId))
1105    }
1106
1107    /// Removes the given connection from the specified project.
1108    pub async fn leave_project(
1109        &self,
1110        project_id: ProjectId,
1111        connection: ConnectionId,
1112    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1113        self.project_transaction(project_id, |tx| async move {
1114            let result = project_collaborator::Entity::delete_many()
1115                .filter(
1116                    Condition::all()
1117                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1118                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1119                        .add(
1120                            project_collaborator::Column::ConnectionServerId
1121                                .eq(connection.owner_id as i32),
1122                        ),
1123                )
1124                .exec(&*tx)
1125                .await?;
1126            if result.rows_affected == 0 {
1127                Err(anyhow!("not a collaborator on this project"))?;
1128            }
1129
1130            let project = project::Entity::find_by_id(project_id)
1131                .one(&*tx)
1132                .await?
1133                .context("no such project")?;
1134            let collaborators = project
1135                .find_related(project_collaborator::Entity)
1136                .all(&*tx)
1137                .await?;
1138            let connection_ids: Vec<ConnectionId> = collaborators
1139                .into_iter()
1140                .map(|collaborator| collaborator.connection())
1141                .collect();
1142
1143            follower::Entity::delete_many()
1144                .filter(
1145                    Condition::any()
1146                        .add(
1147                            Condition::all()
1148                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1149                                .add(
1150                                    follower::Column::LeaderConnectionServerId
1151                                        .eq(connection.owner_id),
1152                                )
1153                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1154                        )
1155                        .add(
1156                            Condition::all()
1157                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1158                                .add(
1159                                    follower::Column::FollowerConnectionServerId
1160                                        .eq(connection.owner_id),
1161                                )
1162                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1163                        ),
1164                )
1165                .exec(&*tx)
1166                .await?;
1167
1168            let room = if let Some(room_id) = project.room_id {
1169                Some(self.get_room(room_id, &tx).await?)
1170            } else {
1171                None
1172            };
1173
1174            let left_project = LeftProject {
1175                id: project_id,
1176                should_unshare: connection == project.host_connection()?,
1177                connection_ids,
1178            };
1179            Ok((room, left_project))
1180        })
1181        .await
1182    }
1183
1184    pub async fn check_user_is_project_host(
1185        &self,
1186        project_id: ProjectId,
1187        connection_id: ConnectionId,
1188    ) -> Result<()> {
1189        self.project_transaction(project_id, |tx| async move {
1190            project::Entity::find()
1191                .filter(
1192                    Condition::all()
1193                        .add(project::Column::Id.eq(project_id))
1194                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1195                        .add(
1196                            project::Column::HostConnectionServerId
1197                                .eq(Some(connection_id.owner_id as i32)),
1198                        ),
1199                )
1200                .one(&*tx)
1201                .await?
1202                .context("failed to read project host")?;
1203
1204            Ok(())
1205        })
1206        .await
1207        .map(|guard| guard.into_inner())
1208    }
1209
1210    /// Returns the current project if the given user is authorized to access it with the specified capability.
1211    pub async fn access_project(
1212        &self,
1213        project_id: ProjectId,
1214        connection_id: ConnectionId,
1215        capability: Capability,
1216        tx: &DatabaseTransaction,
1217    ) -> Result<(project::Model, ChannelRole)> {
1218        let project = project::Entity::find_by_id(project_id)
1219            .one(tx)
1220            .await?
1221            .context("no such project")?;
1222
1223        let role_from_room = if let Some(room_id) = project.room_id {
1224            room_participant::Entity::find()
1225                .filter(room_participant::Column::RoomId.eq(room_id))
1226                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1227                .one(tx)
1228                .await?
1229                .and_then(|participant| participant.role)
1230        } else {
1231            None
1232        };
1233
1234        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1235
1236        match capability {
1237            Capability::ReadWrite => {
1238                if !role.can_edit_projects() {
1239                    return Err(anyhow!("not authorized to edit projects"))?;
1240                }
1241            }
1242            Capability::ReadOnly => {
1243                if !role.can_read_projects() {
1244                    return Err(anyhow!("not authorized to read projects"))?;
1245                }
1246            }
1247        }
1248
1249        Ok((project, role))
1250    }
1251
1252    /// Returns the host connection for a read-only request to join a shared project.
1253    pub async fn host_for_read_only_project_request(
1254        &self,
1255        project_id: ProjectId,
1256        connection_id: ConnectionId,
1257    ) -> Result<ConnectionId> {
1258        self.project_transaction(project_id, |tx| async move {
1259            let (project, _) = self
1260                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1261                .await?;
1262            project.host_connection()
1263        })
1264        .await
1265        .map(|guard| guard.into_inner())
1266    }
1267
1268    /// Returns the host connection for a request to join a shared project.
1269    pub async fn host_for_mutating_project_request(
1270        &self,
1271        project_id: ProjectId,
1272        connection_id: ConnectionId,
1273    ) -> Result<ConnectionId> {
1274        self.project_transaction(project_id, |tx| async move {
1275            let (project, _) = self
1276                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1277                .await?;
1278            project.host_connection()
1279        })
1280        .await
1281        .map(|guard| guard.into_inner())
1282    }
1283
1284    pub async fn connections_for_buffer_update(
1285        &self,
1286        project_id: ProjectId,
1287        connection_id: ConnectionId,
1288        capability: Capability,
1289    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1290        self.project_transaction(project_id, |tx| async move {
1291            // Authorize
1292            let (project, _) = self
1293                .access_project(project_id, connection_id, capability, &tx)
1294                .await?;
1295
1296            let host_connection_id = project.host_connection()?;
1297
1298            let collaborators = project_collaborator::Entity::find()
1299                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1300                .all(&*tx)
1301                .await?;
1302
1303            let guest_connection_ids = collaborators
1304                .into_iter()
1305                .filter_map(|collaborator| {
1306                    if collaborator.is_host {
1307                        None
1308                    } else {
1309                        Some(collaborator.connection())
1310                    }
1311                })
1312                .collect();
1313
1314            Ok((host_connection_id, guest_connection_ids))
1315        })
1316        .await
1317    }
1318
1319    /// Returns the connection IDs in the given project.
1320    ///
1321    /// The provided `connection_id` must also be a collaborator in the project,
1322    /// otherwise an error will be returned.
1323    pub async fn project_connection_ids(
1324        &self,
1325        project_id: ProjectId,
1326        connection_id: ConnectionId,
1327        exclude_dev_server: bool,
1328    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1329        self.project_transaction(project_id, |tx| async move {
1330            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1331                .await
1332        })
1333        .await
1334    }
1335
1336    async fn internal_project_connection_ids(
1337        &self,
1338        project_id: ProjectId,
1339        connection_id: ConnectionId,
1340        exclude_dev_server: bool,
1341        tx: &DatabaseTransaction,
1342    ) -> Result<HashSet<ConnectionId>> {
1343        let project = project::Entity::find_by_id(project_id)
1344            .one(tx)
1345            .await?
1346            .context("no such project")?;
1347
1348        let mut collaborators = project_collaborator::Entity::find()
1349            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1350            .stream(tx)
1351            .await?;
1352
1353        let mut connection_ids = HashSet::default();
1354        if let Some(host_connection) = project.host_connection().log_err()
1355            && !exclude_dev_server
1356        {
1357            connection_ids.insert(host_connection);
1358        }
1359
1360        while let Some(collaborator) = collaborators.next().await {
1361            let collaborator = collaborator?;
1362            connection_ids.insert(collaborator.connection());
1363        }
1364
1365        if connection_ids.contains(&connection_id)
1366            || Some(connection_id) == project.host_connection().ok()
1367        {
1368            Ok(connection_ids)
1369        } else {
1370            Err(anyhow!(
1371                "can only send project updates to a project you're in"
1372            ))?
1373        }
1374    }
1375
1376    async fn project_guest_connection_ids(
1377        &self,
1378        project_id: ProjectId,
1379        tx: &DatabaseTransaction,
1380    ) -> Result<Vec<ConnectionId>> {
1381        let mut collaborators = project_collaborator::Entity::find()
1382            .filter(
1383                project_collaborator::Column::ProjectId
1384                    .eq(project_id)
1385                    .and(project_collaborator::Column::IsHost.eq(false)),
1386            )
1387            .stream(tx)
1388            .await?;
1389
1390        let mut guest_connection_ids = Vec::new();
1391        while let Some(collaborator) = collaborators.next().await {
1392            let collaborator = collaborator?;
1393            guest_connection_ids.push(collaborator.connection());
1394        }
1395        Ok(guest_connection_ids)
1396    }
1397
1398    /// Returns the [`RoomId`] for the given project.
1399    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1400        self.transaction(|tx| async move {
1401            Ok(project::Entity::find_by_id(project_id)
1402                .one(&*tx)
1403                .await?
1404                .and_then(|project| project.room_id))
1405        })
1406        .await
1407    }
1408
1409    pub async fn check_room_participants(
1410        &self,
1411        room_id: RoomId,
1412        leader_id: ConnectionId,
1413        follower_id: ConnectionId,
1414    ) -> Result<()> {
1415        self.transaction(|tx| async move {
1416            use room_participant::Column;
1417
1418            let count = room_participant::Entity::find()
1419                .filter(
1420                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1421                        Condition::any()
1422                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1423                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1424                            ))
1425                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1426                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1427                            )),
1428                    ),
1429                )
1430                .count(&*tx)
1431                .await?;
1432
1433            if count < 2 {
1434                Err(anyhow!("not room participants"))?;
1435            }
1436
1437            Ok(())
1438        })
1439        .await
1440    }
1441
1442    /// Adds the given follower connection as a follower of the given leader connection.
1443    pub async fn follow(
1444        &self,
1445        room_id: RoomId,
1446        project_id: ProjectId,
1447        leader_connection: ConnectionId,
1448        follower_connection: ConnectionId,
1449    ) -> Result<TransactionGuard<proto::Room>> {
1450        self.room_transaction(room_id, |tx| async move {
1451            follower::ActiveModel {
1452                room_id: ActiveValue::set(room_id),
1453                project_id: ActiveValue::set(project_id),
1454                leader_connection_server_id: ActiveValue::set(ServerId(
1455                    leader_connection.owner_id as i32,
1456                )),
1457                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1458                follower_connection_server_id: ActiveValue::set(ServerId(
1459                    follower_connection.owner_id as i32,
1460                )),
1461                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1462                ..Default::default()
1463            }
1464            .insert(&*tx)
1465            .await?;
1466
1467            let room = self.get_room(room_id, &tx).await?;
1468            Ok(room)
1469        })
1470        .await
1471    }
1472
1473    /// Removes the given follower connection as a follower of the given leader connection.
1474    pub async fn unfollow(
1475        &self,
1476        room_id: RoomId,
1477        project_id: ProjectId,
1478        leader_connection: ConnectionId,
1479        follower_connection: ConnectionId,
1480    ) -> Result<TransactionGuard<proto::Room>> {
1481        self.room_transaction(room_id, |tx| async move {
1482            follower::Entity::delete_many()
1483                .filter(
1484                    Condition::all()
1485                        .add(follower::Column::RoomId.eq(room_id))
1486                        .add(follower::Column::ProjectId.eq(project_id))
1487                        .add(
1488                            follower::Column::LeaderConnectionServerId
1489                                .eq(leader_connection.owner_id),
1490                        )
1491                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1492                        .add(
1493                            follower::Column::FollowerConnectionServerId
1494                                .eq(follower_connection.owner_id),
1495                        )
1496                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1497                )
1498                .exec(&*tx)
1499                .await?;
1500
1501            let room = self.get_room(room_id, &tx).await?;
1502            Ok(room)
1503        })
1504        .await
1505    }
1506}