projects.rs

   1use util::ResultExt;
   2
   3use super::*;
   4
   5impl Database {
   6    /// Returns the count of all projects, excluding ones marked as admin.
   7    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   8        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
   9        enum QueryAs {
  10            Count,
  11        }
  12
  13        self.transaction(|tx| async move {
  14            Ok(project::Entity::find()
  15                .select_only()
  16                .column_as(project::Column::Id.count(), QueryAs::Count)
  17                .inner_join(user::Entity)
  18                .filter(user::Column::Admin.eq(false))
  19                .into_values::<_, QueryAs>()
  20                .one(&*tx)
  21                .await?
  22                .unwrap_or(0i64) as usize)
  23        })
  24        .await
  25    }
  26
  27    /// Shares a project with the given room.
  28    pub async fn share_project(
  29        &self,
  30        room_id: RoomId,
  31        connection: ConnectionId,
  32        worktrees: &[proto::WorktreeMetadata],
  33        dev_server_project_id: Option<DevServerProjectId>,
  34    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  35        self.room_transaction(room_id, |tx| async move {
  36            let participant = room_participant::Entity::find()
  37                .filter(
  38                    Condition::all()
  39                        .add(
  40                            room_participant::Column::AnsweringConnectionId
  41                                .eq(connection.id as i32),
  42                        )
  43                        .add(
  44                            room_participant::Column::AnsweringConnectionServerId
  45                                .eq(connection.owner_id as i32),
  46                        ),
  47                )
  48                .one(&*tx)
  49                .await?
  50                .ok_or_else(|| anyhow!("could not find participant"))?;
  51            if participant.room_id != room_id {
  52                return Err(anyhow!("shared project on unexpected room"))?;
  53            }
  54            if !participant
  55                .role
  56                .unwrap_or(ChannelRole::Member)
  57                .can_edit_projects()
  58            {
  59                return Err(anyhow!("guests cannot share projects"))?;
  60            }
  61
  62            if let Some(dev_server_project_id) = dev_server_project_id {
  63                let project = project::Entity::find()
  64                    .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
  65                    .one(&*tx)
  66                    .await?
  67                    .ok_or_else(|| anyhow!("no remote project"))?;
  68
  69                if project.room_id.is_some() {
  70                    return Err(anyhow!("project already shared"))?;
  71                };
  72
  73                let project = project::Entity::update(project::ActiveModel {
  74                    room_id: ActiveValue::Set(Some(room_id)),
  75                    ..project.into_active_model()
  76                })
  77                .exec(&*tx)
  78                .await?;
  79
  80                // todo! check user is a project-collaborator
  81
  82                let room = self.get_room(room_id, &tx).await?;
  83                return Ok((project.id, room));
  84            }
  85
  86            let project = project::ActiveModel {
  87                room_id: ActiveValue::set(Some(participant.room_id)),
  88                host_user_id: ActiveValue::set(Some(participant.user_id)),
  89                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  90                host_connection_server_id: ActiveValue::set(Some(ServerId(
  91                    connection.owner_id as i32,
  92                ))),
  93                id: ActiveValue::NotSet,
  94                hosted_project_id: ActiveValue::Set(None),
  95                dev_server_project_id: ActiveValue::Set(None),
  96            }
  97            .insert(&*tx)
  98            .await?;
  99
 100            if !worktrees.is_empty() {
 101                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
 102                    worktree::ActiveModel {
 103                        id: ActiveValue::set(worktree.id as i64),
 104                        project_id: ActiveValue::set(project.id),
 105                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
 106                        root_name: ActiveValue::set(worktree.root_name.clone()),
 107                        visible: ActiveValue::set(worktree.visible),
 108                        scan_id: ActiveValue::set(0),
 109                        completed_scan_id: ActiveValue::set(0),
 110                    }
 111                }))
 112                .exec(&*tx)
 113                .await?;
 114            }
 115
 116            project_collaborator::ActiveModel {
 117                project_id: ActiveValue::set(project.id),
 118                connection_id: ActiveValue::set(connection.id as i32),
 119                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 120                user_id: ActiveValue::set(participant.user_id),
 121                replica_id: ActiveValue::set(ReplicaId(0)),
 122                is_host: ActiveValue::set(true),
 123                ..Default::default()
 124            }
 125            .insert(&*tx)
 126            .await?;
 127
 128            let room = self.get_room(room_id, &tx).await?;
 129            Ok((project.id, room))
 130        })
 131        .await
 132    }
 133
 134    /// Unshares the given project.
 135    pub async fn unshare_project(
 136        &self,
 137        project_id: ProjectId,
 138        connection: ConnectionId,
 139        user_id: Option<UserId>,
 140    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 141        self.project_transaction(project_id, |tx| async move {
 142            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 143            let project = project::Entity::find_by_id(project_id)
 144                .one(&*tx)
 145                .await?
 146                .ok_or_else(|| anyhow!("project not found"))?;
 147            let room = if let Some(room_id) = project.room_id {
 148                Some(self.get_room(room_id, &tx).await?)
 149            } else {
 150                None
 151            };
 152            if project.host_connection()? == connection {
 153                project::Entity::delete(project.into_active_model())
 154                    .exec(&*tx)
 155                    .await?;
 156                return Ok((room, guest_connection_ids));
 157            }
 158            if let Some(dev_server_project_id) = project.dev_server_project_id {
 159                if let Some(user_id) = user_id {
 160                    if user_id
 161                        != self
 162                            .owner_for_dev_server_project(dev_server_project_id, &tx)
 163                            .await?
 164                    {
 165                        Err(anyhow!("cannot unshare a project hosted by another user"))?
 166                    }
 167                    project::Entity::update(project::ActiveModel {
 168                        room_id: ActiveValue::Set(None),
 169                        ..project.into_active_model()
 170                    })
 171                    .exec(&*tx)
 172                    .await?;
 173                    return Ok((room, guest_connection_ids));
 174                }
 175            }
 176
 177            Err(anyhow!("cannot unshare a project hosted by another user"))?
 178        })
 179        .await
 180    }
 181
 182    /// Updates the worktrees associated with the given project.
 183    pub async fn update_project(
 184        &self,
 185        project_id: ProjectId,
 186        connection: ConnectionId,
 187        worktrees: &[proto::WorktreeMetadata],
 188    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 189        self.project_transaction(project_id, |tx| async move {
 190            let project = project::Entity::find_by_id(project_id)
 191                .filter(
 192                    Condition::all()
 193                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 194                        .add(
 195                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 196                        ),
 197                )
 198                .one(&*tx)
 199                .await?
 200                .ok_or_else(|| anyhow!("no such project"))?;
 201
 202            self.update_project_worktrees(project.id, worktrees, &tx)
 203                .await?;
 204
 205            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 206
 207            let room = if let Some(room_id) = project.room_id {
 208                Some(self.get_room(room_id, &tx).await?)
 209            } else {
 210                None
 211            };
 212
 213            Ok((room, guest_connection_ids))
 214        })
 215        .await
 216    }
 217
 218    pub(in crate::db) async fn update_project_worktrees(
 219        &self,
 220        project_id: ProjectId,
 221        worktrees: &[proto::WorktreeMetadata],
 222        tx: &DatabaseTransaction,
 223    ) -> Result<()> {
 224        if !worktrees.is_empty() {
 225            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 226                id: ActiveValue::set(worktree.id as i64),
 227                project_id: ActiveValue::set(project_id),
 228                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 229                root_name: ActiveValue::set(worktree.root_name.clone()),
 230                visible: ActiveValue::set(worktree.visible),
 231                scan_id: ActiveValue::set(0),
 232                completed_scan_id: ActiveValue::set(0),
 233            }))
 234            .on_conflict(
 235                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 236                    .update_column(worktree::Column::RootName)
 237                    .to_owned(),
 238            )
 239            .exec(tx)
 240            .await?;
 241        }
 242
 243        worktree::Entity::delete_many()
 244            .filter(worktree::Column::ProjectId.eq(project_id).and(
 245                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 246            ))
 247            .exec(tx)
 248            .await?;
 249
 250        Ok(())
 251    }
 252
 253    pub async fn update_worktree(
 254        &self,
 255        update: &proto::UpdateWorktree,
 256        connection: ConnectionId,
 257    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 258        let project_id = ProjectId::from_proto(update.project_id);
 259        let worktree_id = update.worktree_id as i64;
 260        self.project_transaction(project_id, |tx| async move {
 261            // Ensure the update comes from the host.
 262            let _project = project::Entity::find_by_id(project_id)
 263                .filter(
 264                    Condition::all()
 265                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 266                        .add(
 267                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 268                        ),
 269                )
 270                .one(&*tx)
 271                .await?
 272                .ok_or_else(|| anyhow!("no such project"))?;
 273
 274            // Update metadata.
 275            worktree::Entity::update(worktree::ActiveModel {
 276                id: ActiveValue::set(worktree_id),
 277                project_id: ActiveValue::set(project_id),
 278                root_name: ActiveValue::set(update.root_name.clone()),
 279                scan_id: ActiveValue::set(update.scan_id as i64),
 280                completed_scan_id: if update.is_last_update {
 281                    ActiveValue::set(update.scan_id as i64)
 282                } else {
 283                    ActiveValue::default()
 284                },
 285                abs_path: ActiveValue::set(update.abs_path.clone()),
 286                ..Default::default()
 287            })
 288            .exec(&*tx)
 289            .await?;
 290
 291            if !update.updated_entries.is_empty() {
 292                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 293                    let mtime = entry.mtime.clone().unwrap_or_default();
 294                    worktree_entry::ActiveModel {
 295                        project_id: ActiveValue::set(project_id),
 296                        worktree_id: ActiveValue::set(worktree_id),
 297                        id: ActiveValue::set(entry.id as i64),
 298                        is_dir: ActiveValue::set(entry.is_dir),
 299                        path: ActiveValue::set(entry.path.clone()),
 300                        inode: ActiveValue::set(entry.inode as i64),
 301                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 302                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 303                        is_symlink: ActiveValue::set(entry.is_symlink),
 304                        is_ignored: ActiveValue::set(entry.is_ignored),
 305                        is_external: ActiveValue::set(entry.is_external),
 306                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 307                        is_deleted: ActiveValue::set(false),
 308                        scan_id: ActiveValue::set(update.scan_id as i64),
 309                    }
 310                }))
 311                .on_conflict(
 312                    OnConflict::columns([
 313                        worktree_entry::Column::ProjectId,
 314                        worktree_entry::Column::WorktreeId,
 315                        worktree_entry::Column::Id,
 316                    ])
 317                    .update_columns([
 318                        worktree_entry::Column::IsDir,
 319                        worktree_entry::Column::Path,
 320                        worktree_entry::Column::Inode,
 321                        worktree_entry::Column::MtimeSeconds,
 322                        worktree_entry::Column::MtimeNanos,
 323                        worktree_entry::Column::IsSymlink,
 324                        worktree_entry::Column::IsIgnored,
 325                        worktree_entry::Column::GitStatus,
 326                        worktree_entry::Column::ScanId,
 327                    ])
 328                    .to_owned(),
 329                )
 330                .exec(&*tx)
 331                .await?;
 332            }
 333
 334            if !update.removed_entries.is_empty() {
 335                worktree_entry::Entity::update_many()
 336                    .filter(
 337                        worktree_entry::Column::ProjectId
 338                            .eq(project_id)
 339                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 340                            .and(
 341                                worktree_entry::Column::Id
 342                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 343                            ),
 344                    )
 345                    .set(worktree_entry::ActiveModel {
 346                        is_deleted: ActiveValue::Set(true),
 347                        scan_id: ActiveValue::Set(update.scan_id as i64),
 348                        ..Default::default()
 349                    })
 350                    .exec(&*tx)
 351                    .await?;
 352            }
 353
 354            if !update.updated_repositories.is_empty() {
 355                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 356                    |repository| worktree_repository::ActiveModel {
 357                        project_id: ActiveValue::set(project_id),
 358                        worktree_id: ActiveValue::set(worktree_id),
 359                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 360                        scan_id: ActiveValue::set(update.scan_id as i64),
 361                        branch: ActiveValue::set(repository.branch.clone()),
 362                        is_deleted: ActiveValue::set(false),
 363                    },
 364                ))
 365                .on_conflict(
 366                    OnConflict::columns([
 367                        worktree_repository::Column::ProjectId,
 368                        worktree_repository::Column::WorktreeId,
 369                        worktree_repository::Column::WorkDirectoryId,
 370                    ])
 371                    .update_columns([
 372                        worktree_repository::Column::ScanId,
 373                        worktree_repository::Column::Branch,
 374                    ])
 375                    .to_owned(),
 376                )
 377                .exec(&*tx)
 378                .await?;
 379            }
 380
 381            if !update.removed_repositories.is_empty() {
 382                worktree_repository::Entity::update_many()
 383                    .filter(
 384                        worktree_repository::Column::ProjectId
 385                            .eq(project_id)
 386                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 387                            .and(
 388                                worktree_repository::Column::WorkDirectoryId
 389                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 390                            ),
 391                    )
 392                    .set(worktree_repository::ActiveModel {
 393                        is_deleted: ActiveValue::Set(true),
 394                        scan_id: ActiveValue::Set(update.scan_id as i64),
 395                        ..Default::default()
 396                    })
 397                    .exec(&*tx)
 398                    .await?;
 399            }
 400
 401            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 402            Ok(connection_ids)
 403        })
 404        .await
 405    }
 406
 407    /// Updates the diagnostic summary for the given connection.
 408    pub async fn update_diagnostic_summary(
 409        &self,
 410        update: &proto::UpdateDiagnosticSummary,
 411        connection: ConnectionId,
 412    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 413        let project_id = ProjectId::from_proto(update.project_id);
 414        let worktree_id = update.worktree_id as i64;
 415        self.project_transaction(project_id, |tx| async move {
 416            let summary = update
 417                .summary
 418                .as_ref()
 419                .ok_or_else(|| anyhow!("invalid summary"))?;
 420
 421            // Ensure the update comes from the host.
 422            let project = project::Entity::find_by_id(project_id)
 423                .one(&*tx)
 424                .await?
 425                .ok_or_else(|| anyhow!("no such project"))?;
 426            if project.host_connection()? != connection {
 427                return Err(anyhow!("can't update a project hosted by someone else"))?;
 428            }
 429
 430            // Update summary.
 431            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 432                project_id: ActiveValue::set(project_id),
 433                worktree_id: ActiveValue::set(worktree_id),
 434                path: ActiveValue::set(summary.path.clone()),
 435                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 436                error_count: ActiveValue::set(summary.error_count as i32),
 437                warning_count: ActiveValue::set(summary.warning_count as i32),
 438            })
 439            .on_conflict(
 440                OnConflict::columns([
 441                    worktree_diagnostic_summary::Column::ProjectId,
 442                    worktree_diagnostic_summary::Column::WorktreeId,
 443                    worktree_diagnostic_summary::Column::Path,
 444                ])
 445                .update_columns([
 446                    worktree_diagnostic_summary::Column::LanguageServerId,
 447                    worktree_diagnostic_summary::Column::ErrorCount,
 448                    worktree_diagnostic_summary::Column::WarningCount,
 449                ])
 450                .to_owned(),
 451            )
 452            .exec(&*tx)
 453            .await?;
 454
 455            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 456            Ok(connection_ids)
 457        })
 458        .await
 459    }
 460
 461    /// Starts the language server for the given connection.
 462    pub async fn start_language_server(
 463        &self,
 464        update: &proto::StartLanguageServer,
 465        connection: ConnectionId,
 466    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 467        let project_id = ProjectId::from_proto(update.project_id);
 468        self.project_transaction(project_id, |tx| async move {
 469            let server = update
 470                .server
 471                .as_ref()
 472                .ok_or_else(|| anyhow!("invalid language server"))?;
 473
 474            // Ensure the update comes from the host.
 475            let project = project::Entity::find_by_id(project_id)
 476                .one(&*tx)
 477                .await?
 478                .ok_or_else(|| anyhow!("no such project"))?;
 479            if project.host_connection()? != connection {
 480                return Err(anyhow!("can't update a project hosted by someone else"))?;
 481            }
 482
 483            // Add the newly-started language server.
 484            language_server::Entity::insert(language_server::ActiveModel {
 485                project_id: ActiveValue::set(project_id),
 486                id: ActiveValue::set(server.id as i64),
 487                name: ActiveValue::set(server.name.clone()),
 488            })
 489            .on_conflict(
 490                OnConflict::columns([
 491                    language_server::Column::ProjectId,
 492                    language_server::Column::Id,
 493                ])
 494                .update_column(language_server::Column::Name)
 495                .to_owned(),
 496            )
 497            .exec(&*tx)
 498            .await?;
 499
 500            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 501            Ok(connection_ids)
 502        })
 503        .await
 504    }
 505
 506    /// Updates the worktree settings for the given connection.
 507    pub async fn update_worktree_settings(
 508        &self,
 509        update: &proto::UpdateWorktreeSettings,
 510        connection: ConnectionId,
 511    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 512        let project_id = ProjectId::from_proto(update.project_id);
 513        self.project_transaction(project_id, |tx| async move {
 514            // Ensure the update comes from the host.
 515            let project = project::Entity::find_by_id(project_id)
 516                .one(&*tx)
 517                .await?
 518                .ok_or_else(|| anyhow!("no such project"))?;
 519            if project.host_connection()? != connection {
 520                return Err(anyhow!("can't update a project hosted by someone else"))?;
 521            }
 522
 523            if let Some(content) = &update.content {
 524                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 525                    project_id: ActiveValue::Set(project_id),
 526                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 527                    path: ActiveValue::Set(update.path.clone()),
 528                    content: ActiveValue::Set(content.clone()),
 529                })
 530                .on_conflict(
 531                    OnConflict::columns([
 532                        worktree_settings_file::Column::ProjectId,
 533                        worktree_settings_file::Column::WorktreeId,
 534                        worktree_settings_file::Column::Path,
 535                    ])
 536                    .update_column(worktree_settings_file::Column::Content)
 537                    .to_owned(),
 538                )
 539                .exec(&*tx)
 540                .await?;
 541            } else {
 542                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 543                    project_id: ActiveValue::Set(project_id),
 544                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 545                    path: ActiveValue::Set(update.path.clone()),
 546                    ..Default::default()
 547                })
 548                .exec(&*tx)
 549                .await?;
 550            }
 551
 552            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 553            Ok(connection_ids)
 554        })
 555        .await
 556    }
 557
 558    /// Adds the given connection to the specified hosted project
 559    pub async fn join_hosted_project(
 560        &self,
 561        id: ProjectId,
 562        user_id: UserId,
 563        connection: ConnectionId,
 564    ) -> Result<(Project, ReplicaId)> {
 565        self.transaction(|tx| async move {
 566            let (project, hosted_project) = project::Entity::find_by_id(id)
 567                .find_also_related(hosted_project::Entity)
 568                .one(&*tx)
 569                .await?
 570                .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
 571
 572            let Some(hosted_project) = hosted_project else {
 573                return Err(anyhow!("project is not hosted"))?;
 574            };
 575
 576            let channel = channel::Entity::find_by_id(hosted_project.channel_id)
 577                .one(&*tx)
 578                .await?
 579                .ok_or_else(|| anyhow!("no such channel"))?;
 580
 581            let role = self
 582                .check_user_is_channel_participant(&channel, user_id, &tx)
 583                .await?;
 584
 585            self.join_project_internal(project, user_id, connection, role, &tx)
 586                .await
 587        })
 588        .await
 589    }
 590
 591    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 592        self.transaction(|tx| async move {
 593            Ok(project::Entity::find_by_id(id)
 594                .one(&*tx)
 595                .await?
 596                .ok_or_else(|| anyhow!("no such project"))?)
 597        })
 598        .await
 599    }
 600
 601    pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
 602        self.transaction(|tx| async move {
 603            Ok(project::Entity::find()
 604                .filter(project::Column::DevServerProjectId.eq(id))
 605                .one(&*tx)
 606                .await?
 607                .ok_or_else(|| anyhow!("no such project"))?)
 608        })
 609        .await
 610    }
 611
 612    /// Adds the given connection to the specified project
 613    /// in the current room.
 614    pub async fn join_project(
 615        &self,
 616        project_id: ProjectId,
 617        connection: ConnectionId,
 618        user_id: UserId,
 619    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 620        self.project_transaction(project_id, |tx| async move {
 621            let (project, role) = self
 622                .access_project(
 623                    project_id,
 624                    connection,
 625                    PrincipalId::UserId(user_id),
 626                    Capability::ReadOnly,
 627                    &tx,
 628                )
 629                .await?;
 630            self.join_project_internal(project, user_id, connection, role, &tx)
 631                .await
 632        })
 633        .await
 634    }
 635
 636    async fn join_project_internal(
 637        &self,
 638        project: project::Model,
 639        user_id: UserId,
 640        connection: ConnectionId,
 641        role: ChannelRole,
 642        tx: &DatabaseTransaction,
 643    ) -> Result<(Project, ReplicaId)> {
 644        let mut collaborators = project
 645            .find_related(project_collaborator::Entity)
 646            .all(tx)
 647            .await?;
 648        let replica_ids = collaborators
 649            .iter()
 650            .map(|c| c.replica_id)
 651            .collect::<HashSet<_>>();
 652        let mut replica_id = ReplicaId(1);
 653        while replica_ids.contains(&replica_id) {
 654            replica_id.0 += 1;
 655        }
 656        let new_collaborator = project_collaborator::ActiveModel {
 657            project_id: ActiveValue::set(project.id),
 658            connection_id: ActiveValue::set(connection.id as i32),
 659            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 660            user_id: ActiveValue::set(user_id),
 661            replica_id: ActiveValue::set(replica_id),
 662            is_host: ActiveValue::set(false),
 663            ..Default::default()
 664        }
 665        .insert(tx)
 666        .await?;
 667        collaborators.push(new_collaborator);
 668
 669        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 670        let mut worktrees = db_worktrees
 671            .into_iter()
 672            .map(|db_worktree| {
 673                (
 674                    db_worktree.id as u64,
 675                    Worktree {
 676                        id: db_worktree.id as u64,
 677                        abs_path: db_worktree.abs_path,
 678                        root_name: db_worktree.root_name,
 679                        visible: db_worktree.visible,
 680                        entries: Default::default(),
 681                        repository_entries: Default::default(),
 682                        diagnostic_summaries: Default::default(),
 683                        settings_files: Default::default(),
 684                        scan_id: db_worktree.scan_id as u64,
 685                        completed_scan_id: db_worktree.completed_scan_id as u64,
 686                    },
 687                )
 688            })
 689            .collect::<BTreeMap<_, _>>();
 690
 691        // Populate worktree entries.
 692        {
 693            let mut db_entries = worktree_entry::Entity::find()
 694                .filter(
 695                    Condition::all()
 696                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 697                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 698                )
 699                .stream(tx)
 700                .await?;
 701            while let Some(db_entry) = db_entries.next().await {
 702                let db_entry = db_entry?;
 703                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 704                    worktree.entries.push(proto::Entry {
 705                        id: db_entry.id as u64,
 706                        is_dir: db_entry.is_dir,
 707                        path: db_entry.path,
 708                        inode: db_entry.inode as u64,
 709                        mtime: Some(proto::Timestamp {
 710                            seconds: db_entry.mtime_seconds as u64,
 711                            nanos: db_entry.mtime_nanos as u32,
 712                        }),
 713                        is_symlink: db_entry.is_symlink,
 714                        is_ignored: db_entry.is_ignored,
 715                        is_external: db_entry.is_external,
 716                        git_status: db_entry.git_status.map(|status| status as i32),
 717                    });
 718                }
 719            }
 720        }
 721
 722        // Populate repository entries.
 723        {
 724            let mut db_repository_entries = worktree_repository::Entity::find()
 725                .filter(
 726                    Condition::all()
 727                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 728                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 729                )
 730                .stream(tx)
 731                .await?;
 732            while let Some(db_repository_entry) = db_repository_entries.next().await {
 733                let db_repository_entry = db_repository_entry?;
 734                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 735                {
 736                    worktree.repository_entries.insert(
 737                        db_repository_entry.work_directory_id as u64,
 738                        proto::RepositoryEntry {
 739                            work_directory_id: db_repository_entry.work_directory_id as u64,
 740                            branch: db_repository_entry.branch,
 741                        },
 742                    );
 743                }
 744            }
 745        }
 746
 747        // Populate worktree diagnostic summaries.
 748        {
 749            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 750                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 751                .stream(tx)
 752                .await?;
 753            while let Some(db_summary) = db_summaries.next().await {
 754                let db_summary = db_summary?;
 755                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 756                    worktree
 757                        .diagnostic_summaries
 758                        .push(proto::DiagnosticSummary {
 759                            path: db_summary.path,
 760                            language_server_id: db_summary.language_server_id as u64,
 761                            error_count: db_summary.error_count as u32,
 762                            warning_count: db_summary.warning_count as u32,
 763                        });
 764                }
 765            }
 766        }
 767
 768        // Populate worktree settings files
 769        {
 770            let mut db_settings_files = worktree_settings_file::Entity::find()
 771                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 772                .stream(tx)
 773                .await?;
 774            while let Some(db_settings_file) = db_settings_files.next().await {
 775                let db_settings_file = db_settings_file?;
 776                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 777                    worktree.settings_files.push(WorktreeSettingsFile {
 778                        path: db_settings_file.path,
 779                        content: db_settings_file.content,
 780                    });
 781                }
 782            }
 783        }
 784
 785        // Populate language servers.
 786        let language_servers = project
 787            .find_related(language_server::Entity)
 788            .all(tx)
 789            .await?;
 790
 791        let project = Project {
 792            id: project.id,
 793            role,
 794            collaborators: collaborators
 795                .into_iter()
 796                .map(|collaborator| ProjectCollaborator {
 797                    connection_id: collaborator.connection(),
 798                    user_id: collaborator.user_id,
 799                    replica_id: collaborator.replica_id,
 800                    is_host: collaborator.is_host,
 801                })
 802                .collect(),
 803            worktrees,
 804            language_servers: language_servers
 805                .into_iter()
 806                .map(|language_server| proto::LanguageServer {
 807                    id: language_server.id as u64,
 808                    name: language_server.name,
 809                })
 810                .collect(),
 811            dev_server_project_id: project.dev_server_project_id,
 812        };
 813        Ok((project, replica_id as ReplicaId))
 814    }
 815
 816    pub async fn leave_hosted_project(
 817        &self,
 818        project_id: ProjectId,
 819        connection: ConnectionId,
 820    ) -> Result<LeftProject> {
 821        self.transaction(|tx| async move {
 822            let result = project_collaborator::Entity::delete_many()
 823                .filter(
 824                    Condition::all()
 825                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 826                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 827                        .add(
 828                            project_collaborator::Column::ConnectionServerId
 829                                .eq(connection.owner_id as i32),
 830                        ),
 831                )
 832                .exec(&*tx)
 833                .await?;
 834            if result.rows_affected == 0 {
 835                return Err(anyhow!("not in the project"))?;
 836            }
 837
 838            let project = project::Entity::find_by_id(project_id)
 839                .one(&*tx)
 840                .await?
 841                .ok_or_else(|| anyhow!("no such project"))?;
 842            let collaborators = project
 843                .find_related(project_collaborator::Entity)
 844                .all(&*tx)
 845                .await?;
 846            let connection_ids = collaborators
 847                .into_iter()
 848                .map(|collaborator| collaborator.connection())
 849                .collect();
 850            Ok(LeftProject {
 851                id: project.id,
 852                connection_ids,
 853                should_unshare: false,
 854            })
 855        })
 856        .await
 857    }
 858
 859    /// Removes the given connection from the specified project.
 860    pub async fn leave_project(
 861        &self,
 862        project_id: ProjectId,
 863        connection: ConnectionId,
 864    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 865        self.project_transaction(project_id, |tx| async move {
 866            let result = project_collaborator::Entity::delete_many()
 867                .filter(
 868                    Condition::all()
 869                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 870                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 871                        .add(
 872                            project_collaborator::Column::ConnectionServerId
 873                                .eq(connection.owner_id as i32),
 874                        ),
 875                )
 876                .exec(&*tx)
 877                .await?;
 878            if result.rows_affected == 0 {
 879                Err(anyhow!("not a collaborator on this project"))?;
 880            }
 881
 882            let project = project::Entity::find_by_id(project_id)
 883                .one(&*tx)
 884                .await?
 885                .ok_or_else(|| anyhow!("no such project"))?;
 886            let collaborators = project
 887                .find_related(project_collaborator::Entity)
 888                .all(&*tx)
 889                .await?;
 890            let connection_ids: Vec<ConnectionId> = collaborators
 891                .into_iter()
 892                .map(|collaborator| collaborator.connection())
 893                .collect();
 894
 895            follower::Entity::delete_many()
 896                .filter(
 897                    Condition::any()
 898                        .add(
 899                            Condition::all()
 900                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 901                                .add(
 902                                    follower::Column::LeaderConnectionServerId
 903                                        .eq(connection.owner_id),
 904                                )
 905                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 906                        )
 907                        .add(
 908                            Condition::all()
 909                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 910                                .add(
 911                                    follower::Column::FollowerConnectionServerId
 912                                        .eq(connection.owner_id),
 913                                )
 914                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 915                        ),
 916                )
 917                .exec(&*tx)
 918                .await?;
 919
 920            let room = if let Some(room_id) = project.room_id {
 921                Some(self.get_room(room_id, &tx).await?)
 922            } else {
 923                None
 924            };
 925
 926            let left_project = LeftProject {
 927                id: project_id,
 928                should_unshare: connection == project.host_connection()?,
 929                connection_ids,
 930            };
 931            Ok((room, left_project))
 932        })
 933        .await
 934    }
 935
 936    pub async fn check_user_is_project_host(
 937        &self,
 938        project_id: ProjectId,
 939        connection_id: ConnectionId,
 940    ) -> Result<()> {
 941        self.project_transaction(project_id, |tx| async move {
 942            project::Entity::find()
 943                .filter(
 944                    Condition::all()
 945                        .add(project::Column::Id.eq(project_id))
 946                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 947                        .add(
 948                            project::Column::HostConnectionServerId
 949                                .eq(Some(connection_id.owner_id as i32)),
 950                        ),
 951                )
 952                .one(&*tx)
 953                .await?
 954                .ok_or_else(|| anyhow!("failed to read project host"))?;
 955
 956            Ok(())
 957        })
 958        .await
 959        .map(|guard| guard.into_inner())
 960    }
 961
 962    /// Returns the current project if the given user is authorized to access it with the specified capability.
 963    pub async fn access_project(
 964        &self,
 965        project_id: ProjectId,
 966        connection_id: ConnectionId,
 967        principal_id: PrincipalId,
 968        capability: Capability,
 969        tx: &DatabaseTransaction,
 970    ) -> Result<(project::Model, ChannelRole)> {
 971        let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
 972            .find_also_related(dev_server_project::Entity)
 973            .one(tx)
 974            .await?
 975            .ok_or_else(|| anyhow!("no such project"))?;
 976
 977        let user_id = match principal_id {
 978            PrincipalId::DevServerId(_) => {
 979                if project
 980                    .host_connection()
 981                    .is_ok_and(|connection| connection == connection_id)
 982                {
 983                    return Ok((project, ChannelRole::Admin));
 984                }
 985                return Err(anyhow!("not the project host"))?;
 986            }
 987            PrincipalId::UserId(user_id) => user_id,
 988        };
 989
 990        let role_from_room = if let Some(room_id) = project.room_id {
 991            room_participant::Entity::find()
 992                .filter(room_participant::Column::RoomId.eq(room_id))
 993                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 994                .one(tx)
 995                .await?
 996                .and_then(|participant| participant.role)
 997        } else {
 998            None
 999        };
1000        let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1001            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1002                .one(tx)
1003                .await?
1004                .ok_or_else(|| anyhow!("no such channel"))?;
1005            if user_id == dev_server.user_id {
1006                // If the user left the room "uncleanly" they may rejoin the
1007                // remote project before leave_room runs. IN that case kick
1008                // the project out of the room pre-emptively.
1009                if role_from_room.is_none() {
1010                    project = project::Entity::update(project::ActiveModel {
1011                        room_id: ActiveValue::Set(None),
1012                        ..project.into_active_model()
1013                    })
1014                    .exec(tx)
1015                    .await?;
1016                }
1017                Some(ChannelRole::Admin)
1018            } else {
1019                None
1020            }
1021        } else {
1022            None
1023        };
1024
1025        let role = role_from_dev_server
1026            .or(role_from_room)
1027            .unwrap_or(ChannelRole::Banned);
1028
1029        match capability {
1030            Capability::ReadWrite => {
1031                if !role.can_edit_projects() {
1032                    return Err(anyhow!("not authorized to edit projects"))?;
1033                }
1034            }
1035            Capability::ReadOnly => {
1036                if !role.can_read_projects() {
1037                    return Err(anyhow!("not authorized to read projects"))?;
1038                }
1039            }
1040        }
1041
1042        Ok((project, role))
1043    }
1044
1045    /// Returns the host connection for a read-only request to join a shared project.
1046    pub async fn host_for_read_only_project_request(
1047        &self,
1048        project_id: ProjectId,
1049        connection_id: ConnectionId,
1050        user_id: UserId,
1051    ) -> Result<ConnectionId> {
1052        self.project_transaction(project_id, |tx| async move {
1053            let (project, _) = self
1054                .access_project(
1055                    project_id,
1056                    connection_id,
1057                    PrincipalId::UserId(user_id),
1058                    Capability::ReadOnly,
1059                    &tx,
1060                )
1061                .await?;
1062            project.host_connection()
1063        })
1064        .await
1065        .map(|guard| guard.into_inner())
1066    }
1067
1068    /// Returns the host connection for a request to join a shared project.
1069    pub async fn host_for_mutating_project_request(
1070        &self,
1071        project_id: ProjectId,
1072        connection_id: ConnectionId,
1073        user_id: UserId,
1074    ) -> Result<ConnectionId> {
1075        self.project_transaction(project_id, |tx| async move {
1076            let (project, _) = self
1077                .access_project(
1078                    project_id,
1079                    connection_id,
1080                    PrincipalId::UserId(user_id),
1081                    Capability::ReadWrite,
1082                    &tx,
1083                )
1084                .await?;
1085            project.host_connection()
1086        })
1087        .await
1088        .map(|guard| guard.into_inner())
1089    }
1090
1091    pub async fn connections_for_buffer_update(
1092        &self,
1093        project_id: ProjectId,
1094        principal_id: PrincipalId,
1095        connection_id: ConnectionId,
1096        capability: Capability,
1097    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1098        self.project_transaction(project_id, |tx| async move {
1099            // Authorize
1100            let (project, _) = self
1101                .access_project(project_id, connection_id, principal_id, capability, &tx)
1102                .await?;
1103
1104            let host_connection_id = project.host_connection()?;
1105
1106            let collaborators = project_collaborator::Entity::find()
1107                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1108                .all(&*tx)
1109                .await?;
1110
1111            let guest_connection_ids = collaborators
1112                .into_iter()
1113                .filter_map(|collaborator| {
1114                    if collaborator.is_host {
1115                        None
1116                    } else {
1117                        Some(collaborator.connection())
1118                    }
1119                })
1120                .collect();
1121
1122            Ok((host_connection_id, guest_connection_ids))
1123        })
1124        .await
1125    }
1126
1127    /// Returns the connection IDs in the given project.
1128    ///
1129    /// The provided `connection_id` must also be a collaborator in the project,
1130    /// otherwise an error will be returned.
1131    pub async fn project_connection_ids(
1132        &self,
1133        project_id: ProjectId,
1134        connection_id: ConnectionId,
1135        exclude_dev_server: bool,
1136    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1137        self.project_transaction(project_id, |tx| async move {
1138            let project = project::Entity::find_by_id(project_id)
1139                .one(&*tx)
1140                .await?
1141                .ok_or_else(|| anyhow!("no such project"))?;
1142
1143            let mut collaborators = project_collaborator::Entity::find()
1144                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1145                .stream(&*tx)
1146                .await?;
1147
1148            let mut connection_ids = HashSet::default();
1149            if let Some(host_connection) = project.host_connection().log_err() {
1150                if !exclude_dev_server {
1151                    connection_ids.insert(host_connection);
1152                }
1153            }
1154
1155            while let Some(collaborator) = collaborators.next().await {
1156                let collaborator = collaborator?;
1157                connection_ids.insert(collaborator.connection());
1158            }
1159
1160            if connection_ids.contains(&connection_id)
1161                || Some(connection_id) == project.host_connection().ok()
1162            {
1163                Ok(connection_ids)
1164            } else {
1165                Err(anyhow!(
1166                    "can only send project updates to a project you're in"
1167                ))?
1168            }
1169        })
1170        .await
1171    }
1172
1173    async fn project_guest_connection_ids(
1174        &self,
1175        project_id: ProjectId,
1176        tx: &DatabaseTransaction,
1177    ) -> Result<Vec<ConnectionId>> {
1178        let mut collaborators = project_collaborator::Entity::find()
1179            .filter(
1180                project_collaborator::Column::ProjectId
1181                    .eq(project_id)
1182                    .and(project_collaborator::Column::IsHost.eq(false)),
1183            )
1184            .stream(tx)
1185            .await?;
1186
1187        let mut guest_connection_ids = Vec::new();
1188        while let Some(collaborator) = collaborators.next().await {
1189            let collaborator = collaborator?;
1190            guest_connection_ids.push(collaborator.connection());
1191        }
1192        Ok(guest_connection_ids)
1193    }
1194
1195    /// Returns the [`RoomId`] for the given project.
1196    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1197        self.transaction(|tx| async move {
1198            Ok(project::Entity::find_by_id(project_id)
1199                .one(&*tx)
1200                .await?
1201                .and_then(|project| project.room_id))
1202        })
1203        .await
1204    }
1205
1206    pub async fn check_room_participants(
1207        &self,
1208        room_id: RoomId,
1209        leader_id: ConnectionId,
1210        follower_id: ConnectionId,
1211    ) -> Result<()> {
1212        self.transaction(|tx| async move {
1213            use room_participant::Column;
1214
1215            let count = room_participant::Entity::find()
1216                .filter(
1217                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1218                        Condition::any()
1219                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1220                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1221                            ))
1222                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1223                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1224                            )),
1225                    ),
1226                )
1227                .count(&*tx)
1228                .await?;
1229
1230            if count < 2 {
1231                Err(anyhow!("not room participants"))?;
1232            }
1233
1234            Ok(())
1235        })
1236        .await
1237    }
1238
1239    /// Adds the given follower connection as a follower of the given leader connection.
1240    pub async fn follow(
1241        &self,
1242        room_id: RoomId,
1243        project_id: ProjectId,
1244        leader_connection: ConnectionId,
1245        follower_connection: ConnectionId,
1246    ) -> Result<TransactionGuard<proto::Room>> {
1247        self.room_transaction(room_id, |tx| async move {
1248            follower::ActiveModel {
1249                room_id: ActiveValue::set(room_id),
1250                project_id: ActiveValue::set(project_id),
1251                leader_connection_server_id: ActiveValue::set(ServerId(
1252                    leader_connection.owner_id as i32,
1253                )),
1254                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1255                follower_connection_server_id: ActiveValue::set(ServerId(
1256                    follower_connection.owner_id as i32,
1257                )),
1258                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1259                ..Default::default()
1260            }
1261            .insert(&*tx)
1262            .await?;
1263
1264            let room = self.get_room(room_id, &tx).await?;
1265            Ok(room)
1266        })
1267        .await
1268    }
1269
1270    /// Removes the given follower connection as a follower of the given leader connection.
1271    pub async fn unfollow(
1272        &self,
1273        room_id: RoomId,
1274        project_id: ProjectId,
1275        leader_connection: ConnectionId,
1276        follower_connection: ConnectionId,
1277    ) -> Result<TransactionGuard<proto::Room>> {
1278        self.room_transaction(room_id, |tx| async move {
1279            follower::Entity::delete_many()
1280                .filter(
1281                    Condition::all()
1282                        .add(follower::Column::RoomId.eq(room_id))
1283                        .add(follower::Column::ProjectId.eq(project_id))
1284                        .add(
1285                            follower::Column::LeaderConnectionServerId
1286                                .eq(leader_connection.owner_id),
1287                        )
1288                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1289                        .add(
1290                            follower::Column::FollowerConnectionServerId
1291                                .eq(follower_connection.owner_id),
1292                        )
1293                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1294                )
1295                .exec(&*tx)
1296                .await?;
1297
1298            let room = self.get_room(room_id, &tx).await?;
1299            Ok(room)
1300        })
1301        .await
1302    }
1303}