projects.rs

   1use anyhow::Context as _;
   2
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .ok_or_else(|| anyhow!("could not find participant"))?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            let project = project::ActiveModel {
  65                room_id: ActiveValue::set(Some(participant.room_id)),
  66                host_user_id: ActiveValue::set(Some(participant.user_id)),
  67                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  68                host_connection_server_id: ActiveValue::set(Some(ServerId(
  69                    connection.owner_id as i32,
  70                ))),
  71                id: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if !worktrees.is_empty() {
  77                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  78                    worktree::ActiveModel {
  79                        id: ActiveValue::set(worktree.id as i64),
  80                        project_id: ActiveValue::set(project.id),
  81                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  82                        root_name: ActiveValue::set(worktree.root_name.clone()),
  83                        visible: ActiveValue::set(worktree.visible),
  84                        scan_id: ActiveValue::set(0),
  85                        completed_scan_id: ActiveValue::set(0),
  86                    }
  87                }))
  88                .exec(&*tx)
  89                .await?;
  90            }
  91
  92            let replica_id = if is_ssh_project { 1 } else { 0 };
  93
  94            project_collaborator::ActiveModel {
  95                project_id: ActiveValue::set(project.id),
  96                connection_id: ActiveValue::set(connection.id as i32),
  97                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  98                user_id: ActiveValue::set(participant.user_id),
  99                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 100                is_host: ActiveValue::set(true),
 101                ..Default::default()
 102            }
 103            .insert(&*tx)
 104            .await?;
 105
 106            let room = self.get_room(room_id, &tx).await?;
 107            Ok((project.id, room))
 108        })
 109        .await
 110    }
 111
 112    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 113        self.weak_transaction(|tx| async move {
 114            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 115            Ok(())
 116        })
 117        .await
 118    }
 119
 120    /// Unshares the given project.
 121    pub async fn unshare_project(
 122        &self,
 123        project_id: ProjectId,
 124        connection: ConnectionId,
 125    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 126        self.project_transaction(project_id, |tx| async move {
 127            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 128            let project = project::Entity::find_by_id(project_id)
 129                .one(&*tx)
 130                .await?
 131                .ok_or_else(|| anyhow!("project not found"))?;
 132            let room = if let Some(room_id) = project.room_id {
 133                Some(self.get_room(room_id, &tx).await?)
 134            } else {
 135                None
 136            };
 137            if project.host_connection()? == connection {
 138                return Ok((true, room, guest_connection_ids));
 139            }
 140            Err(anyhow!("cannot unshare a project hosted by another user"))?
 141        })
 142        .await
 143    }
 144
 145    /// Updates the worktrees associated with the given project.
 146    pub async fn update_project(
 147        &self,
 148        project_id: ProjectId,
 149        connection: ConnectionId,
 150        worktrees: &[proto::WorktreeMetadata],
 151    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 152        self.project_transaction(project_id, |tx| async move {
 153            let project = project::Entity::find_by_id(project_id)
 154                .filter(
 155                    Condition::all()
 156                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 157                        .add(
 158                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 159                        ),
 160                )
 161                .one(&*tx)
 162                .await?
 163                .ok_or_else(|| anyhow!("no such project"))?;
 164
 165            self.update_project_worktrees(project.id, worktrees, &tx)
 166                .await?;
 167
 168            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 169
 170            let room = if let Some(room_id) = project.room_id {
 171                Some(self.get_room(room_id, &tx).await?)
 172            } else {
 173                None
 174            };
 175
 176            Ok((room, guest_connection_ids))
 177        })
 178        .await
 179    }
 180
 181    pub(in crate::db) async fn update_project_worktrees(
 182        &self,
 183        project_id: ProjectId,
 184        worktrees: &[proto::WorktreeMetadata],
 185        tx: &DatabaseTransaction,
 186    ) -> Result<()> {
 187        if !worktrees.is_empty() {
 188            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 189                id: ActiveValue::set(worktree.id as i64),
 190                project_id: ActiveValue::set(project_id),
 191                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 192                root_name: ActiveValue::set(worktree.root_name.clone()),
 193                visible: ActiveValue::set(worktree.visible),
 194                scan_id: ActiveValue::set(0),
 195                completed_scan_id: ActiveValue::set(0),
 196            }))
 197            .on_conflict(
 198                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 199                    .update_column(worktree::Column::RootName)
 200                    .to_owned(),
 201            )
 202            .exec(tx)
 203            .await?;
 204        }
 205
 206        worktree::Entity::delete_many()
 207            .filter(worktree::Column::ProjectId.eq(project_id).and(
 208                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 209            ))
 210            .exec(tx)
 211            .await?;
 212
 213        Ok(())
 214    }
 215
 216    pub async fn update_worktree(
 217        &self,
 218        update: &proto::UpdateWorktree,
 219        connection: ConnectionId,
 220    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 221        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 222            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 223        {
 224            return Err(anyhow!(
 225                "invalid worktree update. removed entries: {}, updated entries: {}",
 226                update.removed_entries.len(),
 227                update.updated_entries.len()
 228            ))?;
 229        }
 230
 231        let project_id = ProjectId::from_proto(update.project_id);
 232        let worktree_id = update.worktree_id as i64;
 233        self.project_transaction(project_id, |tx| async move {
 234            // Ensure the update comes from the host.
 235            let _project = project::Entity::find_by_id(project_id)
 236                .filter(
 237                    Condition::all()
 238                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 239                        .add(
 240                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 241                        ),
 242                )
 243                .one(&*tx)
 244                .await?
 245                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
 246
 247            // Update metadata.
 248            worktree::Entity::update(worktree::ActiveModel {
 249                id: ActiveValue::set(worktree_id),
 250                project_id: ActiveValue::set(project_id),
 251                root_name: ActiveValue::set(update.root_name.clone()),
 252                scan_id: ActiveValue::set(update.scan_id as i64),
 253                completed_scan_id: if update.is_last_update {
 254                    ActiveValue::set(update.scan_id as i64)
 255                } else {
 256                    ActiveValue::default()
 257                },
 258                abs_path: ActiveValue::set(update.abs_path.clone()),
 259                ..Default::default()
 260            })
 261            .exec(&*tx)
 262            .await?;
 263
 264            if !update.updated_entries.is_empty() {
 265                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 266                    let mtime = entry.mtime.clone().unwrap_or_default();
 267                    worktree_entry::ActiveModel {
 268                        project_id: ActiveValue::set(project_id),
 269                        worktree_id: ActiveValue::set(worktree_id),
 270                        id: ActiveValue::set(entry.id as i64),
 271                        is_dir: ActiveValue::set(entry.is_dir),
 272                        path: ActiveValue::set(entry.path.clone()),
 273                        inode: ActiveValue::set(entry.inode as i64),
 274                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 275                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 276                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 277                        is_ignored: ActiveValue::set(entry.is_ignored),
 278                        git_status: ActiveValue::set(None),
 279                        is_external: ActiveValue::set(entry.is_external),
 280                        is_deleted: ActiveValue::set(false),
 281                        scan_id: ActiveValue::set(update.scan_id as i64),
 282                        is_fifo: ActiveValue::set(entry.is_fifo),
 283                    }
 284                }))
 285                .on_conflict(
 286                    OnConflict::columns([
 287                        worktree_entry::Column::ProjectId,
 288                        worktree_entry::Column::WorktreeId,
 289                        worktree_entry::Column::Id,
 290                    ])
 291                    .update_columns([
 292                        worktree_entry::Column::IsDir,
 293                        worktree_entry::Column::Path,
 294                        worktree_entry::Column::Inode,
 295                        worktree_entry::Column::MtimeSeconds,
 296                        worktree_entry::Column::MtimeNanos,
 297                        worktree_entry::Column::CanonicalPath,
 298                        worktree_entry::Column::IsIgnored,
 299                        worktree_entry::Column::ScanId,
 300                    ])
 301                    .to_owned(),
 302                )
 303                .exec(&*tx)
 304                .await?;
 305            }
 306
 307            if !update.removed_entries.is_empty() {
 308                worktree_entry::Entity::update_many()
 309                    .filter(
 310                        worktree_entry::Column::ProjectId
 311                            .eq(project_id)
 312                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 313                            .and(
 314                                worktree_entry::Column::Id
 315                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 316                            ),
 317                    )
 318                    .set(worktree_entry::ActiveModel {
 319                        is_deleted: ActiveValue::Set(true),
 320                        scan_id: ActiveValue::Set(update.scan_id as i64),
 321                        ..Default::default()
 322                    })
 323                    .exec(&*tx)
 324                    .await?;
 325            }
 326
 327            if !update.updated_repositories.is_empty() {
 328                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 329                    |repository| worktree_repository::ActiveModel {
 330                        project_id: ActiveValue::set(project_id),
 331                        worktree_id: ActiveValue::set(worktree_id),
 332                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 333                        scan_id: ActiveValue::set(update.scan_id as i64),
 334                        branch: ActiveValue::set(repository.branch.clone()),
 335                        is_deleted: ActiveValue::set(false),
 336                    },
 337                ))
 338                .on_conflict(
 339                    OnConflict::columns([
 340                        worktree_repository::Column::ProjectId,
 341                        worktree_repository::Column::WorktreeId,
 342                        worktree_repository::Column::WorkDirectoryId,
 343                    ])
 344                    .update_columns([
 345                        worktree_repository::Column::ScanId,
 346                        worktree_repository::Column::Branch,
 347                    ])
 348                    .to_owned(),
 349                )
 350                .exec(&*tx)
 351                .await?;
 352
 353                let has_any_statuses = update
 354                    .updated_repositories
 355                    .iter()
 356                    .any(|repository| !repository.updated_statuses.is_empty());
 357
 358                if has_any_statuses {
 359                    worktree_repository_statuses::Entity::insert_many(
 360                        update.updated_repositories.iter().flat_map(
 361                            |repository: &proto::RepositoryEntry| {
 362                                repository.updated_statuses.iter().map(|status_entry| {
 363                                    worktree_repository_statuses::ActiveModel {
 364                                        project_id: ActiveValue::set(project_id),
 365                                        worktree_id: ActiveValue::set(worktree_id),
 366                                        work_directory_id: ActiveValue::set(
 367                                            repository.work_directory_id as i64,
 368                                        ),
 369                                        scan_id: ActiveValue::set(update.scan_id as i64),
 370                                        is_deleted: ActiveValue::set(false),
 371                                        repo_path: ActiveValue::set(status_entry.repo_path.clone()),
 372                                        status: ActiveValue::set(status_entry.status as i64),
 373                                    }
 374                                })
 375                            },
 376                        ),
 377                    )
 378                    .on_conflict(
 379                        OnConflict::columns([
 380                            worktree_repository_statuses::Column::ProjectId,
 381                            worktree_repository_statuses::Column::WorktreeId,
 382                            worktree_repository_statuses::Column::WorkDirectoryId,
 383                            worktree_repository_statuses::Column::RepoPath,
 384                        ])
 385                        .update_columns([
 386                            worktree_repository_statuses::Column::ScanId,
 387                            worktree_repository_statuses::Column::Status,
 388                        ])
 389                        .to_owned(),
 390                    )
 391                    .exec(&*tx)
 392                    .await?;
 393                }
 394
 395                let has_any_removed_statuses = update
 396                    .updated_repositories
 397                    .iter()
 398                    .any(|repository| !repository.removed_statuses.is_empty());
 399
 400                if has_any_removed_statuses {
 401                    worktree_repository_statuses::Entity::update_many()
 402                        .filter(
 403                            worktree_repository_statuses::Column::ProjectId
 404                                .eq(project_id)
 405                                .and(
 406                                    worktree_repository_statuses::Column::WorktreeId
 407                                        .eq(worktree_id),
 408                                )
 409                                .and(
 410                                    worktree_repository_statuses::Column::RepoPath.is_in(
 411                                        update.updated_repositories.iter().flat_map(|repository| {
 412                                            repository.removed_statuses.iter()
 413                                        }),
 414                                    ),
 415                                ),
 416                        )
 417                        .set(worktree_repository_statuses::ActiveModel {
 418                            is_deleted: ActiveValue::Set(true),
 419                            scan_id: ActiveValue::Set(update.scan_id as i64),
 420                            ..Default::default()
 421                        })
 422                        .exec(&*tx)
 423                        .await?;
 424                }
 425            }
 426
 427            if !update.removed_repositories.is_empty() {
 428                worktree_repository::Entity::update_many()
 429                    .filter(
 430                        worktree_repository::Column::ProjectId
 431                            .eq(project_id)
 432                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 433                            .and(
 434                                worktree_repository::Column::WorkDirectoryId
 435                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 436                            ),
 437                    )
 438                    .set(worktree_repository::ActiveModel {
 439                        is_deleted: ActiveValue::Set(true),
 440                        scan_id: ActiveValue::Set(update.scan_id as i64),
 441                        ..Default::default()
 442                    })
 443                    .exec(&*tx)
 444                    .await?;
 445            }
 446
 447            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 448            Ok(connection_ids)
 449        })
 450        .await
 451    }
 452
 453    /// Updates the diagnostic summary for the given connection.
 454    pub async fn update_diagnostic_summary(
 455        &self,
 456        update: &proto::UpdateDiagnosticSummary,
 457        connection: ConnectionId,
 458    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 459        let project_id = ProjectId::from_proto(update.project_id);
 460        let worktree_id = update.worktree_id as i64;
 461        self.project_transaction(project_id, |tx| async move {
 462            let summary = update
 463                .summary
 464                .as_ref()
 465                .ok_or_else(|| anyhow!("invalid summary"))?;
 466
 467            // Ensure the update comes from the host.
 468            let project = project::Entity::find_by_id(project_id)
 469                .one(&*tx)
 470                .await?
 471                .ok_or_else(|| anyhow!("no such project"))?;
 472            if project.host_connection()? != connection {
 473                return Err(anyhow!("can't update a project hosted by someone else"))?;
 474            }
 475
 476            // Update summary.
 477            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 478                project_id: ActiveValue::set(project_id),
 479                worktree_id: ActiveValue::set(worktree_id),
 480                path: ActiveValue::set(summary.path.clone()),
 481                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 482                error_count: ActiveValue::set(summary.error_count as i32),
 483                warning_count: ActiveValue::set(summary.warning_count as i32),
 484            })
 485            .on_conflict(
 486                OnConflict::columns([
 487                    worktree_diagnostic_summary::Column::ProjectId,
 488                    worktree_diagnostic_summary::Column::WorktreeId,
 489                    worktree_diagnostic_summary::Column::Path,
 490                ])
 491                .update_columns([
 492                    worktree_diagnostic_summary::Column::LanguageServerId,
 493                    worktree_diagnostic_summary::Column::ErrorCount,
 494                    worktree_diagnostic_summary::Column::WarningCount,
 495                ])
 496                .to_owned(),
 497            )
 498            .exec(&*tx)
 499            .await?;
 500
 501            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 502            Ok(connection_ids)
 503        })
 504        .await
 505    }
 506
 507    /// Starts the language server for the given connection.
 508    pub async fn start_language_server(
 509        &self,
 510        update: &proto::StartLanguageServer,
 511        connection: ConnectionId,
 512    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 513        let project_id = ProjectId::from_proto(update.project_id);
 514        self.project_transaction(project_id, |tx| async move {
 515            let server = update
 516                .server
 517                .as_ref()
 518                .ok_or_else(|| anyhow!("invalid language server"))?;
 519
 520            // Ensure the update comes from the host.
 521            let project = project::Entity::find_by_id(project_id)
 522                .one(&*tx)
 523                .await?
 524                .ok_or_else(|| anyhow!("no such project"))?;
 525            if project.host_connection()? != connection {
 526                return Err(anyhow!("can't update a project hosted by someone else"))?;
 527            }
 528
 529            // Add the newly-started language server.
 530            language_server::Entity::insert(language_server::ActiveModel {
 531                project_id: ActiveValue::set(project_id),
 532                id: ActiveValue::set(server.id as i64),
 533                name: ActiveValue::set(server.name.clone()),
 534            })
 535            .on_conflict(
 536                OnConflict::columns([
 537                    language_server::Column::ProjectId,
 538                    language_server::Column::Id,
 539                ])
 540                .update_column(language_server::Column::Name)
 541                .to_owned(),
 542            )
 543            .exec(&*tx)
 544            .await?;
 545
 546            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 547            Ok(connection_ids)
 548        })
 549        .await
 550    }
 551
 552    /// Updates the worktree settings for the given connection.
 553    pub async fn update_worktree_settings(
 554        &self,
 555        update: &proto::UpdateWorktreeSettings,
 556        connection: ConnectionId,
 557    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 558        let project_id = ProjectId::from_proto(update.project_id);
 559        let kind = match update.kind {
 560            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 561                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 562            None => proto::LocalSettingsKind::Settings,
 563        };
 564        let kind = LocalSettingsKind::from_proto(kind);
 565        self.project_transaction(project_id, |tx| async move {
 566            // Ensure the update comes from the host.
 567            let project = project::Entity::find_by_id(project_id)
 568                .one(&*tx)
 569                .await?
 570                .ok_or_else(|| anyhow!("no such project"))?;
 571            if project.host_connection()? != connection {
 572                return Err(anyhow!("can't update a project hosted by someone else"))?;
 573            }
 574
 575            if let Some(content) = &update.content {
 576                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 577                    project_id: ActiveValue::Set(project_id),
 578                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 579                    path: ActiveValue::Set(update.path.clone()),
 580                    content: ActiveValue::Set(content.clone()),
 581                    kind: ActiveValue::Set(kind),
 582                })
 583                .on_conflict(
 584                    OnConflict::columns([
 585                        worktree_settings_file::Column::ProjectId,
 586                        worktree_settings_file::Column::WorktreeId,
 587                        worktree_settings_file::Column::Path,
 588                    ])
 589                    .update_column(worktree_settings_file::Column::Content)
 590                    .to_owned(),
 591                )
 592                .exec(&*tx)
 593                .await?;
 594            } else {
 595                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 596                    project_id: ActiveValue::Set(project_id),
 597                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 598                    path: ActiveValue::Set(update.path.clone()),
 599                    ..Default::default()
 600                })
 601                .exec(&*tx)
 602                .await?;
 603            }
 604
 605            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 606            Ok(connection_ids)
 607        })
 608        .await
 609    }
 610
 611    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 612        self.transaction(|tx| async move {
 613            Ok(project::Entity::find_by_id(id)
 614                .one(&*tx)
 615                .await?
 616                .ok_or_else(|| anyhow!("no such project"))?)
 617        })
 618        .await
 619    }
 620
 621    /// Adds the given connection to the specified project
 622    /// in the current room.
 623    pub async fn join_project(
 624        &self,
 625        project_id: ProjectId,
 626        connection: ConnectionId,
 627        user_id: UserId,
 628    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 629        self.project_transaction(project_id, |tx| async move {
 630            let (project, role) = self
 631                .access_project(project_id, connection, Capability::ReadOnly, &tx)
 632                .await?;
 633            self.join_project_internal(project, user_id, connection, role, &tx)
 634                .await
 635        })
 636        .await
 637    }
 638
 639    async fn join_project_internal(
 640        &self,
 641        project: project::Model,
 642        user_id: UserId,
 643        connection: ConnectionId,
 644        role: ChannelRole,
 645        tx: &DatabaseTransaction,
 646    ) -> Result<(Project, ReplicaId)> {
 647        let mut collaborators = project
 648            .find_related(project_collaborator::Entity)
 649            .all(tx)
 650            .await?;
 651        let replica_ids = collaborators
 652            .iter()
 653            .map(|c| c.replica_id)
 654            .collect::<HashSet<_>>();
 655        let mut replica_id = ReplicaId(1);
 656        while replica_ids.contains(&replica_id) {
 657            replica_id.0 += 1;
 658        }
 659        let new_collaborator = project_collaborator::ActiveModel {
 660            project_id: ActiveValue::set(project.id),
 661            connection_id: ActiveValue::set(connection.id as i32),
 662            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 663            user_id: ActiveValue::set(user_id),
 664            replica_id: ActiveValue::set(replica_id),
 665            is_host: ActiveValue::set(false),
 666            ..Default::default()
 667        }
 668        .insert(tx)
 669        .await?;
 670        collaborators.push(new_collaborator);
 671
 672        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 673        let mut worktrees = db_worktrees
 674            .into_iter()
 675            .map(|db_worktree| {
 676                (
 677                    db_worktree.id as u64,
 678                    Worktree {
 679                        id: db_worktree.id as u64,
 680                        abs_path: db_worktree.abs_path,
 681                        root_name: db_worktree.root_name,
 682                        visible: db_worktree.visible,
 683                        entries: Default::default(),
 684                        repository_entries: Default::default(),
 685                        diagnostic_summaries: Default::default(),
 686                        settings_files: Default::default(),
 687                        scan_id: db_worktree.scan_id as u64,
 688                        completed_scan_id: db_worktree.completed_scan_id as u64,
 689                    },
 690                )
 691            })
 692            .collect::<BTreeMap<_, _>>();
 693
 694        // Populate worktree entries.
 695        {
 696            let mut db_entries = worktree_entry::Entity::find()
 697                .filter(
 698                    Condition::all()
 699                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 700                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 701                )
 702                .stream(tx)
 703                .await?;
 704            while let Some(db_entry) = db_entries.next().await {
 705                let db_entry = db_entry?;
 706                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 707                    worktree.entries.push(proto::Entry {
 708                        id: db_entry.id as u64,
 709                        is_dir: db_entry.is_dir,
 710                        path: db_entry.path,
 711                        inode: db_entry.inode as u64,
 712                        mtime: Some(proto::Timestamp {
 713                            seconds: db_entry.mtime_seconds as u64,
 714                            nanos: db_entry.mtime_nanos as u32,
 715                        }),
 716                        canonical_path: db_entry.canonical_path,
 717                        is_ignored: db_entry.is_ignored,
 718                        is_external: db_entry.is_external,
 719                        // This is only used in the summarization backlog, so if it's None,
 720                        // that just means we won't be able to detect when to resummarize
 721                        // based on total number of backlogged bytes - instead, we'd go
 722                        // on number of files only. That shouldn't be a huge deal in practice.
 723                        size: None,
 724                        is_fifo: db_entry.is_fifo,
 725                    });
 726                }
 727            }
 728        }
 729
 730        // Populate repository entries.
 731        {
 732            let db_repository_entries = worktree_repository::Entity::find()
 733                .filter(
 734                    Condition::all()
 735                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 736                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 737                )
 738                .all(tx)
 739                .await?;
 740            for db_repository_entry in db_repository_entries {
 741                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 742                {
 743                    let mut repository_statuses = worktree_repository_statuses::Entity::find()
 744                        .filter(
 745                            Condition::all()
 746                                .add(worktree_repository_statuses::Column::ProjectId.eq(project.id))
 747                                .add(
 748                                    worktree_repository_statuses::Column::WorktreeId
 749                                        .eq(worktree.id),
 750                                )
 751                                .add(
 752                                    worktree_repository_statuses::Column::WorkDirectoryId
 753                                        .eq(db_repository_entry.work_directory_id),
 754                                )
 755                                .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
 756                        )
 757                        .stream(tx)
 758                        .await?;
 759                    let mut updated_statuses = Vec::new();
 760                    while let Some(status_entry) = repository_statuses.next().await {
 761                        let status_entry: worktree_repository_statuses::Model = status_entry?;
 762                        updated_statuses.push(proto::StatusEntry {
 763                            repo_path: status_entry.repo_path,
 764                            status: status_entry.status as i32,
 765                        });
 766                    }
 767
 768                    worktree.repository_entries.insert(
 769                        db_repository_entry.work_directory_id as u64,
 770                        proto::RepositoryEntry {
 771                            work_directory_id: db_repository_entry.work_directory_id as u64,
 772                            branch: db_repository_entry.branch,
 773                            updated_statuses,
 774                            removed_statuses: Vec::new(),
 775                        },
 776                    );
 777                }
 778            }
 779        }
 780
 781        // Populate worktree diagnostic summaries.
 782        {
 783            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 784                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 785                .stream(tx)
 786                .await?;
 787            while let Some(db_summary) = db_summaries.next().await {
 788                let db_summary = db_summary?;
 789                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 790                    worktree
 791                        .diagnostic_summaries
 792                        .push(proto::DiagnosticSummary {
 793                            path: db_summary.path,
 794                            language_server_id: db_summary.language_server_id as u64,
 795                            error_count: db_summary.error_count as u32,
 796                            warning_count: db_summary.warning_count as u32,
 797                        });
 798                }
 799            }
 800        }
 801
 802        // Populate worktree settings files
 803        {
 804            let mut db_settings_files = worktree_settings_file::Entity::find()
 805                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 806                .stream(tx)
 807                .await?;
 808            while let Some(db_settings_file) = db_settings_files.next().await {
 809                let db_settings_file = db_settings_file?;
 810                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 811                    worktree.settings_files.push(WorktreeSettingsFile {
 812                        path: db_settings_file.path,
 813                        content: db_settings_file.content,
 814                        kind: db_settings_file.kind,
 815                    });
 816                }
 817            }
 818        }
 819
 820        // Populate language servers.
 821        let language_servers = project
 822            .find_related(language_server::Entity)
 823            .all(tx)
 824            .await?;
 825
 826        let project = Project {
 827            id: project.id,
 828            role,
 829            collaborators: collaborators
 830                .into_iter()
 831                .map(|collaborator| ProjectCollaborator {
 832                    connection_id: collaborator.connection(),
 833                    user_id: collaborator.user_id,
 834                    replica_id: collaborator.replica_id,
 835                    is_host: collaborator.is_host,
 836                })
 837                .collect(),
 838            worktrees,
 839            language_servers: language_servers
 840                .into_iter()
 841                .map(|language_server| proto::LanguageServer {
 842                    id: language_server.id as u64,
 843                    name: language_server.name,
 844                    worktree_id: None,
 845                })
 846                .collect(),
 847        };
 848        Ok((project, replica_id as ReplicaId))
 849    }
 850
 851    /// Removes the given connection from the specified project.
 852    pub async fn leave_project(
 853        &self,
 854        project_id: ProjectId,
 855        connection: ConnectionId,
 856    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 857        self.project_transaction(project_id, |tx| async move {
 858            let result = project_collaborator::Entity::delete_many()
 859                .filter(
 860                    Condition::all()
 861                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 862                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 863                        .add(
 864                            project_collaborator::Column::ConnectionServerId
 865                                .eq(connection.owner_id as i32),
 866                        ),
 867                )
 868                .exec(&*tx)
 869                .await?;
 870            if result.rows_affected == 0 {
 871                Err(anyhow!("not a collaborator on this project"))?;
 872            }
 873
 874            let project = project::Entity::find_by_id(project_id)
 875                .one(&*tx)
 876                .await?
 877                .ok_or_else(|| anyhow!("no such project"))?;
 878            let collaborators = project
 879                .find_related(project_collaborator::Entity)
 880                .all(&*tx)
 881                .await?;
 882            let connection_ids: Vec<ConnectionId> = collaborators
 883                .into_iter()
 884                .map(|collaborator| collaborator.connection())
 885                .collect();
 886
 887            follower::Entity::delete_many()
 888                .filter(
 889                    Condition::any()
 890                        .add(
 891                            Condition::all()
 892                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 893                                .add(
 894                                    follower::Column::LeaderConnectionServerId
 895                                        .eq(connection.owner_id),
 896                                )
 897                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 898                        )
 899                        .add(
 900                            Condition::all()
 901                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 902                                .add(
 903                                    follower::Column::FollowerConnectionServerId
 904                                        .eq(connection.owner_id),
 905                                )
 906                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 907                        ),
 908                )
 909                .exec(&*tx)
 910                .await?;
 911
 912            let room = if let Some(room_id) = project.room_id {
 913                Some(self.get_room(room_id, &tx).await?)
 914            } else {
 915                None
 916            };
 917
 918            let left_project = LeftProject {
 919                id: project_id,
 920                should_unshare: connection == project.host_connection()?,
 921                connection_ids,
 922            };
 923            Ok((room, left_project))
 924        })
 925        .await
 926    }
 927
 928    pub async fn check_user_is_project_host(
 929        &self,
 930        project_id: ProjectId,
 931        connection_id: ConnectionId,
 932    ) -> Result<()> {
 933        self.project_transaction(project_id, |tx| async move {
 934            project::Entity::find()
 935                .filter(
 936                    Condition::all()
 937                        .add(project::Column::Id.eq(project_id))
 938                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 939                        .add(
 940                            project::Column::HostConnectionServerId
 941                                .eq(Some(connection_id.owner_id as i32)),
 942                        ),
 943                )
 944                .one(&*tx)
 945                .await?
 946                .ok_or_else(|| anyhow!("failed to read project host"))?;
 947
 948            Ok(())
 949        })
 950        .await
 951        .map(|guard| guard.into_inner())
 952    }
 953
 954    /// Returns the current project if the given user is authorized to access it with the specified capability.
 955    pub async fn access_project(
 956        &self,
 957        project_id: ProjectId,
 958        connection_id: ConnectionId,
 959        capability: Capability,
 960        tx: &DatabaseTransaction,
 961    ) -> Result<(project::Model, ChannelRole)> {
 962        let project = project::Entity::find_by_id(project_id)
 963            .one(tx)
 964            .await?
 965            .ok_or_else(|| anyhow!("no such project"))?;
 966
 967        let role_from_room = if let Some(room_id) = project.room_id {
 968            room_participant::Entity::find()
 969                .filter(room_participant::Column::RoomId.eq(room_id))
 970                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 971                .one(tx)
 972                .await?
 973                .and_then(|participant| participant.role)
 974        } else {
 975            None
 976        };
 977
 978        let role = role_from_room.unwrap_or(ChannelRole::Banned);
 979
 980        match capability {
 981            Capability::ReadWrite => {
 982                if !role.can_edit_projects() {
 983                    return Err(anyhow!("not authorized to edit projects"))?;
 984                }
 985            }
 986            Capability::ReadOnly => {
 987                if !role.can_read_projects() {
 988                    return Err(anyhow!("not authorized to read projects"))?;
 989                }
 990            }
 991        }
 992
 993        Ok((project, role))
 994    }
 995
 996    /// Returns the host connection for a read-only request to join a shared project.
 997    pub async fn host_for_read_only_project_request(
 998        &self,
 999        project_id: ProjectId,
1000        connection_id: ConnectionId,
1001    ) -> Result<ConnectionId> {
1002        self.project_transaction(project_id, |tx| async move {
1003            let (project, _) = self
1004                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1005                .await?;
1006            project.host_connection()
1007        })
1008        .await
1009        .map(|guard| guard.into_inner())
1010    }
1011
1012    /// Returns the host connection for a request to join a shared project.
1013    pub async fn host_for_mutating_project_request(
1014        &self,
1015        project_id: ProjectId,
1016        connection_id: ConnectionId,
1017    ) -> Result<ConnectionId> {
1018        self.project_transaction(project_id, |tx| async move {
1019            let (project, _) = self
1020                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1021                .await?;
1022            project.host_connection()
1023        })
1024        .await
1025        .map(|guard| guard.into_inner())
1026    }
1027
1028    pub async fn connections_for_buffer_update(
1029        &self,
1030        project_id: ProjectId,
1031        connection_id: ConnectionId,
1032        capability: Capability,
1033    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1034        self.project_transaction(project_id, |tx| async move {
1035            // Authorize
1036            let (project, _) = self
1037                .access_project(project_id, connection_id, capability, &tx)
1038                .await?;
1039
1040            let host_connection_id = project.host_connection()?;
1041
1042            let collaborators = project_collaborator::Entity::find()
1043                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1044                .all(&*tx)
1045                .await?;
1046
1047            let guest_connection_ids = collaborators
1048                .into_iter()
1049                .filter_map(|collaborator| {
1050                    if collaborator.is_host {
1051                        None
1052                    } else {
1053                        Some(collaborator.connection())
1054                    }
1055                })
1056                .collect();
1057
1058            Ok((host_connection_id, guest_connection_ids))
1059        })
1060        .await
1061    }
1062
1063    /// Returns the connection IDs in the given project.
1064    ///
1065    /// The provided `connection_id` must also be a collaborator in the project,
1066    /// otherwise an error will be returned.
1067    pub async fn project_connection_ids(
1068        &self,
1069        project_id: ProjectId,
1070        connection_id: ConnectionId,
1071        exclude_dev_server: bool,
1072    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1073        self.project_transaction(project_id, |tx| async move {
1074            let project = project::Entity::find_by_id(project_id)
1075                .one(&*tx)
1076                .await?
1077                .ok_or_else(|| anyhow!("no such project"))?;
1078
1079            let mut collaborators = project_collaborator::Entity::find()
1080                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1081                .stream(&*tx)
1082                .await?;
1083
1084            let mut connection_ids = HashSet::default();
1085            if let Some(host_connection) = project.host_connection().log_err() {
1086                if !exclude_dev_server {
1087                    connection_ids.insert(host_connection);
1088                }
1089            }
1090
1091            while let Some(collaborator) = collaborators.next().await {
1092                let collaborator = collaborator?;
1093                connection_ids.insert(collaborator.connection());
1094            }
1095
1096            if connection_ids.contains(&connection_id)
1097                || Some(connection_id) == project.host_connection().ok()
1098            {
1099                Ok(connection_ids)
1100            } else {
1101                Err(anyhow!(
1102                    "can only send project updates to a project you're in"
1103                ))?
1104            }
1105        })
1106        .await
1107    }
1108
1109    async fn project_guest_connection_ids(
1110        &self,
1111        project_id: ProjectId,
1112        tx: &DatabaseTransaction,
1113    ) -> Result<Vec<ConnectionId>> {
1114        let mut collaborators = project_collaborator::Entity::find()
1115            .filter(
1116                project_collaborator::Column::ProjectId
1117                    .eq(project_id)
1118                    .and(project_collaborator::Column::IsHost.eq(false)),
1119            )
1120            .stream(tx)
1121            .await?;
1122
1123        let mut guest_connection_ids = Vec::new();
1124        while let Some(collaborator) = collaborators.next().await {
1125            let collaborator = collaborator?;
1126            guest_connection_ids.push(collaborator.connection());
1127        }
1128        Ok(guest_connection_ids)
1129    }
1130
1131    /// Returns the [`RoomId`] for the given project.
1132    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1133        self.transaction(|tx| async move {
1134            Ok(project::Entity::find_by_id(project_id)
1135                .one(&*tx)
1136                .await?
1137                .and_then(|project| project.room_id))
1138        })
1139        .await
1140    }
1141
1142    pub async fn check_room_participants(
1143        &self,
1144        room_id: RoomId,
1145        leader_id: ConnectionId,
1146        follower_id: ConnectionId,
1147    ) -> Result<()> {
1148        self.transaction(|tx| async move {
1149            use room_participant::Column;
1150
1151            let count = room_participant::Entity::find()
1152                .filter(
1153                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1154                        Condition::any()
1155                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1156                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1157                            ))
1158                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1159                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1160                            )),
1161                    ),
1162                )
1163                .count(&*tx)
1164                .await?;
1165
1166            if count < 2 {
1167                Err(anyhow!("not room participants"))?;
1168            }
1169
1170            Ok(())
1171        })
1172        .await
1173    }
1174
1175    /// Adds the given follower connection as a follower of the given leader connection.
1176    pub async fn follow(
1177        &self,
1178        room_id: RoomId,
1179        project_id: ProjectId,
1180        leader_connection: ConnectionId,
1181        follower_connection: ConnectionId,
1182    ) -> Result<TransactionGuard<proto::Room>> {
1183        self.room_transaction(room_id, |tx| async move {
1184            follower::ActiveModel {
1185                room_id: ActiveValue::set(room_id),
1186                project_id: ActiveValue::set(project_id),
1187                leader_connection_server_id: ActiveValue::set(ServerId(
1188                    leader_connection.owner_id as i32,
1189                )),
1190                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1191                follower_connection_server_id: ActiveValue::set(ServerId(
1192                    follower_connection.owner_id as i32,
1193                )),
1194                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1195                ..Default::default()
1196            }
1197            .insert(&*tx)
1198            .await?;
1199
1200            let room = self.get_room(room_id, &tx).await?;
1201            Ok(room)
1202        })
1203        .await
1204    }
1205
1206    /// Removes the given follower connection as a follower of the given leader connection.
1207    pub async fn unfollow(
1208        &self,
1209        room_id: RoomId,
1210        project_id: ProjectId,
1211        leader_connection: ConnectionId,
1212        follower_connection: ConnectionId,
1213    ) -> Result<TransactionGuard<proto::Room>> {
1214        self.room_transaction(room_id, |tx| async move {
1215            follower::Entity::delete_many()
1216                .filter(
1217                    Condition::all()
1218                        .add(follower::Column::RoomId.eq(room_id))
1219                        .add(follower::Column::ProjectId.eq(project_id))
1220                        .add(
1221                            follower::Column::LeaderConnectionServerId
1222                                .eq(leader_connection.owner_id),
1223                        )
1224                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1225                        .add(
1226                            follower::Column::FollowerConnectionServerId
1227                                .eq(follower_connection.owner_id),
1228                        )
1229                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1230                )
1231                .exec(&*tx)
1232                .await?;
1233
1234            let room = self.get_room(room_id, &tx).await?;
1235            Ok(room)
1236        })
1237        .await
1238    }
1239}