projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .ok_or_else(|| anyhow!("could not find participant"))?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            let project = project::ActiveModel {
  65                room_id: ActiveValue::set(Some(participant.room_id)),
  66                host_user_id: ActiveValue::set(Some(participant.user_id)),
  67                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  68                host_connection_server_id: ActiveValue::set(Some(ServerId(
  69                    connection.owner_id as i32,
  70                ))),
  71                id: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if !worktrees.is_empty() {
  77                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  78                    worktree::ActiveModel {
  79                        id: ActiveValue::set(worktree.id as i64),
  80                        project_id: ActiveValue::set(project.id),
  81                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  82                        root_name: ActiveValue::set(worktree.root_name.clone()),
  83                        visible: ActiveValue::set(worktree.visible),
  84                        scan_id: ActiveValue::set(0),
  85                        completed_scan_id: ActiveValue::set(0),
  86                    }
  87                }))
  88                .exec(&*tx)
  89                .await?;
  90            }
  91
  92            let replica_id = if is_ssh_project { 1 } else { 0 };
  93
  94            project_collaborator::ActiveModel {
  95                project_id: ActiveValue::set(project.id),
  96                connection_id: ActiveValue::set(connection.id as i32),
  97                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  98                user_id: ActiveValue::set(participant.user_id),
  99                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 100                is_host: ActiveValue::set(true),
 101                ..Default::default()
 102            }
 103            .insert(&*tx)
 104            .await?;
 105
 106            let room = self.get_room(room_id, &tx).await?;
 107            Ok((project.id, room))
 108        })
 109        .await
 110    }
 111
 112    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 113        self.weak_transaction(|tx| async move {
 114            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 115            Ok(())
 116        })
 117        .await
 118    }
 119
 120    /// Unshares the given project.
 121    pub async fn unshare_project(
 122        &self,
 123        project_id: ProjectId,
 124        connection: ConnectionId,
 125    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 126        self.project_transaction(project_id, |tx| async move {
 127            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 128            let project = project::Entity::find_by_id(project_id)
 129                .one(&*tx)
 130                .await?
 131                .ok_or_else(|| anyhow!("project not found"))?;
 132            let room = if let Some(room_id) = project.room_id {
 133                Some(self.get_room(room_id, &tx).await?)
 134            } else {
 135                None
 136            };
 137            if project.host_connection()? == connection {
 138                return Ok((true, room, guest_connection_ids));
 139            }
 140            Err(anyhow!("cannot unshare a project hosted by another user"))?
 141        })
 142        .await
 143    }
 144
 145    /// Updates the worktrees associated with the given project.
 146    pub async fn update_project(
 147        &self,
 148        project_id: ProjectId,
 149        connection: ConnectionId,
 150        worktrees: &[proto::WorktreeMetadata],
 151    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 152        self.project_transaction(project_id, |tx| async move {
 153            let project = project::Entity::find_by_id(project_id)
 154                .filter(
 155                    Condition::all()
 156                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 157                        .add(
 158                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 159                        ),
 160                )
 161                .one(&*tx)
 162                .await?
 163                .ok_or_else(|| anyhow!("no such project"))?;
 164
 165            self.update_project_worktrees(project.id, worktrees, &tx)
 166                .await?;
 167
 168            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 169
 170            let room = if let Some(room_id) = project.room_id {
 171                Some(self.get_room(room_id, &tx).await?)
 172            } else {
 173                None
 174            };
 175
 176            Ok((room, guest_connection_ids))
 177        })
 178        .await
 179    }
 180
 181    pub(in crate::db) async fn update_project_worktrees(
 182        &self,
 183        project_id: ProjectId,
 184        worktrees: &[proto::WorktreeMetadata],
 185        tx: &DatabaseTransaction,
 186    ) -> Result<()> {
 187        if !worktrees.is_empty() {
 188            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 189                id: ActiveValue::set(worktree.id as i64),
 190                project_id: ActiveValue::set(project_id),
 191                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 192                root_name: ActiveValue::set(worktree.root_name.clone()),
 193                visible: ActiveValue::set(worktree.visible),
 194                scan_id: ActiveValue::set(0),
 195                completed_scan_id: ActiveValue::set(0),
 196            }))
 197            .on_conflict(
 198                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 199                    .update_column(worktree::Column::RootName)
 200                    .to_owned(),
 201            )
 202            .exec(tx)
 203            .await?;
 204        }
 205
 206        worktree::Entity::delete_many()
 207            .filter(worktree::Column::ProjectId.eq(project_id).and(
 208                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 209            ))
 210            .exec(tx)
 211            .await?;
 212
 213        Ok(())
 214    }
 215
    /// Applies a worktree update sent by the project's host.
    ///
    /// The update is rejected up front when either `removed_entries` or
    /// `updated_entries` exceeds `proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE`.
    /// Inside a project transaction this then:
    ///
    /// 1. verifies that `connection` hosts the project,
    /// 2. updates the worktree row (root name, scan ids, absolute path),
    /// 3. upserts `updated_entries` and flags `removed_entries` as deleted,
    /// 4. upserts `updated_repositories` with their updated statuses and flags
    ///    their removed statuses as deleted,
    /// 5. flags `removed_repositories` as deleted.
    ///
    /// Returns the connection ids produced by `project_guest_connection_ids`.
    pub async fn update_worktree(
        &self,
        update: &proto::UpdateWorktree,
        connection: ConnectionId,
    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
        // Refuse oversized chunks before opening a transaction.
        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
        {
            return Err(anyhow!(
                "invalid worktree update. removed entries: {}, updated entries: {}",
                update.removed_entries.len(),
                update.updated_entries.len()
            ))?;
        }

        let project_id = ProjectId::from_proto(update.project_id);
        let worktree_id = update.worktree_id as i64;
        self.project_transaction(project_id, |tx| async move {
            // Ensure the update comes from the host.
            let _project = project::Entity::find_by_id(project_id)
                .filter(
                    Condition::all()
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;

            // Update metadata.
            worktree::Entity::update(worktree::ActiveModel {
                id: ActiveValue::set(worktree_id),
                project_id: ActiveValue::set(project_id),
                root_name: ActiveValue::set(update.root_name.clone()),
                scan_id: ActiveValue::set(update.scan_id as i64),
                // `completed_scan_id` is only given a value on the last update of a
                // scan; otherwise it is left at `ActiveValue::default()`
                // (presumably "not set", i.e. the column is not written — confirm).
                completed_scan_id: if update.is_last_update {
                    ActiveValue::set(update.scan_id as i64)
                } else {
                    ActiveValue::default()
                },
                abs_path: ActiveValue::set(update.abs_path.clone()),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;

            // Upsert the entries that were added or changed in this chunk.
            if !update.updated_entries.is_empty() {
                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
                    // A missing mtime falls back to the type's default value.
                    let mtime = entry.mtime.clone().unwrap_or_default();
                    worktree_entry::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        id: ActiveValue::set(entry.id as i64),
                        is_dir: ActiveValue::set(entry.is_dir),
                        path: ActiveValue::set(entry.path.clone()),
                        inode: ActiveValue::set(entry.inode as i64),
                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
                        is_ignored: ActiveValue::set(entry.is_ignored),
                        git_status: ActiveValue::set(None),
                        is_external: ActiveValue::set(entry.is_external),
                        is_deleted: ActiveValue::set(false),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                        is_fifo: ActiveValue::set(entry.is_fifo),
                    }
                }))
                // NOTE(review): on conflict only the columns listed below are
                // overwritten; `IsDeleted`, `IsExternal`, `IsFifo` and `GitStatus`
                // keep the values of the existing row — confirm this is intended.
                .on_conflict(
                    OnConflict::columns([
                        worktree_entry::Column::ProjectId,
                        worktree_entry::Column::WorktreeId,
                        worktree_entry::Column::Id,
                    ])
                    .update_columns([
                        worktree_entry::Column::IsDir,
                        worktree_entry::Column::Path,
                        worktree_entry::Column::Inode,
                        worktree_entry::Column::MtimeSeconds,
                        worktree_entry::Column::MtimeNanos,
                        worktree_entry::Column::CanonicalPath,
                        worktree_entry::Column::IsIgnored,
                        worktree_entry::Column::ScanId,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Removed entries are flagged as deleted (and stamped with this scan
            // id) rather than being removed from the table.
            if !update.removed_entries.is_empty() {
                worktree_entry::Entity::update_many()
                    .filter(
                        worktree_entry::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_entry::Column::Id
                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_entry::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            // Upsert repositories, then their per-path statuses.
            if !update.updated_repositories.is_empty() {
                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
                    |repository| {
                        worktree_repository::ActiveModel {
                            project_id: ActiveValue::set(project_id),
                            worktree_id: ActiveValue::set(worktree_id),
                            work_directory_id: ActiveValue::set(
                                repository.work_directory_id as i64,
                            ),
                            scan_id: ActiveValue::set(update.scan_id as i64),
                            branch: ActiveValue::set(repository.branch.clone()),
                            is_deleted: ActiveValue::set(false),
                            // The branch summary and merge conflicts are stored as
                            // JSON strings.
                            // NOTE(review): `unwrap` panics if serialization fails —
                            // presumably infallible for these proto types; confirm.
                            branch_summary: ActiveValue::Set(
                                repository
                                    .branch_summary
                                    .as_ref()
                                    .map(|summary| serde_json::to_string(summary).unwrap()),
                            ),
                            current_merge_conflicts: ActiveValue::Set(Some(
                                serde_json::to_string(&repository.current_merge_conflicts).unwrap(),
                            )),
                        }
                    },
                ))
                // NOTE(review): `IsDeleted` is not in the update list, so a
                // repository row previously flagged as deleted stays flagged when
                // it is upserted again — confirm this is intended.
                .on_conflict(
                    OnConflict::columns([
                        worktree_repository::Column::ProjectId,
                        worktree_repository::Column::WorktreeId,
                        worktree_repository::Column::WorkDirectoryId,
                    ])
                    .update_columns([
                        worktree_repository::Column::ScanId,
                        worktree_repository::Column::Branch,
                        worktree_repository::Column::BranchSummary,
                        worktree_repository::Column::CurrentMergeConflicts,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;

                // Skip the status insert when no repository carries any status.
                let has_any_statuses = update
                    .updated_repositories
                    .iter()
                    .any(|repository| !repository.updated_statuses.is_empty());

                if has_any_statuses {
                    // One row per (repository, status entry) pair.
                    worktree_repository_statuses::Entity::insert_many(
                        update.updated_repositories.iter().flat_map(
                            |repository: &proto::RepositoryEntry| {
                                repository.updated_statuses.iter().map(|status_entry| {
                                    let (repo_path, status_kind, first_status, second_status) =
                                        proto_status_to_db(status_entry.clone());
                                    worktree_repository_statuses::ActiveModel {
                                        project_id: ActiveValue::set(project_id),
                                        worktree_id: ActiveValue::set(worktree_id),
                                        work_directory_id: ActiveValue::set(
                                            repository.work_directory_id as i64,
                                        ),
                                        scan_id: ActiveValue::set(update.scan_id as i64),
                                        is_deleted: ActiveValue::set(false),
                                        repo_path: ActiveValue::set(repo_path),
                                        // `status` is always written as 0; the values
                                        // from `proto_status_to_db` go into the three
                                        // columns below.
                                        status: ActiveValue::set(0),
                                        status_kind: ActiveValue::set(status_kind),
                                        first_status: ActiveValue::set(first_status),
                                        second_status: ActiveValue::set(second_status),
                                    }
                                })
                            },
                        ),
                    )
                    // NOTE(review): `IsDeleted` is not in the update list, so a
                    // status row that was flagged as deleted and is reported again
                    // for the same path keeps `is_deleted = true` — confirm.
                    .on_conflict(
                        OnConflict::columns([
                            worktree_repository_statuses::Column::ProjectId,
                            worktree_repository_statuses::Column::WorktreeId,
                            worktree_repository_statuses::Column::WorkDirectoryId,
                            worktree_repository_statuses::Column::RepoPath,
                        ])
                        .update_columns([
                            worktree_repository_statuses::Column::ScanId,
                            worktree_repository_statuses::Column::StatusKind,
                            worktree_repository_statuses::Column::FirstStatus,
                            worktree_repository_statuses::Column::SecondStatus,
                        ])
                        .to_owned(),
                    )
                    .exec(&*tx)
                    .await?;
                }

                let has_any_removed_statuses = update
                    .updated_repositories
                    .iter()
                    .any(|repository| !repository.removed_statuses.is_empty());

                if has_any_removed_statuses {
                    // NOTE(review): the filter matches `RepoPath` across the whole
                    // worktree and is not restricted by `WorkDirectoryId`, so a
                    // path removed in one repository also flags rows with the same
                    // path in other repositories of this worktree — confirm.
                    worktree_repository_statuses::Entity::update_many()
                        .filter(
                            worktree_repository_statuses::Column::ProjectId
                                .eq(project_id)
                                .and(
                                    worktree_repository_statuses::Column::WorktreeId
                                        .eq(worktree_id),
                                )
                                .and(
                                    worktree_repository_statuses::Column::RepoPath.is_in(
                                        update.updated_repositories.iter().flat_map(|repository| {
                                            repository.removed_statuses.iter()
                                        }),
                                    ),
                                ),
                        )
                        .set(worktree_repository_statuses::ActiveModel {
                            is_deleted: ActiveValue::Set(true),
                            scan_id: ActiveValue::Set(update.scan_id as i64),
                            ..Default::default()
                        })
                        .exec(&*tx)
                        .await?;
                }
            }

            // Removed repositories are flagged as deleted, like removed entries.
            if !update.removed_repositories.is_empty() {
                worktree_repository::Entity::update_many()
                    .filter(
                        worktree_repository::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_repository::Column::WorkDirectoryId
                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_repository::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
            Ok(connection_ids)
        })
        .await
    }
 474
 475    /// Updates the diagnostic summary for the given connection.
 476    pub async fn update_diagnostic_summary(
 477        &self,
 478        update: &proto::UpdateDiagnosticSummary,
 479        connection: ConnectionId,
 480    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 481        let project_id = ProjectId::from_proto(update.project_id);
 482        let worktree_id = update.worktree_id as i64;
 483        self.project_transaction(project_id, |tx| async move {
 484            let summary = update
 485                .summary
 486                .as_ref()
 487                .ok_or_else(|| anyhow!("invalid summary"))?;
 488
 489            // Ensure the update comes from the host.
 490            let project = project::Entity::find_by_id(project_id)
 491                .one(&*tx)
 492                .await?
 493                .ok_or_else(|| anyhow!("no such project"))?;
 494            if project.host_connection()? != connection {
 495                return Err(anyhow!("can't update a project hosted by someone else"))?;
 496            }
 497
 498            // Update summary.
 499            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 500                project_id: ActiveValue::set(project_id),
 501                worktree_id: ActiveValue::set(worktree_id),
 502                path: ActiveValue::set(summary.path.clone()),
 503                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 504                error_count: ActiveValue::set(summary.error_count as i32),
 505                warning_count: ActiveValue::set(summary.warning_count as i32),
 506            })
 507            .on_conflict(
 508                OnConflict::columns([
 509                    worktree_diagnostic_summary::Column::ProjectId,
 510                    worktree_diagnostic_summary::Column::WorktreeId,
 511                    worktree_diagnostic_summary::Column::Path,
 512                ])
 513                .update_columns([
 514                    worktree_diagnostic_summary::Column::LanguageServerId,
 515                    worktree_diagnostic_summary::Column::ErrorCount,
 516                    worktree_diagnostic_summary::Column::WarningCount,
 517                ])
 518                .to_owned(),
 519            )
 520            .exec(&*tx)
 521            .await?;
 522
 523            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 524            Ok(connection_ids)
 525        })
 526        .await
 527    }
 528
 529    /// Starts the language server for the given connection.
 530    pub async fn start_language_server(
 531        &self,
 532        update: &proto::StartLanguageServer,
 533        connection: ConnectionId,
 534    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 535        let project_id = ProjectId::from_proto(update.project_id);
 536        self.project_transaction(project_id, |tx| async move {
 537            let server = update
 538                .server
 539                .as_ref()
 540                .ok_or_else(|| anyhow!("invalid language server"))?;
 541
 542            // Ensure the update comes from the host.
 543            let project = project::Entity::find_by_id(project_id)
 544                .one(&*tx)
 545                .await?
 546                .ok_or_else(|| anyhow!("no such project"))?;
 547            if project.host_connection()? != connection {
 548                return Err(anyhow!("can't update a project hosted by someone else"))?;
 549            }
 550
 551            // Add the newly-started language server.
 552            language_server::Entity::insert(language_server::ActiveModel {
 553                project_id: ActiveValue::set(project_id),
 554                id: ActiveValue::set(server.id as i64),
 555                name: ActiveValue::set(server.name.clone()),
 556            })
 557            .on_conflict(
 558                OnConflict::columns([
 559                    language_server::Column::ProjectId,
 560                    language_server::Column::Id,
 561                ])
 562                .update_column(language_server::Column::Name)
 563                .to_owned(),
 564            )
 565            .exec(&*tx)
 566            .await?;
 567
 568            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 569            Ok(connection_ids)
 570        })
 571        .await
 572    }
 573
 574    /// Updates the worktree settings for the given connection.
 575    pub async fn update_worktree_settings(
 576        &self,
 577        update: &proto::UpdateWorktreeSettings,
 578        connection: ConnectionId,
 579    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 580        let project_id = ProjectId::from_proto(update.project_id);
 581        let kind = match update.kind {
 582            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 583                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 584            None => proto::LocalSettingsKind::Settings,
 585        };
 586        let kind = LocalSettingsKind::from_proto(kind);
 587        self.project_transaction(project_id, |tx| async move {
 588            // Ensure the update comes from the host.
 589            let project = project::Entity::find_by_id(project_id)
 590                .one(&*tx)
 591                .await?
 592                .ok_or_else(|| anyhow!("no such project"))?;
 593            if project.host_connection()? != connection {
 594                return Err(anyhow!("can't update a project hosted by someone else"))?;
 595            }
 596
 597            if let Some(content) = &update.content {
 598                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 599                    project_id: ActiveValue::Set(project_id),
 600                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 601                    path: ActiveValue::Set(update.path.clone()),
 602                    content: ActiveValue::Set(content.clone()),
 603                    kind: ActiveValue::Set(kind),
 604                })
 605                .on_conflict(
 606                    OnConflict::columns([
 607                        worktree_settings_file::Column::ProjectId,
 608                        worktree_settings_file::Column::WorktreeId,
 609                        worktree_settings_file::Column::Path,
 610                    ])
 611                    .update_column(worktree_settings_file::Column::Content)
 612                    .to_owned(),
 613                )
 614                .exec(&*tx)
 615                .await?;
 616            } else {
 617                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 618                    project_id: ActiveValue::Set(project_id),
 619                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 620                    path: ActiveValue::Set(update.path.clone()),
 621                    ..Default::default()
 622                })
 623                .exec(&*tx)
 624                .await?;
 625            }
 626
 627            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 628            Ok(connection_ids)
 629        })
 630        .await
 631    }
 632
 633    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 634        self.transaction(|tx| async move {
 635            Ok(project::Entity::find_by_id(id)
 636                .one(&*tx)
 637                .await?
 638                .ok_or_else(|| anyhow!("no such project"))?)
 639        })
 640        .await
 641    }
 642
 643    /// Adds the given connection to the specified project
 644    /// in the current room.
 645    pub async fn join_project(
 646        &self,
 647        project_id: ProjectId,
 648        connection: ConnectionId,
 649        user_id: UserId,
 650    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 651        self.project_transaction(project_id, |tx| async move {
 652            let (project, role) = self
 653                .access_project(project_id, connection, Capability::ReadOnly, &tx)
 654                .await?;
 655            self.join_project_internal(project, user_id, connection, role, &tx)
 656                .await
 657        })
 658        .await
 659    }
 660
 661    async fn join_project_internal(
 662        &self,
 663        project: project::Model,
 664        user_id: UserId,
 665        connection: ConnectionId,
 666        role: ChannelRole,
 667        tx: &DatabaseTransaction,
 668    ) -> Result<(Project, ReplicaId)> {
 669        let mut collaborators = project
 670            .find_related(project_collaborator::Entity)
 671            .all(tx)
 672            .await?;
 673        let replica_ids = collaborators
 674            .iter()
 675            .map(|c| c.replica_id)
 676            .collect::<HashSet<_>>();
 677        let mut replica_id = ReplicaId(1);
 678        while replica_ids.contains(&replica_id) {
 679            replica_id.0 += 1;
 680        }
 681        let new_collaborator = project_collaborator::ActiveModel {
 682            project_id: ActiveValue::set(project.id),
 683            connection_id: ActiveValue::set(connection.id as i32),
 684            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 685            user_id: ActiveValue::set(user_id),
 686            replica_id: ActiveValue::set(replica_id),
 687            is_host: ActiveValue::set(false),
 688            ..Default::default()
 689        }
 690        .insert(tx)
 691        .await?;
 692        collaborators.push(new_collaborator);
 693
 694        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 695        let mut worktrees = db_worktrees
 696            .into_iter()
 697            .map(|db_worktree| {
 698                (
 699                    db_worktree.id as u64,
 700                    Worktree {
 701                        id: db_worktree.id as u64,
 702                        abs_path: db_worktree.abs_path,
 703                        root_name: db_worktree.root_name,
 704                        visible: db_worktree.visible,
 705                        entries: Default::default(),
 706                        repository_entries: Default::default(),
 707                        diagnostic_summaries: Default::default(),
 708                        settings_files: Default::default(),
 709                        scan_id: db_worktree.scan_id as u64,
 710                        completed_scan_id: db_worktree.completed_scan_id as u64,
 711                    },
 712                )
 713            })
 714            .collect::<BTreeMap<_, _>>();
 715
 716        // Populate worktree entries.
 717        {
 718            let mut db_entries = worktree_entry::Entity::find()
 719                .filter(
 720                    Condition::all()
 721                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 722                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 723                )
 724                .stream(tx)
 725                .await?;
 726            while let Some(db_entry) = db_entries.next().await {
 727                let db_entry = db_entry?;
 728                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 729                    worktree.entries.push(proto::Entry {
 730                        id: db_entry.id as u64,
 731                        is_dir: db_entry.is_dir,
 732                        path: db_entry.path,
 733                        inode: db_entry.inode as u64,
 734                        mtime: Some(proto::Timestamp {
 735                            seconds: db_entry.mtime_seconds as u64,
 736                            nanos: db_entry.mtime_nanos as u32,
 737                        }),
 738                        canonical_path: db_entry.canonical_path,
 739                        is_ignored: db_entry.is_ignored,
 740                        is_external: db_entry.is_external,
 741                        // This is only used in the summarization backlog, so if it's None,
 742                        // that just means we won't be able to detect when to resummarize
 743                        // based on total number of backlogged bytes - instead, we'd go
 744                        // on number of files only. That shouldn't be a huge deal in practice.
 745                        size: None,
 746                        is_fifo: db_entry.is_fifo,
 747                    });
 748                }
 749            }
 750        }
 751
 752        // Populate repository entries.
 753        {
 754            let db_repository_entries = worktree_repository::Entity::find()
 755                .filter(
 756                    Condition::all()
 757                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 758                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 759                )
 760                .all(tx)
 761                .await?;
 762            for db_repository_entry in db_repository_entries {
 763                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 764                {
 765                    let mut repository_statuses = worktree_repository_statuses::Entity::find()
 766                        .filter(
 767                            Condition::all()
 768                                .add(worktree_repository_statuses::Column::ProjectId.eq(project.id))
 769                                .add(
 770                                    worktree_repository_statuses::Column::WorktreeId
 771                                        .eq(worktree.id),
 772                                )
 773                                .add(
 774                                    worktree_repository_statuses::Column::WorkDirectoryId
 775                                        .eq(db_repository_entry.work_directory_id),
 776                                )
 777                                .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
 778                        )
 779                        .stream(tx)
 780                        .await?;
 781                    let mut updated_statuses = Vec::new();
 782                    while let Some(status_entry) = repository_statuses.next().await {
 783                        let status_entry: worktree_repository_statuses::Model = status_entry?;
 784                        updated_statuses.push(db_status_to_proto(status_entry)?);
 785                    }
 786
 787                    let current_merge_conflicts = db_repository_entry
 788                        .current_merge_conflicts
 789                        .as_ref()
 790                        .map(|conflicts| serde_json::from_str(&conflicts))
 791                        .transpose()?
 792                        .unwrap_or_default();
 793
 794                    let branch_summary = db_repository_entry
 795                        .branch_summary
 796                        .as_ref()
 797                        .map(|branch_summary| serde_json::from_str(&branch_summary))
 798                        .transpose()?
 799                        .unwrap_or_default();
 800
 801                    worktree.repository_entries.insert(
 802                        db_repository_entry.work_directory_id as u64,
 803                        proto::RepositoryEntry {
 804                            work_directory_id: db_repository_entry.work_directory_id as u64,
 805                            branch: db_repository_entry.branch,
 806                            updated_statuses,
 807                            removed_statuses: Vec::new(),
 808                            current_merge_conflicts,
 809                            branch_summary,
 810                        },
 811                    );
 812                }
 813            }
 814        }
 815
 816        // Populate worktree diagnostic summaries.
 817        {
 818            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 819                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 820                .stream(tx)
 821                .await?;
 822            while let Some(db_summary) = db_summaries.next().await {
 823                let db_summary = db_summary?;
 824                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 825                    worktree
 826                        .diagnostic_summaries
 827                        .push(proto::DiagnosticSummary {
 828                            path: db_summary.path,
 829                            language_server_id: db_summary.language_server_id as u64,
 830                            error_count: db_summary.error_count as u32,
 831                            warning_count: db_summary.warning_count as u32,
 832                        });
 833                }
 834            }
 835        }
 836
 837        // Populate worktree settings files
 838        {
 839            let mut db_settings_files = worktree_settings_file::Entity::find()
 840                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 841                .stream(tx)
 842                .await?;
 843            while let Some(db_settings_file) = db_settings_files.next().await {
 844                let db_settings_file = db_settings_file?;
 845                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 846                    worktree.settings_files.push(WorktreeSettingsFile {
 847                        path: db_settings_file.path,
 848                        content: db_settings_file.content,
 849                        kind: db_settings_file.kind,
 850                    });
 851                }
 852            }
 853        }
 854
 855        // Populate language servers.
 856        let language_servers = project
 857            .find_related(language_server::Entity)
 858            .all(tx)
 859            .await?;
 860
 861        let project = Project {
 862            id: project.id,
 863            role,
 864            collaborators: collaborators
 865                .into_iter()
 866                .map(|collaborator| ProjectCollaborator {
 867                    connection_id: collaborator.connection(),
 868                    user_id: collaborator.user_id,
 869                    replica_id: collaborator.replica_id,
 870                    is_host: collaborator.is_host,
 871                })
 872                .collect(),
 873            worktrees,
 874            language_servers: language_servers
 875                .into_iter()
 876                .map(|language_server| proto::LanguageServer {
 877                    id: language_server.id as u64,
 878                    name: language_server.name,
 879                    worktree_id: None,
 880                })
 881                .collect(),
 882        };
 883        Ok((project, replica_id as ReplicaId))
 884    }
 885
 886    /// Removes the given connection from the specified project.
 887    pub async fn leave_project(
 888        &self,
 889        project_id: ProjectId,
 890        connection: ConnectionId,
 891    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 892        self.project_transaction(project_id, |tx| async move {
 893            let result = project_collaborator::Entity::delete_many()
 894                .filter(
 895                    Condition::all()
 896                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 897                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 898                        .add(
 899                            project_collaborator::Column::ConnectionServerId
 900                                .eq(connection.owner_id as i32),
 901                        ),
 902                )
 903                .exec(&*tx)
 904                .await?;
 905            if result.rows_affected == 0 {
 906                Err(anyhow!("not a collaborator on this project"))?;
 907            }
 908
 909            let project = project::Entity::find_by_id(project_id)
 910                .one(&*tx)
 911                .await?
 912                .ok_or_else(|| anyhow!("no such project"))?;
 913            let collaborators = project
 914                .find_related(project_collaborator::Entity)
 915                .all(&*tx)
 916                .await?;
 917            let connection_ids: Vec<ConnectionId> = collaborators
 918                .into_iter()
 919                .map(|collaborator| collaborator.connection())
 920                .collect();
 921
 922            follower::Entity::delete_many()
 923                .filter(
 924                    Condition::any()
 925                        .add(
 926                            Condition::all()
 927                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 928                                .add(
 929                                    follower::Column::LeaderConnectionServerId
 930                                        .eq(connection.owner_id),
 931                                )
 932                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 933                        )
 934                        .add(
 935                            Condition::all()
 936                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 937                                .add(
 938                                    follower::Column::FollowerConnectionServerId
 939                                        .eq(connection.owner_id),
 940                                )
 941                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 942                        ),
 943                )
 944                .exec(&*tx)
 945                .await?;
 946
 947            let room = if let Some(room_id) = project.room_id {
 948                Some(self.get_room(room_id, &tx).await?)
 949            } else {
 950                None
 951            };
 952
 953            let left_project = LeftProject {
 954                id: project_id,
 955                should_unshare: connection == project.host_connection()?,
 956                connection_ids,
 957            };
 958            Ok((room, left_project))
 959        })
 960        .await
 961    }
 962
 963    pub async fn check_user_is_project_host(
 964        &self,
 965        project_id: ProjectId,
 966        connection_id: ConnectionId,
 967    ) -> Result<()> {
 968        self.project_transaction(project_id, |tx| async move {
 969            project::Entity::find()
 970                .filter(
 971                    Condition::all()
 972                        .add(project::Column::Id.eq(project_id))
 973                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 974                        .add(
 975                            project::Column::HostConnectionServerId
 976                                .eq(Some(connection_id.owner_id as i32)),
 977                        ),
 978                )
 979                .one(&*tx)
 980                .await?
 981                .ok_or_else(|| anyhow!("failed to read project host"))?;
 982
 983            Ok(())
 984        })
 985        .await
 986        .map(|guard| guard.into_inner())
 987    }
 988
 989    /// Returns the current project if the given user is authorized to access it with the specified capability.
 990    pub async fn access_project(
 991        &self,
 992        project_id: ProjectId,
 993        connection_id: ConnectionId,
 994        capability: Capability,
 995        tx: &DatabaseTransaction,
 996    ) -> Result<(project::Model, ChannelRole)> {
 997        let project = project::Entity::find_by_id(project_id)
 998            .one(tx)
 999            .await?
1000            .ok_or_else(|| anyhow!("no such project"))?;
1001
1002        let role_from_room = if let Some(room_id) = project.room_id {
1003            room_participant::Entity::find()
1004                .filter(room_participant::Column::RoomId.eq(room_id))
1005                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1006                .one(tx)
1007                .await?
1008                .and_then(|participant| participant.role)
1009        } else {
1010            None
1011        };
1012
1013        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1014
1015        match capability {
1016            Capability::ReadWrite => {
1017                if !role.can_edit_projects() {
1018                    return Err(anyhow!("not authorized to edit projects"))?;
1019                }
1020            }
1021            Capability::ReadOnly => {
1022                if !role.can_read_projects() {
1023                    return Err(anyhow!("not authorized to read projects"))?;
1024                }
1025            }
1026        }
1027
1028        Ok((project, role))
1029    }
1030
1031    /// Returns the host connection for a read-only request to join a shared project.
1032    pub async fn host_for_read_only_project_request(
1033        &self,
1034        project_id: ProjectId,
1035        connection_id: ConnectionId,
1036    ) -> Result<ConnectionId> {
1037        self.project_transaction(project_id, |tx| async move {
1038            let (project, _) = self
1039                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1040                .await?;
1041            project.host_connection()
1042        })
1043        .await
1044        .map(|guard| guard.into_inner())
1045    }
1046
1047    /// Returns the host connection for a request to join a shared project.
1048    pub async fn host_for_mutating_project_request(
1049        &self,
1050        project_id: ProjectId,
1051        connection_id: ConnectionId,
1052    ) -> Result<ConnectionId> {
1053        self.project_transaction(project_id, |tx| async move {
1054            let (project, _) = self
1055                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1056                .await?;
1057            project.host_connection()
1058        })
1059        .await
1060        .map(|guard| guard.into_inner())
1061    }
1062
1063    pub async fn connections_for_buffer_update(
1064        &self,
1065        project_id: ProjectId,
1066        connection_id: ConnectionId,
1067        capability: Capability,
1068    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1069        self.project_transaction(project_id, |tx| async move {
1070            // Authorize
1071            let (project, _) = self
1072                .access_project(project_id, connection_id, capability, &tx)
1073                .await?;
1074
1075            let host_connection_id = project.host_connection()?;
1076
1077            let collaborators = project_collaborator::Entity::find()
1078                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1079                .all(&*tx)
1080                .await?;
1081
1082            let guest_connection_ids = collaborators
1083                .into_iter()
1084                .filter_map(|collaborator| {
1085                    if collaborator.is_host {
1086                        None
1087                    } else {
1088                        Some(collaborator.connection())
1089                    }
1090                })
1091                .collect();
1092
1093            Ok((host_connection_id, guest_connection_ids))
1094        })
1095        .await
1096    }
1097
1098    /// Returns the connection IDs in the given project.
1099    ///
1100    /// The provided `connection_id` must also be a collaborator in the project,
1101    /// otherwise an error will be returned.
1102    pub async fn project_connection_ids(
1103        &self,
1104        project_id: ProjectId,
1105        connection_id: ConnectionId,
1106        exclude_dev_server: bool,
1107    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1108        self.project_transaction(project_id, |tx| async move {
1109            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1110                .await
1111        })
1112        .await
1113    }
1114
1115    async fn internal_project_connection_ids(
1116        &self,
1117        project_id: ProjectId,
1118        connection_id: ConnectionId,
1119        exclude_dev_server: bool,
1120        tx: &DatabaseTransaction,
1121    ) -> Result<HashSet<ConnectionId>> {
1122        let project = project::Entity::find_by_id(project_id)
1123            .one(tx)
1124            .await?
1125            .ok_or_else(|| anyhow!("no such project"))?;
1126
1127        let mut collaborators = project_collaborator::Entity::find()
1128            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1129            .stream(tx)
1130            .await?;
1131
1132        let mut connection_ids = HashSet::default();
1133        if let Some(host_connection) = project.host_connection().log_err() {
1134            if !exclude_dev_server {
1135                connection_ids.insert(host_connection);
1136            }
1137        }
1138
1139        while let Some(collaborator) = collaborators.next().await {
1140            let collaborator = collaborator?;
1141            connection_ids.insert(collaborator.connection());
1142        }
1143
1144        if connection_ids.contains(&connection_id)
1145            || Some(connection_id) == project.host_connection().ok()
1146        {
1147            Ok(connection_ids)
1148        } else {
1149            Err(anyhow!(
1150                "can only send project updates to a project you're in"
1151            ))?
1152        }
1153    }
1154
1155    async fn project_guest_connection_ids(
1156        &self,
1157        project_id: ProjectId,
1158        tx: &DatabaseTransaction,
1159    ) -> Result<Vec<ConnectionId>> {
1160        let mut collaborators = project_collaborator::Entity::find()
1161            .filter(
1162                project_collaborator::Column::ProjectId
1163                    .eq(project_id)
1164                    .and(project_collaborator::Column::IsHost.eq(false)),
1165            )
1166            .stream(tx)
1167            .await?;
1168
1169        let mut guest_connection_ids = Vec::new();
1170        while let Some(collaborator) = collaborators.next().await {
1171            let collaborator = collaborator?;
1172            guest_connection_ids.push(collaborator.connection());
1173        }
1174        Ok(guest_connection_ids)
1175    }
1176
1177    /// Returns the [`RoomId`] for the given project.
1178    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1179        self.transaction(|tx| async move {
1180            Ok(project::Entity::find_by_id(project_id)
1181                .one(&*tx)
1182                .await?
1183                .and_then(|project| project.room_id))
1184        })
1185        .await
1186    }
1187
1188    pub async fn check_room_participants(
1189        &self,
1190        room_id: RoomId,
1191        leader_id: ConnectionId,
1192        follower_id: ConnectionId,
1193    ) -> Result<()> {
1194        self.transaction(|tx| async move {
1195            use room_participant::Column;
1196
1197            let count = room_participant::Entity::find()
1198                .filter(
1199                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1200                        Condition::any()
1201                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1202                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1203                            ))
1204                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1205                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1206                            )),
1207                    ),
1208                )
1209                .count(&*tx)
1210                .await?;
1211
1212            if count < 2 {
1213                Err(anyhow!("not room participants"))?;
1214            }
1215
1216            Ok(())
1217        })
1218        .await
1219    }
1220
1221    /// Adds the given follower connection as a follower of the given leader connection.
1222    pub async fn follow(
1223        &self,
1224        room_id: RoomId,
1225        project_id: ProjectId,
1226        leader_connection: ConnectionId,
1227        follower_connection: ConnectionId,
1228    ) -> Result<TransactionGuard<proto::Room>> {
1229        self.room_transaction(room_id, |tx| async move {
1230            follower::ActiveModel {
1231                room_id: ActiveValue::set(room_id),
1232                project_id: ActiveValue::set(project_id),
1233                leader_connection_server_id: ActiveValue::set(ServerId(
1234                    leader_connection.owner_id as i32,
1235                )),
1236                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1237                follower_connection_server_id: ActiveValue::set(ServerId(
1238                    follower_connection.owner_id as i32,
1239                )),
1240                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1241                ..Default::default()
1242            }
1243            .insert(&*tx)
1244            .await?;
1245
1246            let room = self.get_room(room_id, &tx).await?;
1247            Ok(room)
1248        })
1249        .await
1250    }
1251
1252    /// Removes the given follower connection as a follower of the given leader connection.
1253    pub async fn unfollow(
1254        &self,
1255        room_id: RoomId,
1256        project_id: ProjectId,
1257        leader_connection: ConnectionId,
1258        follower_connection: ConnectionId,
1259    ) -> Result<TransactionGuard<proto::Room>> {
1260        self.room_transaction(room_id, |tx| async move {
1261            follower::Entity::delete_many()
1262                .filter(
1263                    Condition::all()
1264                        .add(follower::Column::RoomId.eq(room_id))
1265                        .add(follower::Column::ProjectId.eq(project_id))
1266                        .add(
1267                            follower::Column::LeaderConnectionServerId
1268                                .eq(leader_connection.owner_id),
1269                        )
1270                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1271                        .add(
1272                            follower::Column::FollowerConnectionServerId
1273                                .eq(follower_connection.owner_id),
1274                        )
1275                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1276                )
1277                .exec(&*tx)
1278                .await?;
1279
1280            let room = self.get_room(room_id, &tx).await?;
1281            Ok(room)
1282        })
1283        .await
1284    }
1285}