projects.rs

   1use anyhow::Context as _;
   2
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .ok_or_else(|| anyhow!("could not find participant"))?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            let project = project::ActiveModel {
  65                room_id: ActiveValue::set(Some(participant.room_id)),
  66                host_user_id: ActiveValue::set(Some(participant.user_id)),
  67                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  68                host_connection_server_id: ActiveValue::set(Some(ServerId(
  69                    connection.owner_id as i32,
  70                ))),
  71                id: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if !worktrees.is_empty() {
  77                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  78                    worktree::ActiveModel {
  79                        id: ActiveValue::set(worktree.id as i64),
  80                        project_id: ActiveValue::set(project.id),
  81                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  82                        root_name: ActiveValue::set(worktree.root_name.clone()),
  83                        visible: ActiveValue::set(worktree.visible),
  84                        scan_id: ActiveValue::set(0),
  85                        completed_scan_id: ActiveValue::set(0),
  86                    }
  87                }))
  88                .exec(&*tx)
  89                .await?;
  90            }
  91
  92            let replica_id = if is_ssh_project { 1 } else { 0 };
  93
  94            project_collaborator::ActiveModel {
  95                project_id: ActiveValue::set(project.id),
  96                connection_id: ActiveValue::set(connection.id as i32),
  97                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  98                user_id: ActiveValue::set(participant.user_id),
  99                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 100                is_host: ActiveValue::set(true),
 101                ..Default::default()
 102            }
 103            .insert(&*tx)
 104            .await?;
 105
 106            let room = self.get_room(room_id, &tx).await?;
 107            Ok((project.id, room))
 108        })
 109        .await
 110    }
 111
 112    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 113        self.weak_transaction(|tx| async move {
 114            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 115            Ok(())
 116        })
 117        .await
 118    }
 119
 120    /// Unshares the given project.
 121    pub async fn unshare_project(
 122        &self,
 123        project_id: ProjectId,
 124        connection: ConnectionId,
 125    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 126        self.project_transaction(project_id, |tx| async move {
 127            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 128            let project = project::Entity::find_by_id(project_id)
 129                .one(&*tx)
 130                .await?
 131                .ok_or_else(|| anyhow!("project not found"))?;
 132            let room = if let Some(room_id) = project.room_id {
 133                Some(self.get_room(room_id, &tx).await?)
 134            } else {
 135                None
 136            };
 137            if project.host_connection()? == connection {
 138                return Ok((true, room, guest_connection_ids));
 139            }
 140            Err(anyhow!("cannot unshare a project hosted by another user"))?
 141        })
 142        .await
 143    }
 144
 145    /// Updates the worktrees associated with the given project.
 146    pub async fn update_project(
 147        &self,
 148        project_id: ProjectId,
 149        connection: ConnectionId,
 150        worktrees: &[proto::WorktreeMetadata],
 151    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 152        self.project_transaction(project_id, |tx| async move {
 153            let project = project::Entity::find_by_id(project_id)
 154                .filter(
 155                    Condition::all()
 156                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 157                        .add(
 158                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 159                        ),
 160                )
 161                .one(&*tx)
 162                .await?
 163                .ok_or_else(|| anyhow!("no such project"))?;
 164
 165            self.update_project_worktrees(project.id, worktrees, &tx)
 166                .await?;
 167
 168            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 169
 170            let room = if let Some(room_id) = project.room_id {
 171                Some(self.get_room(room_id, &tx).await?)
 172            } else {
 173                None
 174            };
 175
 176            Ok((room, guest_connection_ids))
 177        })
 178        .await
 179    }
 180
 181    pub(in crate::db) async fn update_project_worktrees(
 182        &self,
 183        project_id: ProjectId,
 184        worktrees: &[proto::WorktreeMetadata],
 185        tx: &DatabaseTransaction,
 186    ) -> Result<()> {
 187        if !worktrees.is_empty() {
 188            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 189                id: ActiveValue::set(worktree.id as i64),
 190                project_id: ActiveValue::set(project_id),
 191                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 192                root_name: ActiveValue::set(worktree.root_name.clone()),
 193                visible: ActiveValue::set(worktree.visible),
 194                scan_id: ActiveValue::set(0),
 195                completed_scan_id: ActiveValue::set(0),
 196            }))
 197            .on_conflict(
 198                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 199                    .update_column(worktree::Column::RootName)
 200                    .to_owned(),
 201            )
 202            .exec(tx)
 203            .await?;
 204        }
 205
 206        worktree::Entity::delete_many()
 207            .filter(worktree::Column::ProjectId.eq(project_id).and(
 208                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 209            ))
 210            .exec(tx)
 211            .await?;
 212
 213        Ok(())
 214    }
 215
 216    pub async fn update_worktree(
 217        &self,
 218        update: &proto::UpdateWorktree,
 219        connection: ConnectionId,
 220    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 221        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 222            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 223        {
 224            return Err(anyhow!(
 225                "invalid worktree update. removed entries: {}, updated entries: {}",
 226                update.removed_entries.len(),
 227                update.updated_entries.len()
 228            ))?;
 229        }
 230
 231        let project_id = ProjectId::from_proto(update.project_id);
 232        let worktree_id = update.worktree_id as i64;
 233        self.project_transaction(project_id, |tx| async move {
 234            // Ensure the update comes from the host.
 235            let _project = project::Entity::find_by_id(project_id)
 236                .filter(
 237                    Condition::all()
 238                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 239                        .add(
 240                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 241                        ),
 242                )
 243                .one(&*tx)
 244                .await?
 245                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
 246
 247            // Update metadata.
 248            worktree::Entity::update(worktree::ActiveModel {
 249                id: ActiveValue::set(worktree_id),
 250                project_id: ActiveValue::set(project_id),
 251                root_name: ActiveValue::set(update.root_name.clone()),
 252                scan_id: ActiveValue::set(update.scan_id as i64),
 253                completed_scan_id: if update.is_last_update {
 254                    ActiveValue::set(update.scan_id as i64)
 255                } else {
 256                    ActiveValue::default()
 257                },
 258                abs_path: ActiveValue::set(update.abs_path.clone()),
 259                ..Default::default()
 260            })
 261            .exec(&*tx)
 262            .await?;
 263
 264            if !update.updated_entries.is_empty() {
 265                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 266                    let mtime = entry.mtime.clone().unwrap_or_default();
 267                    worktree_entry::ActiveModel {
 268                        project_id: ActiveValue::set(project_id),
 269                        worktree_id: ActiveValue::set(worktree_id),
 270                        id: ActiveValue::set(entry.id as i64),
 271                        is_dir: ActiveValue::set(entry.is_dir),
 272                        path: ActiveValue::set(entry.path.clone()),
 273                        inode: ActiveValue::set(entry.inode as i64),
 274                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 275                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 276                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 277                        is_ignored: ActiveValue::set(entry.is_ignored),
 278                        git_status: ActiveValue::set(None),
 279                        is_external: ActiveValue::set(entry.is_external),
 280                        is_deleted: ActiveValue::set(false),
 281                        scan_id: ActiveValue::set(update.scan_id as i64),
 282                        is_fifo: ActiveValue::set(entry.is_fifo),
 283                    }
 284                }))
 285                .on_conflict(
 286                    OnConflict::columns([
 287                        worktree_entry::Column::ProjectId,
 288                        worktree_entry::Column::WorktreeId,
 289                        worktree_entry::Column::Id,
 290                    ])
 291                    .update_columns([
 292                        worktree_entry::Column::IsDir,
 293                        worktree_entry::Column::Path,
 294                        worktree_entry::Column::Inode,
 295                        worktree_entry::Column::MtimeSeconds,
 296                        worktree_entry::Column::MtimeNanos,
 297                        worktree_entry::Column::CanonicalPath,
 298                        worktree_entry::Column::IsIgnored,
 299                        worktree_entry::Column::ScanId,
 300                    ])
 301                    .to_owned(),
 302                )
 303                .exec(&*tx)
 304                .await?;
 305            }
 306
 307            if !update.removed_entries.is_empty() {
 308                worktree_entry::Entity::update_many()
 309                    .filter(
 310                        worktree_entry::Column::ProjectId
 311                            .eq(project_id)
 312                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 313                            .and(
 314                                worktree_entry::Column::Id
 315                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 316                            ),
 317                    )
 318                    .set(worktree_entry::ActiveModel {
 319                        is_deleted: ActiveValue::Set(true),
 320                        scan_id: ActiveValue::Set(update.scan_id as i64),
 321                        ..Default::default()
 322                    })
 323                    .exec(&*tx)
 324                    .await?;
 325            }
 326
 327            if !update.updated_repositories.is_empty() {
 328                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 329                    |repository| worktree_repository::ActiveModel {
 330                        project_id: ActiveValue::set(project_id),
 331                        worktree_id: ActiveValue::set(worktree_id),
 332                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 333                        scan_id: ActiveValue::set(update.scan_id as i64),
 334                        branch: ActiveValue::set(repository.branch.clone()),
 335                        is_deleted: ActiveValue::set(false),
 336                    },
 337                ))
 338                .on_conflict(
 339                    OnConflict::columns([
 340                        worktree_repository::Column::ProjectId,
 341                        worktree_repository::Column::WorktreeId,
 342                        worktree_repository::Column::WorkDirectoryId,
 343                    ])
 344                    .update_columns([
 345                        worktree_repository::Column::ScanId,
 346                        worktree_repository::Column::Branch,
 347                    ])
 348                    .to_owned(),
 349                )
 350                .exec(&*tx)
 351                .await?;
 352
 353                let has_any_statuses = update
 354                    .updated_repositories
 355                    .iter()
 356                    .any(|repository| !repository.updated_statuses.is_empty());
 357
 358                if has_any_statuses {
 359                    worktree_repository_statuses::Entity::insert_many(
 360                        update.updated_repositories.iter().flat_map(
 361                            |repository: &proto::RepositoryEntry| {
 362                                repository.updated_statuses.iter().map(|status_entry| {
 363                                    let (repo_path, status_kind, first_status, second_status) =
 364                                        proto_status_to_db(status_entry.clone());
 365                                    worktree_repository_statuses::ActiveModel {
 366                                        project_id: ActiveValue::set(project_id),
 367                                        worktree_id: ActiveValue::set(worktree_id),
 368                                        work_directory_id: ActiveValue::set(
 369                                            repository.work_directory_id as i64,
 370                                        ),
 371                                        scan_id: ActiveValue::set(update.scan_id as i64),
 372                                        is_deleted: ActiveValue::set(false),
 373                                        repo_path: ActiveValue::set(repo_path),
 374                                        status: ActiveValue::set(0),
 375                                        status_kind: ActiveValue::set(status_kind),
 376                                        first_status: ActiveValue::set(first_status),
 377                                        second_status: ActiveValue::set(second_status),
 378                                    }
 379                                })
 380                            },
 381                        ),
 382                    )
 383                    .on_conflict(
 384                        OnConflict::columns([
 385                            worktree_repository_statuses::Column::ProjectId,
 386                            worktree_repository_statuses::Column::WorktreeId,
 387                            worktree_repository_statuses::Column::WorkDirectoryId,
 388                            worktree_repository_statuses::Column::RepoPath,
 389                        ])
 390                        .update_columns([
 391                            worktree_repository_statuses::Column::ScanId,
 392                            worktree_repository_statuses::Column::StatusKind,
 393                            worktree_repository_statuses::Column::FirstStatus,
 394                            worktree_repository_statuses::Column::SecondStatus,
 395                        ])
 396                        .to_owned(),
 397                    )
 398                    .exec(&*tx)
 399                    .await?;
 400                }
 401
 402                let has_any_removed_statuses = update
 403                    .updated_repositories
 404                    .iter()
 405                    .any(|repository| !repository.removed_statuses.is_empty());
 406
 407                if has_any_removed_statuses {
 408                    worktree_repository_statuses::Entity::update_many()
 409                        .filter(
 410                            worktree_repository_statuses::Column::ProjectId
 411                                .eq(project_id)
 412                                .and(
 413                                    worktree_repository_statuses::Column::WorktreeId
 414                                        .eq(worktree_id),
 415                                )
 416                                .and(
 417                                    worktree_repository_statuses::Column::RepoPath.is_in(
 418                                        update.updated_repositories.iter().flat_map(|repository| {
 419                                            repository.removed_statuses.iter()
 420                                        }),
 421                                    ),
 422                                ),
 423                        )
 424                        .set(worktree_repository_statuses::ActiveModel {
 425                            is_deleted: ActiveValue::Set(true),
 426                            scan_id: ActiveValue::Set(update.scan_id as i64),
 427                            ..Default::default()
 428                        })
 429                        .exec(&*tx)
 430                        .await?;
 431                }
 432            }
 433
 434            if !update.removed_repositories.is_empty() {
 435                worktree_repository::Entity::update_many()
 436                    .filter(
 437                        worktree_repository::Column::ProjectId
 438                            .eq(project_id)
 439                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 440                            .and(
 441                                worktree_repository::Column::WorkDirectoryId
 442                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 443                            ),
 444                    )
 445                    .set(worktree_repository::ActiveModel {
 446                        is_deleted: ActiveValue::Set(true),
 447                        scan_id: ActiveValue::Set(update.scan_id as i64),
 448                        ..Default::default()
 449                    })
 450                    .exec(&*tx)
 451                    .await?;
 452            }
 453
 454            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 455            Ok(connection_ids)
 456        })
 457        .await
 458    }
 459
 460    /// Updates the diagnostic summary for the given connection.
 461    pub async fn update_diagnostic_summary(
 462        &self,
 463        update: &proto::UpdateDiagnosticSummary,
 464        connection: ConnectionId,
 465    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 466        let project_id = ProjectId::from_proto(update.project_id);
 467        let worktree_id = update.worktree_id as i64;
 468        self.project_transaction(project_id, |tx| async move {
 469            let summary = update
 470                .summary
 471                .as_ref()
 472                .ok_or_else(|| anyhow!("invalid summary"))?;
 473
 474            // Ensure the update comes from the host.
 475            let project = project::Entity::find_by_id(project_id)
 476                .one(&*tx)
 477                .await?
 478                .ok_or_else(|| anyhow!("no such project"))?;
 479            if project.host_connection()? != connection {
 480                return Err(anyhow!("can't update a project hosted by someone else"))?;
 481            }
 482
 483            // Update summary.
 484            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 485                project_id: ActiveValue::set(project_id),
 486                worktree_id: ActiveValue::set(worktree_id),
 487                path: ActiveValue::set(summary.path.clone()),
 488                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 489                error_count: ActiveValue::set(summary.error_count as i32),
 490                warning_count: ActiveValue::set(summary.warning_count as i32),
 491            })
 492            .on_conflict(
 493                OnConflict::columns([
 494                    worktree_diagnostic_summary::Column::ProjectId,
 495                    worktree_diagnostic_summary::Column::WorktreeId,
 496                    worktree_diagnostic_summary::Column::Path,
 497                ])
 498                .update_columns([
 499                    worktree_diagnostic_summary::Column::LanguageServerId,
 500                    worktree_diagnostic_summary::Column::ErrorCount,
 501                    worktree_diagnostic_summary::Column::WarningCount,
 502                ])
 503                .to_owned(),
 504            )
 505            .exec(&*tx)
 506            .await?;
 507
 508            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 509            Ok(connection_ids)
 510        })
 511        .await
 512    }
 513
 514    /// Starts the language server for the given connection.
 515    pub async fn start_language_server(
 516        &self,
 517        update: &proto::StartLanguageServer,
 518        connection: ConnectionId,
 519    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 520        let project_id = ProjectId::from_proto(update.project_id);
 521        self.project_transaction(project_id, |tx| async move {
 522            let server = update
 523                .server
 524                .as_ref()
 525                .ok_or_else(|| anyhow!("invalid language server"))?;
 526
 527            // Ensure the update comes from the host.
 528            let project = project::Entity::find_by_id(project_id)
 529                .one(&*tx)
 530                .await?
 531                .ok_or_else(|| anyhow!("no such project"))?;
 532            if project.host_connection()? != connection {
 533                return Err(anyhow!("can't update a project hosted by someone else"))?;
 534            }
 535
 536            // Add the newly-started language server.
 537            language_server::Entity::insert(language_server::ActiveModel {
 538                project_id: ActiveValue::set(project_id),
 539                id: ActiveValue::set(server.id as i64),
 540                name: ActiveValue::set(server.name.clone()),
 541            })
 542            .on_conflict(
 543                OnConflict::columns([
 544                    language_server::Column::ProjectId,
 545                    language_server::Column::Id,
 546                ])
 547                .update_column(language_server::Column::Name)
 548                .to_owned(),
 549            )
 550            .exec(&*tx)
 551            .await?;
 552
 553            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 554            Ok(connection_ids)
 555        })
 556        .await
 557    }
 558
 559    /// Updates the worktree settings for the given connection.
 560    pub async fn update_worktree_settings(
 561        &self,
 562        update: &proto::UpdateWorktreeSettings,
 563        connection: ConnectionId,
 564    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 565        let project_id = ProjectId::from_proto(update.project_id);
 566        let kind = match update.kind {
 567            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 568                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 569            None => proto::LocalSettingsKind::Settings,
 570        };
 571        let kind = LocalSettingsKind::from_proto(kind);
 572        self.project_transaction(project_id, |tx| async move {
 573            // Ensure the update comes from the host.
 574            let project = project::Entity::find_by_id(project_id)
 575                .one(&*tx)
 576                .await?
 577                .ok_or_else(|| anyhow!("no such project"))?;
 578            if project.host_connection()? != connection {
 579                return Err(anyhow!("can't update a project hosted by someone else"))?;
 580            }
 581
 582            if let Some(content) = &update.content {
 583                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 584                    project_id: ActiveValue::Set(project_id),
 585                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 586                    path: ActiveValue::Set(update.path.clone()),
 587                    content: ActiveValue::Set(content.clone()),
 588                    kind: ActiveValue::Set(kind),
 589                })
 590                .on_conflict(
 591                    OnConflict::columns([
 592                        worktree_settings_file::Column::ProjectId,
 593                        worktree_settings_file::Column::WorktreeId,
 594                        worktree_settings_file::Column::Path,
 595                    ])
 596                    .update_column(worktree_settings_file::Column::Content)
 597                    .to_owned(),
 598                )
 599                .exec(&*tx)
 600                .await?;
 601            } else {
 602                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 603                    project_id: ActiveValue::Set(project_id),
 604                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 605                    path: ActiveValue::Set(update.path.clone()),
 606                    ..Default::default()
 607                })
 608                .exec(&*tx)
 609                .await?;
 610            }
 611
 612            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 613            Ok(connection_ids)
 614        })
 615        .await
 616    }
 617
 618    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 619        self.transaction(|tx| async move {
 620            Ok(project::Entity::find_by_id(id)
 621                .one(&*tx)
 622                .await?
 623                .ok_or_else(|| anyhow!("no such project"))?)
 624        })
 625        .await
 626    }
 627
 628    /// Adds the given connection to the specified project
 629    /// in the current room.
 630    pub async fn join_project(
 631        &self,
 632        project_id: ProjectId,
 633        connection: ConnectionId,
 634        user_id: UserId,
 635    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 636        self.project_transaction(project_id, |tx| async move {
 637            let (project, role) = self
 638                .access_project(project_id, connection, Capability::ReadOnly, &tx)
 639                .await?;
 640            self.join_project_internal(project, user_id, connection, role, &tx)
 641                .await
 642        })
 643        .await
 644    }
 645
 646    async fn join_project_internal(
 647        &self,
 648        project: project::Model,
 649        user_id: UserId,
 650        connection: ConnectionId,
 651        role: ChannelRole,
 652        tx: &DatabaseTransaction,
 653    ) -> Result<(Project, ReplicaId)> {
 654        let mut collaborators = project
 655            .find_related(project_collaborator::Entity)
 656            .all(tx)
 657            .await?;
 658        let replica_ids = collaborators
 659            .iter()
 660            .map(|c| c.replica_id)
 661            .collect::<HashSet<_>>();
 662        let mut replica_id = ReplicaId(1);
 663        while replica_ids.contains(&replica_id) {
 664            replica_id.0 += 1;
 665        }
 666        let new_collaborator = project_collaborator::ActiveModel {
 667            project_id: ActiveValue::set(project.id),
 668            connection_id: ActiveValue::set(connection.id as i32),
 669            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 670            user_id: ActiveValue::set(user_id),
 671            replica_id: ActiveValue::set(replica_id),
 672            is_host: ActiveValue::set(false),
 673            ..Default::default()
 674        }
 675        .insert(tx)
 676        .await?;
 677        collaborators.push(new_collaborator);
 678
 679        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 680        let mut worktrees = db_worktrees
 681            .into_iter()
 682            .map(|db_worktree| {
 683                (
 684                    db_worktree.id as u64,
 685                    Worktree {
 686                        id: db_worktree.id as u64,
 687                        abs_path: db_worktree.abs_path,
 688                        root_name: db_worktree.root_name,
 689                        visible: db_worktree.visible,
 690                        entries: Default::default(),
 691                        repository_entries: Default::default(),
 692                        diagnostic_summaries: Default::default(),
 693                        settings_files: Default::default(),
 694                        scan_id: db_worktree.scan_id as u64,
 695                        completed_scan_id: db_worktree.completed_scan_id as u64,
 696                    },
 697                )
 698            })
 699            .collect::<BTreeMap<_, _>>();
 700
 701        // Populate worktree entries.
 702        {
 703            let mut db_entries = worktree_entry::Entity::find()
 704                .filter(
 705                    Condition::all()
 706                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 707                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 708                )
 709                .stream(tx)
 710                .await?;
 711            while let Some(db_entry) = db_entries.next().await {
 712                let db_entry = db_entry?;
 713                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 714                    worktree.entries.push(proto::Entry {
 715                        id: db_entry.id as u64,
 716                        is_dir: db_entry.is_dir,
 717                        path: db_entry.path,
 718                        inode: db_entry.inode as u64,
 719                        mtime: Some(proto::Timestamp {
 720                            seconds: db_entry.mtime_seconds as u64,
 721                            nanos: db_entry.mtime_nanos as u32,
 722                        }),
 723                        canonical_path: db_entry.canonical_path,
 724                        is_ignored: db_entry.is_ignored,
 725                        is_external: db_entry.is_external,
 726                        // This is only used in the summarization backlog, so if it's None,
 727                        // that just means we won't be able to detect when to resummarize
 728                        // based on total number of backlogged bytes - instead, we'd go
 729                        // on number of files only. That shouldn't be a huge deal in practice.
 730                        size: None,
 731                        is_fifo: db_entry.is_fifo,
 732                    });
 733                }
 734            }
 735        }
 736
 737        // Populate repository entries.
 738        {
 739            let db_repository_entries = worktree_repository::Entity::find()
 740                .filter(
 741                    Condition::all()
 742                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 743                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 744                )
 745                .all(tx)
 746                .await?;
 747            for db_repository_entry in db_repository_entries {
 748                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 749                {
 750                    let mut repository_statuses = worktree_repository_statuses::Entity::find()
 751                        .filter(
 752                            Condition::all()
 753                                .add(worktree_repository_statuses::Column::ProjectId.eq(project.id))
 754                                .add(
 755                                    worktree_repository_statuses::Column::WorktreeId
 756                                        .eq(worktree.id),
 757                                )
 758                                .add(
 759                                    worktree_repository_statuses::Column::WorkDirectoryId
 760                                        .eq(db_repository_entry.work_directory_id),
 761                                )
 762                                .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
 763                        )
 764                        .stream(tx)
 765                        .await?;
 766                    let mut updated_statuses = Vec::new();
 767                    while let Some(status_entry) = repository_statuses.next().await {
 768                        let status_entry: worktree_repository_statuses::Model = status_entry?;
 769                        updated_statuses.push(db_status_to_proto(status_entry)?);
 770                    }
 771
 772                    worktree.repository_entries.insert(
 773                        db_repository_entry.work_directory_id as u64,
 774                        proto::RepositoryEntry {
 775                            work_directory_id: db_repository_entry.work_directory_id as u64,
 776                            branch: db_repository_entry.branch,
 777                            updated_statuses,
 778                            removed_statuses: Vec::new(),
 779                        },
 780                    );
 781                }
 782            }
 783        }
 784
 785        // Populate worktree diagnostic summaries.
 786        {
 787            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 788                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 789                .stream(tx)
 790                .await?;
 791            while let Some(db_summary) = db_summaries.next().await {
 792                let db_summary = db_summary?;
 793                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 794                    worktree
 795                        .diagnostic_summaries
 796                        .push(proto::DiagnosticSummary {
 797                            path: db_summary.path,
 798                            language_server_id: db_summary.language_server_id as u64,
 799                            error_count: db_summary.error_count as u32,
 800                            warning_count: db_summary.warning_count as u32,
 801                        });
 802                }
 803            }
 804        }
 805
 806        // Populate worktree settings files
 807        {
 808            let mut db_settings_files = worktree_settings_file::Entity::find()
 809                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 810                .stream(tx)
 811                .await?;
 812            while let Some(db_settings_file) = db_settings_files.next().await {
 813                let db_settings_file = db_settings_file?;
 814                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 815                    worktree.settings_files.push(WorktreeSettingsFile {
 816                        path: db_settings_file.path,
 817                        content: db_settings_file.content,
 818                        kind: db_settings_file.kind,
 819                    });
 820                }
 821            }
 822        }
 823
 824        // Populate language servers.
 825        let language_servers = project
 826            .find_related(language_server::Entity)
 827            .all(tx)
 828            .await?;
 829
 830        let project = Project {
 831            id: project.id,
 832            role,
 833            collaborators: collaborators
 834                .into_iter()
 835                .map(|collaborator| ProjectCollaborator {
 836                    connection_id: collaborator.connection(),
 837                    user_id: collaborator.user_id,
 838                    replica_id: collaborator.replica_id,
 839                    is_host: collaborator.is_host,
 840                })
 841                .collect(),
 842            worktrees,
 843            language_servers: language_servers
 844                .into_iter()
 845                .map(|language_server| proto::LanguageServer {
 846                    id: language_server.id as u64,
 847                    name: language_server.name,
 848                    worktree_id: None,
 849                })
 850                .collect(),
 851        };
 852        Ok((project, replica_id as ReplicaId))
 853    }
 854
 855    /// Removes the given connection from the specified project.
 856    pub async fn leave_project(
 857        &self,
 858        project_id: ProjectId,
 859        connection: ConnectionId,
 860    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 861        self.project_transaction(project_id, |tx| async move {
 862            let result = project_collaborator::Entity::delete_many()
 863                .filter(
 864                    Condition::all()
 865                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 866                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 867                        .add(
 868                            project_collaborator::Column::ConnectionServerId
 869                                .eq(connection.owner_id as i32),
 870                        ),
 871                )
 872                .exec(&*tx)
 873                .await?;
 874            if result.rows_affected == 0 {
 875                Err(anyhow!("not a collaborator on this project"))?;
 876            }
 877
 878            let project = project::Entity::find_by_id(project_id)
 879                .one(&*tx)
 880                .await?
 881                .ok_or_else(|| anyhow!("no such project"))?;
 882            let collaborators = project
 883                .find_related(project_collaborator::Entity)
 884                .all(&*tx)
 885                .await?;
 886            let connection_ids: Vec<ConnectionId> = collaborators
 887                .into_iter()
 888                .map(|collaborator| collaborator.connection())
 889                .collect();
 890
 891            follower::Entity::delete_many()
 892                .filter(
 893                    Condition::any()
 894                        .add(
 895                            Condition::all()
 896                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 897                                .add(
 898                                    follower::Column::LeaderConnectionServerId
 899                                        .eq(connection.owner_id),
 900                                )
 901                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 902                        )
 903                        .add(
 904                            Condition::all()
 905                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 906                                .add(
 907                                    follower::Column::FollowerConnectionServerId
 908                                        .eq(connection.owner_id),
 909                                )
 910                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 911                        ),
 912                )
 913                .exec(&*tx)
 914                .await?;
 915
 916            let room = if let Some(room_id) = project.room_id {
 917                Some(self.get_room(room_id, &tx).await?)
 918            } else {
 919                None
 920            };
 921
 922            let left_project = LeftProject {
 923                id: project_id,
 924                should_unshare: connection == project.host_connection()?,
 925                connection_ids,
 926            };
 927            Ok((room, left_project))
 928        })
 929        .await
 930    }
 931
 932    pub async fn check_user_is_project_host(
 933        &self,
 934        project_id: ProjectId,
 935        connection_id: ConnectionId,
 936    ) -> Result<()> {
 937        self.project_transaction(project_id, |tx| async move {
 938            project::Entity::find()
 939                .filter(
 940                    Condition::all()
 941                        .add(project::Column::Id.eq(project_id))
 942                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 943                        .add(
 944                            project::Column::HostConnectionServerId
 945                                .eq(Some(connection_id.owner_id as i32)),
 946                        ),
 947                )
 948                .one(&*tx)
 949                .await?
 950                .ok_or_else(|| anyhow!("failed to read project host"))?;
 951
 952            Ok(())
 953        })
 954        .await
 955        .map(|guard| guard.into_inner())
 956    }
 957
 958    /// Returns the current project if the given user is authorized to access it with the specified capability.
 959    pub async fn access_project(
 960        &self,
 961        project_id: ProjectId,
 962        connection_id: ConnectionId,
 963        capability: Capability,
 964        tx: &DatabaseTransaction,
 965    ) -> Result<(project::Model, ChannelRole)> {
 966        let project = project::Entity::find_by_id(project_id)
 967            .one(tx)
 968            .await?
 969            .ok_or_else(|| anyhow!("no such project"))?;
 970
 971        let role_from_room = if let Some(room_id) = project.room_id {
 972            room_participant::Entity::find()
 973                .filter(room_participant::Column::RoomId.eq(room_id))
 974                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 975                .one(tx)
 976                .await?
 977                .and_then(|participant| participant.role)
 978        } else {
 979            None
 980        };
 981
 982        let role = role_from_room.unwrap_or(ChannelRole::Banned);
 983
 984        match capability {
 985            Capability::ReadWrite => {
 986                if !role.can_edit_projects() {
 987                    return Err(anyhow!("not authorized to edit projects"))?;
 988                }
 989            }
 990            Capability::ReadOnly => {
 991                if !role.can_read_projects() {
 992                    return Err(anyhow!("not authorized to read projects"))?;
 993                }
 994            }
 995        }
 996
 997        Ok((project, role))
 998    }
 999
1000    /// Returns the host connection for a read-only request to join a shared project.
1001    pub async fn host_for_read_only_project_request(
1002        &self,
1003        project_id: ProjectId,
1004        connection_id: ConnectionId,
1005    ) -> Result<ConnectionId> {
1006        self.project_transaction(project_id, |tx| async move {
1007            let (project, _) = self
1008                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1009                .await?;
1010            project.host_connection()
1011        })
1012        .await
1013        .map(|guard| guard.into_inner())
1014    }
1015
1016    /// Returns the host connection for a request to join a shared project.
1017    pub async fn host_for_mutating_project_request(
1018        &self,
1019        project_id: ProjectId,
1020        connection_id: ConnectionId,
1021    ) -> Result<ConnectionId> {
1022        self.project_transaction(project_id, |tx| async move {
1023            let (project, _) = self
1024                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1025                .await?;
1026            project.host_connection()
1027        })
1028        .await
1029        .map(|guard| guard.into_inner())
1030    }
1031
1032    pub async fn connections_for_buffer_update(
1033        &self,
1034        project_id: ProjectId,
1035        connection_id: ConnectionId,
1036        capability: Capability,
1037    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1038        self.project_transaction(project_id, |tx| async move {
1039            // Authorize
1040            let (project, _) = self
1041                .access_project(project_id, connection_id, capability, &tx)
1042                .await?;
1043
1044            let host_connection_id = project.host_connection()?;
1045
1046            let collaborators = project_collaborator::Entity::find()
1047                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1048                .all(&*tx)
1049                .await?;
1050
1051            let guest_connection_ids = collaborators
1052                .into_iter()
1053                .filter_map(|collaborator| {
1054                    if collaborator.is_host {
1055                        None
1056                    } else {
1057                        Some(collaborator.connection())
1058                    }
1059                })
1060                .collect();
1061
1062            Ok((host_connection_id, guest_connection_ids))
1063        })
1064        .await
1065    }
1066
1067    /// Returns the connection IDs in the given project.
1068    ///
1069    /// The provided `connection_id` must also be a collaborator in the project,
1070    /// otherwise an error will be returned.
1071    pub async fn project_connection_ids(
1072        &self,
1073        project_id: ProjectId,
1074        connection_id: ConnectionId,
1075        exclude_dev_server: bool,
1076    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1077        self.project_transaction(project_id, |tx| async move {
1078            let project = project::Entity::find_by_id(project_id)
1079                .one(&*tx)
1080                .await?
1081                .ok_or_else(|| anyhow!("no such project"))?;
1082
1083            let mut collaborators = project_collaborator::Entity::find()
1084                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1085                .stream(&*tx)
1086                .await?;
1087
1088            let mut connection_ids = HashSet::default();
1089            if let Some(host_connection) = project.host_connection().log_err() {
1090                if !exclude_dev_server {
1091                    connection_ids.insert(host_connection);
1092                }
1093            }
1094
1095            while let Some(collaborator) = collaborators.next().await {
1096                let collaborator = collaborator?;
1097                connection_ids.insert(collaborator.connection());
1098            }
1099
1100            if connection_ids.contains(&connection_id)
1101                || Some(connection_id) == project.host_connection().ok()
1102            {
1103                Ok(connection_ids)
1104            } else {
1105                Err(anyhow!(
1106                    "can only send project updates to a project you're in"
1107                ))?
1108            }
1109        })
1110        .await
1111    }
1112
1113    async fn project_guest_connection_ids(
1114        &self,
1115        project_id: ProjectId,
1116        tx: &DatabaseTransaction,
1117    ) -> Result<Vec<ConnectionId>> {
1118        let mut collaborators = project_collaborator::Entity::find()
1119            .filter(
1120                project_collaborator::Column::ProjectId
1121                    .eq(project_id)
1122                    .and(project_collaborator::Column::IsHost.eq(false)),
1123            )
1124            .stream(tx)
1125            .await?;
1126
1127        let mut guest_connection_ids = Vec::new();
1128        while let Some(collaborator) = collaborators.next().await {
1129            let collaborator = collaborator?;
1130            guest_connection_ids.push(collaborator.connection());
1131        }
1132        Ok(guest_connection_ids)
1133    }
1134
1135    /// Returns the [`RoomId`] for the given project.
1136    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1137        self.transaction(|tx| async move {
1138            Ok(project::Entity::find_by_id(project_id)
1139                .one(&*tx)
1140                .await?
1141                .and_then(|project| project.room_id))
1142        })
1143        .await
1144    }
1145
1146    pub async fn check_room_participants(
1147        &self,
1148        room_id: RoomId,
1149        leader_id: ConnectionId,
1150        follower_id: ConnectionId,
1151    ) -> Result<()> {
1152        self.transaction(|tx| async move {
1153            use room_participant::Column;
1154
1155            let count = room_participant::Entity::find()
1156                .filter(
1157                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1158                        Condition::any()
1159                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1160                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1161                            ))
1162                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1163                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1164                            )),
1165                    ),
1166                )
1167                .count(&*tx)
1168                .await?;
1169
1170            if count < 2 {
1171                Err(anyhow!("not room participants"))?;
1172            }
1173
1174            Ok(())
1175        })
1176        .await
1177    }
1178
1179    /// Adds the given follower connection as a follower of the given leader connection.
1180    pub async fn follow(
1181        &self,
1182        room_id: RoomId,
1183        project_id: ProjectId,
1184        leader_connection: ConnectionId,
1185        follower_connection: ConnectionId,
1186    ) -> Result<TransactionGuard<proto::Room>> {
1187        self.room_transaction(room_id, |tx| async move {
1188            follower::ActiveModel {
1189                room_id: ActiveValue::set(room_id),
1190                project_id: ActiveValue::set(project_id),
1191                leader_connection_server_id: ActiveValue::set(ServerId(
1192                    leader_connection.owner_id as i32,
1193                )),
1194                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1195                follower_connection_server_id: ActiveValue::set(ServerId(
1196                    follower_connection.owner_id as i32,
1197                )),
1198                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1199                ..Default::default()
1200            }
1201            .insert(&*tx)
1202            .await?;
1203
1204            let room = self.get_room(room_id, &tx).await?;
1205            Ok(room)
1206        })
1207        .await
1208    }
1209
1210    /// Removes the given follower connection as a follower of the given leader connection.
1211    pub async fn unfollow(
1212        &self,
1213        room_id: RoomId,
1214        project_id: ProjectId,
1215        leader_connection: ConnectionId,
1216        follower_connection: ConnectionId,
1217    ) -> Result<TransactionGuard<proto::Room>> {
1218        self.room_transaction(room_id, |tx| async move {
1219            follower::Entity::delete_many()
1220                .filter(
1221                    Condition::all()
1222                        .add(follower::Column::RoomId.eq(room_id))
1223                        .add(follower::Column::ProjectId.eq(project_id))
1224                        .add(
1225                            follower::Column::LeaderConnectionServerId
1226                                .eq(leader_connection.owner_id),
1227                        )
1228                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1229                        .add(
1230                            follower::Column::FollowerConnectionServerId
1231                                .eq(follower_connection.owner_id),
1232                        )
1233                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1234                )
1235                .exec(&*tx)
1236                .await?;
1237
1238            let room = self.get_room(room_id, &tx).await?;
1239            Ok(room)
1240        })
1241        .await
1242    }
1243}