projects.rs

   1use util::ResultExt;
   2
   3use super::*;
   4
   5impl Database {
   6    /// Returns the count of all projects, excluding ones marked as admin.
   7    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   8        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
   9        enum QueryAs {
  10            Count,
  11        }
  12
  13        self.transaction(|tx| async move {
  14            Ok(project::Entity::find()
  15                .select_only()
  16                .column_as(project::Column::Id.count(), QueryAs::Count)
  17                .inner_join(user::Entity)
  18                .filter(user::Column::Admin.eq(false))
  19                .into_values::<_, QueryAs>()
  20                .one(&*tx)
  21                .await?
  22                .unwrap_or(0i64) as usize)
  23        })
  24        .await
  25    }
  26
  27    /// Shares a project with the given room.
  28    pub async fn share_project(
  29        &self,
  30        room_id: RoomId,
  31        connection: ConnectionId,
  32        worktrees: &[proto::WorktreeMetadata],
  33        is_ssh_project: bool,
  34        dev_server_project_id: Option<DevServerProjectId>,
  35    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  36        self.room_transaction(room_id, |tx| async move {
  37            let participant = room_participant::Entity::find()
  38                .filter(
  39                    Condition::all()
  40                        .add(
  41                            room_participant::Column::AnsweringConnectionId
  42                                .eq(connection.id as i32),
  43                        )
  44                        .add(
  45                            room_participant::Column::AnsweringConnectionServerId
  46                                .eq(connection.owner_id as i32),
  47                        ),
  48                )
  49                .one(&*tx)
  50                .await?
  51                .ok_or_else(|| anyhow!("could not find participant"))?;
  52            if participant.room_id != room_id {
  53                return Err(anyhow!("shared project on unexpected room"))?;
  54            }
  55            if !participant
  56                .role
  57                .unwrap_or(ChannelRole::Member)
  58                .can_edit_projects()
  59            {
  60                return Err(anyhow!("guests cannot share projects"))?;
  61            }
  62
  63            if let Some(dev_server_project_id) = dev_server_project_id {
  64                let project = project::Entity::find()
  65                    .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
  66                    .one(&*tx)
  67                    .await?
  68                    .ok_or_else(|| anyhow!("no remote project"))?;
  69
  70                let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
  71                    .find_also_related(dev_server::Entity)
  72                    .one(&*tx)
  73                    .await?
  74                    .ok_or_else(|| anyhow!("no dev_server_project"))?;
  75
  76                if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
  77                    return Err(anyhow!("not your dev server"))?;
  78                }
  79
  80                if project.room_id.is_some() {
  81                    return Err(anyhow!("project already shared"))?;
  82                };
  83
  84                let project = project::Entity::update(project::ActiveModel {
  85                    room_id: ActiveValue::Set(Some(room_id)),
  86                    ..project.into_active_model()
  87                })
  88                .exec(&*tx)
  89                .await?;
  90
  91                let room = self.get_room(room_id, &tx).await?;
  92                return Ok((project.id, room));
  93            }
  94
  95            let project = project::ActiveModel {
  96                room_id: ActiveValue::set(Some(participant.room_id)),
  97                host_user_id: ActiveValue::set(Some(participant.user_id)),
  98                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  99                host_connection_server_id: ActiveValue::set(Some(ServerId(
 100                    connection.owner_id as i32,
 101                ))),
 102                id: ActiveValue::NotSet,
 103                hosted_project_id: ActiveValue::Set(None),
 104                dev_server_project_id: ActiveValue::Set(None),
 105            }
 106            .insert(&*tx)
 107            .await?;
 108
 109            if !worktrees.is_empty() {
 110                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
 111                    worktree::ActiveModel {
 112                        id: ActiveValue::set(worktree.id as i64),
 113                        project_id: ActiveValue::set(project.id),
 114                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
 115                        root_name: ActiveValue::set(worktree.root_name.clone()),
 116                        visible: ActiveValue::set(worktree.visible),
 117                        scan_id: ActiveValue::set(0),
 118                        completed_scan_id: ActiveValue::set(0),
 119                    }
 120                }))
 121                .exec(&*tx)
 122                .await?;
 123            }
 124
 125            let replica_id = if is_ssh_project { 1 } else { 0 };
 126
 127            project_collaborator::ActiveModel {
 128                project_id: ActiveValue::set(project.id),
 129                connection_id: ActiveValue::set(connection.id as i32),
 130                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 131                user_id: ActiveValue::set(participant.user_id),
 132                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 133                is_host: ActiveValue::set(true),
 134                ..Default::default()
 135            }
 136            .insert(&*tx)
 137            .await?;
 138
 139            let room = self.get_room(room_id, &tx).await?;
 140            Ok((project.id, room))
 141        })
 142        .await
 143    }
 144
 145    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 146        self.weak_transaction(|tx| async move {
 147            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 148            Ok(())
 149        })
 150        .await
 151    }
 152
 153    /// Unshares the given project.
 154    pub async fn unshare_project(
 155        &self,
 156        project_id: ProjectId,
 157        connection: ConnectionId,
 158        user_id: Option<UserId>,
 159    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 160        self.project_transaction(project_id, |tx| async move {
 161            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 162            let project = project::Entity::find_by_id(project_id)
 163                .one(&*tx)
 164                .await?
 165                .ok_or_else(|| anyhow!("project not found"))?;
 166            let room = if let Some(room_id) = project.room_id {
 167                Some(self.get_room(room_id, &tx).await?)
 168            } else {
 169                None
 170            };
 171            if project.host_connection()? == connection {
 172                return Ok((true, room, guest_connection_ids));
 173            }
 174            if let Some(dev_server_project_id) = project.dev_server_project_id {
 175                if let Some(user_id) = user_id {
 176                    if user_id
 177                        != self
 178                            .owner_for_dev_server_project(dev_server_project_id, &tx)
 179                            .await?
 180                    {
 181                        Err(anyhow!("cannot unshare a project hosted by another user"))?
 182                    }
 183                    project::Entity::update(project::ActiveModel {
 184                        room_id: ActiveValue::Set(None),
 185                        ..project.into_active_model()
 186                    })
 187                    .exec(&*tx)
 188                    .await?;
 189                    return Ok((false, room, guest_connection_ids));
 190                }
 191            }
 192
 193            Err(anyhow!("cannot unshare a project hosted by another user"))?
 194        })
 195        .await
 196    }
 197
 198    /// Updates the worktrees associated with the given project.
 199    pub async fn update_project(
 200        &self,
 201        project_id: ProjectId,
 202        connection: ConnectionId,
 203        worktrees: &[proto::WorktreeMetadata],
 204    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 205        self.project_transaction(project_id, |tx| async move {
 206            let project = project::Entity::find_by_id(project_id)
 207                .filter(
 208                    Condition::all()
 209                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 210                        .add(
 211                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 212                        ),
 213                )
 214                .one(&*tx)
 215                .await?
 216                .ok_or_else(|| anyhow!("no such project"))?;
 217
 218            self.update_project_worktrees(project.id, worktrees, &tx)
 219                .await?;
 220
 221            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 222
 223            let room = if let Some(room_id) = project.room_id {
 224                Some(self.get_room(room_id, &tx).await?)
 225            } else {
 226                None
 227            };
 228
 229            Ok((room, guest_connection_ids))
 230        })
 231        .await
 232    }
 233
 234    pub(in crate::db) async fn update_project_worktrees(
 235        &self,
 236        project_id: ProjectId,
 237        worktrees: &[proto::WorktreeMetadata],
 238        tx: &DatabaseTransaction,
 239    ) -> Result<()> {
 240        if !worktrees.is_empty() {
 241            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 242                id: ActiveValue::set(worktree.id as i64),
 243                project_id: ActiveValue::set(project_id),
 244                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 245                root_name: ActiveValue::set(worktree.root_name.clone()),
 246                visible: ActiveValue::set(worktree.visible),
 247                scan_id: ActiveValue::set(0),
 248                completed_scan_id: ActiveValue::set(0),
 249            }))
 250            .on_conflict(
 251                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 252                    .update_column(worktree::Column::RootName)
 253                    .to_owned(),
 254            )
 255            .exec(tx)
 256            .await?;
 257        }
 258
 259        worktree::Entity::delete_many()
 260            .filter(worktree::Column::ProjectId.eq(project_id).and(
 261                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 262            ))
 263            .exec(tx)
 264            .await?;
 265
 266        Ok(())
 267    }
 268
 269    pub async fn update_worktree(
 270        &self,
 271        update: &proto::UpdateWorktree,
 272        connection: ConnectionId,
 273    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 274        let project_id = ProjectId::from_proto(update.project_id);
 275        let worktree_id = update.worktree_id as i64;
 276        self.project_transaction(project_id, |tx| async move {
 277            // Ensure the update comes from the host.
 278            let _project = project::Entity::find_by_id(project_id)
 279                .filter(
 280                    Condition::all()
 281                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 282                        .add(
 283                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 284                        ),
 285                )
 286                .one(&*tx)
 287                .await?
 288                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
 289
 290            // Update metadata.
 291            worktree::Entity::update(worktree::ActiveModel {
 292                id: ActiveValue::set(worktree_id),
 293                project_id: ActiveValue::set(project_id),
 294                root_name: ActiveValue::set(update.root_name.clone()),
 295                scan_id: ActiveValue::set(update.scan_id as i64),
 296                completed_scan_id: if update.is_last_update {
 297                    ActiveValue::set(update.scan_id as i64)
 298                } else {
 299                    ActiveValue::default()
 300                },
 301                abs_path: ActiveValue::set(update.abs_path.clone()),
 302                ..Default::default()
 303            })
 304            .exec(&*tx)
 305            .await?;
 306
 307            if !update.updated_entries.is_empty() {
 308                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 309                    let mtime = entry.mtime.clone().unwrap_or_default();
 310                    worktree_entry::ActiveModel {
 311                        project_id: ActiveValue::set(project_id),
 312                        worktree_id: ActiveValue::set(worktree_id),
 313                        id: ActiveValue::set(entry.id as i64),
 314                        is_dir: ActiveValue::set(entry.is_dir),
 315                        path: ActiveValue::set(entry.path.clone()),
 316                        inode: ActiveValue::set(entry.inode as i64),
 317                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 318                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 319                        is_symlink: ActiveValue::set(entry.is_symlink),
 320                        is_ignored: ActiveValue::set(entry.is_ignored),
 321                        is_external: ActiveValue::set(entry.is_external),
 322                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 323                        is_deleted: ActiveValue::set(false),
 324                        scan_id: ActiveValue::set(update.scan_id as i64),
 325                        is_fifo: ActiveValue::set(entry.is_fifo),
 326                    }
 327                }))
 328                .on_conflict(
 329                    OnConflict::columns([
 330                        worktree_entry::Column::ProjectId,
 331                        worktree_entry::Column::WorktreeId,
 332                        worktree_entry::Column::Id,
 333                    ])
 334                    .update_columns([
 335                        worktree_entry::Column::IsDir,
 336                        worktree_entry::Column::Path,
 337                        worktree_entry::Column::Inode,
 338                        worktree_entry::Column::MtimeSeconds,
 339                        worktree_entry::Column::MtimeNanos,
 340                        worktree_entry::Column::IsSymlink,
 341                        worktree_entry::Column::IsIgnored,
 342                        worktree_entry::Column::GitStatus,
 343                        worktree_entry::Column::ScanId,
 344                    ])
 345                    .to_owned(),
 346                )
 347                .exec(&*tx)
 348                .await?;
 349            }
 350
 351            if !update.removed_entries.is_empty() {
 352                worktree_entry::Entity::update_many()
 353                    .filter(
 354                        worktree_entry::Column::ProjectId
 355                            .eq(project_id)
 356                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 357                            .and(
 358                                worktree_entry::Column::Id
 359                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 360                            ),
 361                    )
 362                    .set(worktree_entry::ActiveModel {
 363                        is_deleted: ActiveValue::Set(true),
 364                        scan_id: ActiveValue::Set(update.scan_id as i64),
 365                        ..Default::default()
 366                    })
 367                    .exec(&*tx)
 368                    .await?;
 369            }
 370
 371            if !update.updated_repositories.is_empty() {
 372                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 373                    |repository| worktree_repository::ActiveModel {
 374                        project_id: ActiveValue::set(project_id),
 375                        worktree_id: ActiveValue::set(worktree_id),
 376                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 377                        scan_id: ActiveValue::set(update.scan_id as i64),
 378                        branch: ActiveValue::set(repository.branch.clone()),
 379                        is_deleted: ActiveValue::set(false),
 380                    },
 381                ))
 382                .on_conflict(
 383                    OnConflict::columns([
 384                        worktree_repository::Column::ProjectId,
 385                        worktree_repository::Column::WorktreeId,
 386                        worktree_repository::Column::WorkDirectoryId,
 387                    ])
 388                    .update_columns([
 389                        worktree_repository::Column::ScanId,
 390                        worktree_repository::Column::Branch,
 391                    ])
 392                    .to_owned(),
 393                )
 394                .exec(&*tx)
 395                .await?;
 396            }
 397
 398            if !update.removed_repositories.is_empty() {
 399                worktree_repository::Entity::update_many()
 400                    .filter(
 401                        worktree_repository::Column::ProjectId
 402                            .eq(project_id)
 403                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 404                            .and(
 405                                worktree_repository::Column::WorkDirectoryId
 406                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 407                            ),
 408                    )
 409                    .set(worktree_repository::ActiveModel {
 410                        is_deleted: ActiveValue::Set(true),
 411                        scan_id: ActiveValue::Set(update.scan_id as i64),
 412                        ..Default::default()
 413                    })
 414                    .exec(&*tx)
 415                    .await?;
 416            }
 417
 418            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 419            Ok(connection_ids)
 420        })
 421        .await
 422    }
 423
 424    /// Updates the diagnostic summary for the given connection.
 425    pub async fn update_diagnostic_summary(
 426        &self,
 427        update: &proto::UpdateDiagnosticSummary,
 428        connection: ConnectionId,
 429    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 430        let project_id = ProjectId::from_proto(update.project_id);
 431        let worktree_id = update.worktree_id as i64;
 432        self.project_transaction(project_id, |tx| async move {
 433            let summary = update
 434                .summary
 435                .as_ref()
 436                .ok_or_else(|| anyhow!("invalid summary"))?;
 437
 438            // Ensure the update comes from the host.
 439            let project = project::Entity::find_by_id(project_id)
 440                .one(&*tx)
 441                .await?
 442                .ok_or_else(|| anyhow!("no such project"))?;
 443            if project.host_connection()? != connection {
 444                return Err(anyhow!("can't update a project hosted by someone else"))?;
 445            }
 446
 447            // Update summary.
 448            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 449                project_id: ActiveValue::set(project_id),
 450                worktree_id: ActiveValue::set(worktree_id),
 451                path: ActiveValue::set(summary.path.clone()),
 452                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 453                error_count: ActiveValue::set(summary.error_count as i32),
 454                warning_count: ActiveValue::set(summary.warning_count as i32),
 455            })
 456            .on_conflict(
 457                OnConflict::columns([
 458                    worktree_diagnostic_summary::Column::ProjectId,
 459                    worktree_diagnostic_summary::Column::WorktreeId,
 460                    worktree_diagnostic_summary::Column::Path,
 461                ])
 462                .update_columns([
 463                    worktree_diagnostic_summary::Column::LanguageServerId,
 464                    worktree_diagnostic_summary::Column::ErrorCount,
 465                    worktree_diagnostic_summary::Column::WarningCount,
 466                ])
 467                .to_owned(),
 468            )
 469            .exec(&*tx)
 470            .await?;
 471
 472            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 473            Ok(connection_ids)
 474        })
 475        .await
 476    }
 477
 478    /// Starts the language server for the given connection.
 479    pub async fn start_language_server(
 480        &self,
 481        update: &proto::StartLanguageServer,
 482        connection: ConnectionId,
 483    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 484        let project_id = ProjectId::from_proto(update.project_id);
 485        self.project_transaction(project_id, |tx| async move {
 486            let server = update
 487                .server
 488                .as_ref()
 489                .ok_or_else(|| anyhow!("invalid language server"))?;
 490
 491            // Ensure the update comes from the host.
 492            let project = project::Entity::find_by_id(project_id)
 493                .one(&*tx)
 494                .await?
 495                .ok_or_else(|| anyhow!("no such project"))?;
 496            if project.host_connection()? != connection {
 497                return Err(anyhow!("can't update a project hosted by someone else"))?;
 498            }
 499
 500            // Add the newly-started language server.
 501            language_server::Entity::insert(language_server::ActiveModel {
 502                project_id: ActiveValue::set(project_id),
 503                id: ActiveValue::set(server.id as i64),
 504                name: ActiveValue::set(server.name.clone()),
 505            })
 506            .on_conflict(
 507                OnConflict::columns([
 508                    language_server::Column::ProjectId,
 509                    language_server::Column::Id,
 510                ])
 511                .update_column(language_server::Column::Name)
 512                .to_owned(),
 513            )
 514            .exec(&*tx)
 515            .await?;
 516
 517            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 518            Ok(connection_ids)
 519        })
 520        .await
 521    }
 522
 523    /// Updates the worktree settings for the given connection.
 524    pub async fn update_worktree_settings(
 525        &self,
 526        update: &proto::UpdateWorktreeSettings,
 527        connection: ConnectionId,
 528    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 529        let project_id = ProjectId::from_proto(update.project_id);
 530        self.project_transaction(project_id, |tx| async move {
 531            // Ensure the update comes from the host.
 532            let project = project::Entity::find_by_id(project_id)
 533                .one(&*tx)
 534                .await?
 535                .ok_or_else(|| anyhow!("no such project"))?;
 536            if project.host_connection()? != connection {
 537                return Err(anyhow!("can't update a project hosted by someone else"))?;
 538            }
 539
 540            if let Some(content) = &update.content {
 541                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 542                    project_id: ActiveValue::Set(project_id),
 543                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 544                    path: ActiveValue::Set(update.path.clone()),
 545                    content: ActiveValue::Set(content.clone()),
 546                })
 547                .on_conflict(
 548                    OnConflict::columns([
 549                        worktree_settings_file::Column::ProjectId,
 550                        worktree_settings_file::Column::WorktreeId,
 551                        worktree_settings_file::Column::Path,
 552                    ])
 553                    .update_column(worktree_settings_file::Column::Content)
 554                    .to_owned(),
 555                )
 556                .exec(&*tx)
 557                .await?;
 558            } else {
 559                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 560                    project_id: ActiveValue::Set(project_id),
 561                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 562                    path: ActiveValue::Set(update.path.clone()),
 563                    ..Default::default()
 564                })
 565                .exec(&*tx)
 566                .await?;
 567            }
 568
 569            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 570            Ok(connection_ids)
 571        })
 572        .await
 573    }
 574
 575    /// Adds the given connection to the specified hosted project
 576    pub async fn join_hosted_project(
 577        &self,
 578        id: ProjectId,
 579        user_id: UserId,
 580        connection: ConnectionId,
 581    ) -> Result<(Project, ReplicaId)> {
 582        self.transaction(|tx| async move {
 583            let (project, hosted_project) = project::Entity::find_by_id(id)
 584                .find_also_related(hosted_project::Entity)
 585                .one(&*tx)
 586                .await?
 587                .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
 588
 589            let Some(hosted_project) = hosted_project else {
 590                return Err(anyhow!("project is not hosted"))?;
 591            };
 592
 593            let channel = channel::Entity::find_by_id(hosted_project.channel_id)
 594                .one(&*tx)
 595                .await?
 596                .ok_or_else(|| anyhow!("no such channel"))?;
 597
 598            let role = self
 599                .check_user_is_channel_participant(&channel, user_id, &tx)
 600                .await?;
 601
 602            self.join_project_internal(project, user_id, connection, role, &tx)
 603                .await
 604        })
 605        .await
 606    }
 607
 608    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 609        self.transaction(|tx| async move {
 610            Ok(project::Entity::find_by_id(id)
 611                .one(&*tx)
 612                .await?
 613                .ok_or_else(|| anyhow!("no such project"))?)
 614        })
 615        .await
 616    }
 617
 618    pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
 619        self.transaction(|tx| async move {
 620            Ok(project::Entity::find()
 621                .filter(project::Column::DevServerProjectId.eq(id))
 622                .one(&*tx)
 623                .await?
 624                .ok_or_else(|| anyhow!("no such project"))?)
 625        })
 626        .await
 627    }
 628
 629    /// Adds the given connection to the specified project
 630    /// in the current room.
 631    pub async fn join_project(
 632        &self,
 633        project_id: ProjectId,
 634        connection: ConnectionId,
 635        user_id: UserId,
 636    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 637        self.project_transaction(project_id, |tx| async move {
 638            let (project, role) = self
 639                .access_project(
 640                    project_id,
 641                    connection,
 642                    PrincipalId::UserId(user_id),
 643                    Capability::ReadOnly,
 644                    &tx,
 645                )
 646                .await?;
 647            self.join_project_internal(project, user_id, connection, role, &tx)
 648                .await
 649        })
 650        .await
 651    }
 652
 653    async fn join_project_internal(
 654        &self,
 655        project: project::Model,
 656        user_id: UserId,
 657        connection: ConnectionId,
 658        role: ChannelRole,
 659        tx: &DatabaseTransaction,
 660    ) -> Result<(Project, ReplicaId)> {
 661        let mut collaborators = project
 662            .find_related(project_collaborator::Entity)
 663            .all(tx)
 664            .await?;
 665        let replica_ids = collaborators
 666            .iter()
 667            .map(|c| c.replica_id)
 668            .collect::<HashSet<_>>();
 669        let mut replica_id = ReplicaId(1);
 670        while replica_ids.contains(&replica_id) {
 671            replica_id.0 += 1;
 672        }
 673        let new_collaborator = project_collaborator::ActiveModel {
 674            project_id: ActiveValue::set(project.id),
 675            connection_id: ActiveValue::set(connection.id as i32),
 676            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 677            user_id: ActiveValue::set(user_id),
 678            replica_id: ActiveValue::set(replica_id),
 679            is_host: ActiveValue::set(false),
 680            ..Default::default()
 681        }
 682        .insert(tx)
 683        .await?;
 684        collaborators.push(new_collaborator);
 685
 686        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 687        let mut worktrees = db_worktrees
 688            .into_iter()
 689            .map(|db_worktree| {
 690                (
 691                    db_worktree.id as u64,
 692                    Worktree {
 693                        id: db_worktree.id as u64,
 694                        abs_path: db_worktree.abs_path,
 695                        root_name: db_worktree.root_name,
 696                        visible: db_worktree.visible,
 697                        entries: Default::default(),
 698                        repository_entries: Default::default(),
 699                        diagnostic_summaries: Default::default(),
 700                        settings_files: Default::default(),
 701                        scan_id: db_worktree.scan_id as u64,
 702                        completed_scan_id: db_worktree.completed_scan_id as u64,
 703                    },
 704                )
 705            })
 706            .collect::<BTreeMap<_, _>>();
 707
 708        // Populate worktree entries.
 709        {
 710            let mut db_entries = worktree_entry::Entity::find()
 711                .filter(
 712                    Condition::all()
 713                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 714                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 715                )
 716                .stream(tx)
 717                .await?;
 718            while let Some(db_entry) = db_entries.next().await {
 719                let db_entry = db_entry?;
 720                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 721                    worktree.entries.push(proto::Entry {
 722                        id: db_entry.id as u64,
 723                        is_dir: db_entry.is_dir,
 724                        path: db_entry.path,
 725                        inode: db_entry.inode as u64,
 726                        mtime: Some(proto::Timestamp {
 727                            seconds: db_entry.mtime_seconds as u64,
 728                            nanos: db_entry.mtime_nanos as u32,
 729                        }),
 730                        is_symlink: db_entry.is_symlink,
 731                        is_ignored: db_entry.is_ignored,
 732                        is_external: db_entry.is_external,
 733                        git_status: db_entry.git_status.map(|status| status as i32),
 734                        // This is only used in the summarization backlog, so if it's None,
 735                        // that just means we won't be able to detect when to resummarize
 736                        // based on total number of backlogged bytes - instead, we'd go
 737                        // on number of files only. That shouldn't be a huge deal in practice.
 738                        size: None,
 739                        is_fifo: db_entry.is_fifo,
 740                    });
 741                }
 742            }
 743        }
 744
 745        // Populate repository entries.
 746        {
 747            let mut db_repository_entries = worktree_repository::Entity::find()
 748                .filter(
 749                    Condition::all()
 750                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 751                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 752                )
 753                .stream(tx)
 754                .await?;
 755            while let Some(db_repository_entry) = db_repository_entries.next().await {
 756                let db_repository_entry = db_repository_entry?;
 757                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 758                {
 759                    worktree.repository_entries.insert(
 760                        db_repository_entry.work_directory_id as u64,
 761                        proto::RepositoryEntry {
 762                            work_directory_id: db_repository_entry.work_directory_id as u64,
 763                            branch: db_repository_entry.branch,
 764                        },
 765                    );
 766                }
 767            }
 768        }
 769
 770        // Populate worktree diagnostic summaries.
 771        {
 772            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 773                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 774                .stream(tx)
 775                .await?;
 776            while let Some(db_summary) = db_summaries.next().await {
 777                let db_summary = db_summary?;
 778                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 779                    worktree
 780                        .diagnostic_summaries
 781                        .push(proto::DiagnosticSummary {
 782                            path: db_summary.path,
 783                            language_server_id: db_summary.language_server_id as u64,
 784                            error_count: db_summary.error_count as u32,
 785                            warning_count: db_summary.warning_count as u32,
 786                        });
 787                }
 788            }
 789        }
 790
 791        // Populate worktree settings files
 792        {
 793            let mut db_settings_files = worktree_settings_file::Entity::find()
 794                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 795                .stream(tx)
 796                .await?;
 797            while let Some(db_settings_file) = db_settings_files.next().await {
 798                let db_settings_file = db_settings_file?;
 799                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 800                    worktree.settings_files.push(WorktreeSettingsFile {
 801                        path: db_settings_file.path,
 802                        content: db_settings_file.content,
 803                    });
 804                }
 805            }
 806        }
 807
 808        // Populate language servers.
 809        let language_servers = project
 810            .find_related(language_server::Entity)
 811            .all(tx)
 812            .await?;
 813
 814        let project = Project {
 815            id: project.id,
 816            role,
 817            collaborators: collaborators
 818                .into_iter()
 819                .map(|collaborator| ProjectCollaborator {
 820                    connection_id: collaborator.connection(),
 821                    user_id: collaborator.user_id,
 822                    replica_id: collaborator.replica_id,
 823                    is_host: collaborator.is_host,
 824                })
 825                .collect(),
 826            worktrees,
 827            language_servers: language_servers
 828                .into_iter()
 829                .map(|language_server| proto::LanguageServer {
 830                    id: language_server.id as u64,
 831                    name: language_server.name,
 832                })
 833                .collect(),
 834            dev_server_project_id: project.dev_server_project_id,
 835        };
 836        Ok((project, replica_id as ReplicaId))
 837    }
 838
 839    pub async fn leave_hosted_project(
 840        &self,
 841        project_id: ProjectId,
 842        connection: ConnectionId,
 843    ) -> Result<LeftProject> {
 844        self.transaction(|tx| async move {
 845            let result = project_collaborator::Entity::delete_many()
 846                .filter(
 847                    Condition::all()
 848                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 849                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 850                        .add(
 851                            project_collaborator::Column::ConnectionServerId
 852                                .eq(connection.owner_id as i32),
 853                        ),
 854                )
 855                .exec(&*tx)
 856                .await?;
 857            if result.rows_affected == 0 {
 858                return Err(anyhow!("not in the project"))?;
 859            }
 860
 861            let project = project::Entity::find_by_id(project_id)
 862                .one(&*tx)
 863                .await?
 864                .ok_or_else(|| anyhow!("no such project"))?;
 865            let collaborators = project
 866                .find_related(project_collaborator::Entity)
 867                .all(&*tx)
 868                .await?;
 869            let connection_ids = collaborators
 870                .into_iter()
 871                .map(|collaborator| collaborator.connection())
 872                .collect();
 873            Ok(LeftProject {
 874                id: project.id,
 875                connection_ids,
 876                should_unshare: false,
 877            })
 878        })
 879        .await
 880    }
 881
 882    /// Removes the given connection from the specified project.
 883    pub async fn leave_project(
 884        &self,
 885        project_id: ProjectId,
 886        connection: ConnectionId,
 887    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 888        self.project_transaction(project_id, |tx| async move {
 889            let result = project_collaborator::Entity::delete_many()
 890                .filter(
 891                    Condition::all()
 892                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 893                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 894                        .add(
 895                            project_collaborator::Column::ConnectionServerId
 896                                .eq(connection.owner_id as i32),
 897                        ),
 898                )
 899                .exec(&*tx)
 900                .await?;
 901            if result.rows_affected == 0 {
 902                Err(anyhow!("not a collaborator on this project"))?;
 903            }
 904
 905            let project = project::Entity::find_by_id(project_id)
 906                .one(&*tx)
 907                .await?
 908                .ok_or_else(|| anyhow!("no such project"))?;
 909            let collaborators = project
 910                .find_related(project_collaborator::Entity)
 911                .all(&*tx)
 912                .await?;
 913            let connection_ids: Vec<ConnectionId> = collaborators
 914                .into_iter()
 915                .map(|collaborator| collaborator.connection())
 916                .collect();
 917
 918            follower::Entity::delete_many()
 919                .filter(
 920                    Condition::any()
 921                        .add(
 922                            Condition::all()
 923                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 924                                .add(
 925                                    follower::Column::LeaderConnectionServerId
 926                                        .eq(connection.owner_id),
 927                                )
 928                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 929                        )
 930                        .add(
 931                            Condition::all()
 932                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 933                                .add(
 934                                    follower::Column::FollowerConnectionServerId
 935                                        .eq(connection.owner_id),
 936                                )
 937                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 938                        ),
 939                )
 940                .exec(&*tx)
 941                .await?;
 942
 943            let room = if let Some(room_id) = project.room_id {
 944                Some(self.get_room(room_id, &tx).await?)
 945            } else {
 946                None
 947            };
 948
 949            let left_project = LeftProject {
 950                id: project_id,
 951                should_unshare: connection == project.host_connection()?,
 952                connection_ids,
 953            };
 954            Ok((room, left_project))
 955        })
 956        .await
 957    }
 958
 959    pub async fn check_user_is_project_host(
 960        &self,
 961        project_id: ProjectId,
 962        connection_id: ConnectionId,
 963    ) -> Result<()> {
 964        self.project_transaction(project_id, |tx| async move {
 965            project::Entity::find()
 966                .filter(
 967                    Condition::all()
 968                        .add(project::Column::Id.eq(project_id))
 969                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 970                        .add(
 971                            project::Column::HostConnectionServerId
 972                                .eq(Some(connection_id.owner_id as i32)),
 973                        ),
 974                )
 975                .one(&*tx)
 976                .await?
 977                .ok_or_else(|| anyhow!("failed to read project host"))?;
 978
 979            Ok(())
 980        })
 981        .await
 982        .map(|guard| guard.into_inner())
 983    }
 984
 985    /// Returns the current project if the given user is authorized to access it with the specified capability.
 986    pub async fn access_project(
 987        &self,
 988        project_id: ProjectId,
 989        connection_id: ConnectionId,
 990        principal_id: PrincipalId,
 991        capability: Capability,
 992        tx: &DatabaseTransaction,
 993    ) -> Result<(project::Model, ChannelRole)> {
 994        let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
 995            .find_also_related(dev_server_project::Entity)
 996            .one(tx)
 997            .await?
 998            .ok_or_else(|| anyhow!("no such project"))?;
 999
1000        let user_id = match principal_id {
1001            PrincipalId::DevServerId(_) => {
1002                if project
1003                    .host_connection()
1004                    .is_ok_and(|connection| connection == connection_id)
1005                {
1006                    return Ok((project, ChannelRole::Admin));
1007                }
1008                return Err(anyhow!("not the project host"))?;
1009            }
1010            PrincipalId::UserId(user_id) => user_id,
1011        };
1012
1013        let role_from_room = if let Some(room_id) = project.room_id {
1014            room_participant::Entity::find()
1015                .filter(room_participant::Column::RoomId.eq(room_id))
1016                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1017                .one(tx)
1018                .await?
1019                .and_then(|participant| participant.role)
1020        } else {
1021            None
1022        };
1023        let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1024            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1025                .one(tx)
1026                .await?
1027                .ok_or_else(|| anyhow!("no such channel"))?;
1028            if user_id == dev_server.user_id {
1029                // If the user left the room "uncleanly" they may rejoin the
1030                // remote project before leave_room runs. IN that case kick
1031                // the project out of the room pre-emptively.
1032                if role_from_room.is_none() {
1033                    project = project::Entity::update(project::ActiveModel {
1034                        room_id: ActiveValue::Set(None),
1035                        ..project.into_active_model()
1036                    })
1037                    .exec(tx)
1038                    .await?;
1039                }
1040                Some(ChannelRole::Admin)
1041            } else {
1042                None
1043            }
1044        } else {
1045            None
1046        };
1047
1048        let role = role_from_dev_server
1049            .or(role_from_room)
1050            .unwrap_or(ChannelRole::Banned);
1051
1052        match capability {
1053            Capability::ReadWrite => {
1054                if !role.can_edit_projects() {
1055                    return Err(anyhow!("not authorized to edit projects"))?;
1056                }
1057            }
1058            Capability::ReadOnly => {
1059                if !role.can_read_projects() {
1060                    return Err(anyhow!("not authorized to read projects"))?;
1061                }
1062            }
1063        }
1064
1065        Ok((project, role))
1066    }
1067
1068    /// Returns the host connection for a read-only request to join a shared project.
1069    pub async fn host_for_read_only_project_request(
1070        &self,
1071        project_id: ProjectId,
1072        connection_id: ConnectionId,
1073        user_id: UserId,
1074    ) -> Result<ConnectionId> {
1075        self.project_transaction(project_id, |tx| async move {
1076            let (project, _) = self
1077                .access_project(
1078                    project_id,
1079                    connection_id,
1080                    PrincipalId::UserId(user_id),
1081                    Capability::ReadOnly,
1082                    &tx,
1083                )
1084                .await?;
1085            project.host_connection()
1086        })
1087        .await
1088        .map(|guard| guard.into_inner())
1089    }
1090
1091    /// Returns the host connection for a request to join a shared project.
1092    pub async fn host_for_mutating_project_request(
1093        &self,
1094        project_id: ProjectId,
1095        connection_id: ConnectionId,
1096        user_id: UserId,
1097    ) -> Result<ConnectionId> {
1098        self.project_transaction(project_id, |tx| async move {
1099            let (project, _) = self
1100                .access_project(
1101                    project_id,
1102                    connection_id,
1103                    PrincipalId::UserId(user_id),
1104                    Capability::ReadWrite,
1105                    &tx,
1106                )
1107                .await?;
1108            project.host_connection()
1109        })
1110        .await
1111        .map(|guard| guard.into_inner())
1112    }
1113
1114    /// Returns the host connection for a request to join a shared project.
1115    pub async fn host_for_owner_project_request(
1116        &self,
1117        project_id: ProjectId,
1118        _connection_id: ConnectionId,
1119        user_id: UserId,
1120    ) -> Result<ConnectionId> {
1121        self.project_transaction(project_id, |tx| async move {
1122            let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1123                .find_also_related(dev_server_project::Entity)
1124                .one(&*tx)
1125                .await?
1126                .ok_or_else(|| anyhow!("no such project"))?;
1127
1128            let Some(dev_server_project) = dev_server_project else {
1129                return Err(anyhow!("not a dev server project"))?;
1130            };
1131            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1132                .one(&*tx)
1133                .await?
1134                .ok_or_else(|| anyhow!("no such dev server"))?;
1135            if dev_server.user_id != user_id {
1136                return Err(anyhow!("not your project"))?;
1137            }
1138            project.host_connection()
1139        })
1140        .await
1141        .map(|guard| guard.into_inner())
1142    }
1143
1144    pub async fn connections_for_buffer_update(
1145        &self,
1146        project_id: ProjectId,
1147        principal_id: PrincipalId,
1148        connection_id: ConnectionId,
1149        capability: Capability,
1150    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1151        self.project_transaction(project_id, |tx| async move {
1152            // Authorize
1153            let (project, _) = self
1154                .access_project(project_id, connection_id, principal_id, capability, &tx)
1155                .await?;
1156
1157            let host_connection_id = project.host_connection()?;
1158
1159            let collaborators = project_collaborator::Entity::find()
1160                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1161                .all(&*tx)
1162                .await?;
1163
1164            let guest_connection_ids = collaborators
1165                .into_iter()
1166                .filter_map(|collaborator| {
1167                    if collaborator.is_host {
1168                        None
1169                    } else {
1170                        Some(collaborator.connection())
1171                    }
1172                })
1173                .collect();
1174
1175            Ok((host_connection_id, guest_connection_ids))
1176        })
1177        .await
1178    }
1179
1180    /// Returns the connection IDs in the given project.
1181    ///
1182    /// The provided `connection_id` must also be a collaborator in the project,
1183    /// otherwise an error will be returned.
1184    pub async fn project_connection_ids(
1185        &self,
1186        project_id: ProjectId,
1187        connection_id: ConnectionId,
1188        exclude_dev_server: bool,
1189    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1190        self.project_transaction(project_id, |tx| async move {
1191            let project = project::Entity::find_by_id(project_id)
1192                .one(&*tx)
1193                .await?
1194                .ok_or_else(|| anyhow!("no such project"))?;
1195
1196            let mut collaborators = project_collaborator::Entity::find()
1197                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1198                .stream(&*tx)
1199                .await?;
1200
1201            let mut connection_ids = HashSet::default();
1202            if let Some(host_connection) = project.host_connection().log_err() {
1203                if !exclude_dev_server {
1204                    connection_ids.insert(host_connection);
1205                }
1206            }
1207
1208            while let Some(collaborator) = collaborators.next().await {
1209                let collaborator = collaborator?;
1210                connection_ids.insert(collaborator.connection());
1211            }
1212
1213            if connection_ids.contains(&connection_id)
1214                || Some(connection_id) == project.host_connection().ok()
1215            {
1216                Ok(connection_ids)
1217            } else {
1218                Err(anyhow!(
1219                    "can only send project updates to a project you're in"
1220                ))?
1221            }
1222        })
1223        .await
1224    }
1225
1226    async fn project_guest_connection_ids(
1227        &self,
1228        project_id: ProjectId,
1229        tx: &DatabaseTransaction,
1230    ) -> Result<Vec<ConnectionId>> {
1231        let mut collaborators = project_collaborator::Entity::find()
1232            .filter(
1233                project_collaborator::Column::ProjectId
1234                    .eq(project_id)
1235                    .and(project_collaborator::Column::IsHost.eq(false)),
1236            )
1237            .stream(tx)
1238            .await?;
1239
1240        let mut guest_connection_ids = Vec::new();
1241        while let Some(collaborator) = collaborators.next().await {
1242            let collaborator = collaborator?;
1243            guest_connection_ids.push(collaborator.connection());
1244        }
1245        Ok(guest_connection_ids)
1246    }
1247
1248    /// Returns the [`RoomId`] for the given project.
1249    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1250        self.transaction(|tx| async move {
1251            Ok(project::Entity::find_by_id(project_id)
1252                .one(&*tx)
1253                .await?
1254                .and_then(|project| project.room_id))
1255        })
1256        .await
1257    }
1258
1259    pub async fn check_room_participants(
1260        &self,
1261        room_id: RoomId,
1262        leader_id: ConnectionId,
1263        follower_id: ConnectionId,
1264    ) -> Result<()> {
1265        self.transaction(|tx| async move {
1266            use room_participant::Column;
1267
1268            let count = room_participant::Entity::find()
1269                .filter(
1270                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1271                        Condition::any()
1272                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1273                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1274                            ))
1275                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1276                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1277                            )),
1278                    ),
1279                )
1280                .count(&*tx)
1281                .await?;
1282
1283            if count < 2 {
1284                Err(anyhow!("not room participants"))?;
1285            }
1286
1287            Ok(())
1288        })
1289        .await
1290    }
1291
1292    /// Adds the given follower connection as a follower of the given leader connection.
1293    pub async fn follow(
1294        &self,
1295        room_id: RoomId,
1296        project_id: ProjectId,
1297        leader_connection: ConnectionId,
1298        follower_connection: ConnectionId,
1299    ) -> Result<TransactionGuard<proto::Room>> {
1300        self.room_transaction(room_id, |tx| async move {
1301            follower::ActiveModel {
1302                room_id: ActiveValue::set(room_id),
1303                project_id: ActiveValue::set(project_id),
1304                leader_connection_server_id: ActiveValue::set(ServerId(
1305                    leader_connection.owner_id as i32,
1306                )),
1307                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1308                follower_connection_server_id: ActiveValue::set(ServerId(
1309                    follower_connection.owner_id as i32,
1310                )),
1311                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1312                ..Default::default()
1313            }
1314            .insert(&*tx)
1315            .await?;
1316
1317            let room = self.get_room(room_id, &tx).await?;
1318            Ok(room)
1319        })
1320        .await
1321    }
1322
1323    /// Removes the given follower connection as a follower of the given leader connection.
1324    pub async fn unfollow(
1325        &self,
1326        room_id: RoomId,
1327        project_id: ProjectId,
1328        leader_connection: ConnectionId,
1329        follower_connection: ConnectionId,
1330    ) -> Result<TransactionGuard<proto::Room>> {
1331        self.room_transaction(room_id, |tx| async move {
1332            follower::Entity::delete_many()
1333                .filter(
1334                    Condition::all()
1335                        .add(follower::Column::RoomId.eq(room_id))
1336                        .add(follower::Column::ProjectId.eq(project_id))
1337                        .add(
1338                            follower::Column::LeaderConnectionServerId
1339                                .eq(leader_connection.owner_id),
1340                        )
1341                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1342                        .add(
1343                            follower::Column::FollowerConnectionServerId
1344                                .eq(follower_connection.owner_id),
1345                        )
1346                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1347                )
1348                .exec(&*tx)
1349                .await?;
1350
1351            let room = self.get_room(room_id, &tx).await?;
1352            Ok(room)
1353        })
1354        .await
1355    }
1356}