projects.rs

   1use util::ResultExt;
   2
   3use super::*;
   4
   5impl Database {
   6    /// Returns the count of all projects, excluding ones marked as admin.
   7    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   8        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
   9        enum QueryAs {
  10            Count,
  11        }
  12
  13        self.transaction(|tx| async move {
  14            Ok(project::Entity::find()
  15                .select_only()
  16                .column_as(project::Column::Id.count(), QueryAs::Count)
  17                .inner_join(user::Entity)
  18                .filter(user::Column::Admin.eq(false))
  19                .into_values::<_, QueryAs>()
  20                .one(&*tx)
  21                .await?
  22                .unwrap_or(0i64) as usize)
  23        })
  24        .await
  25    }
  26
  27    /// Shares a project with the given room.
  28    pub async fn share_project(
  29        &self,
  30        room_id: RoomId,
  31        connection: ConnectionId,
  32        worktrees: &[proto::WorktreeMetadata],
  33        dev_server_project_id: Option<DevServerProjectId>,
  34    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  35        self.room_transaction(room_id, |tx| async move {
  36            let participant = room_participant::Entity::find()
  37                .filter(
  38                    Condition::all()
  39                        .add(
  40                            room_participant::Column::AnsweringConnectionId
  41                                .eq(connection.id as i32),
  42                        )
  43                        .add(
  44                            room_participant::Column::AnsweringConnectionServerId
  45                                .eq(connection.owner_id as i32),
  46                        ),
  47                )
  48                .one(&*tx)
  49                .await?
  50                .ok_or_else(|| anyhow!("could not find participant"))?;
  51            if participant.room_id != room_id {
  52                return Err(anyhow!("shared project on unexpected room"))?;
  53            }
  54            if !participant
  55                .role
  56                .unwrap_or(ChannelRole::Member)
  57                .can_edit_projects()
  58            {
  59                return Err(anyhow!("guests cannot share projects"))?;
  60            }
  61
  62            if let Some(dev_server_project_id) = dev_server_project_id {
  63                let project = project::Entity::find()
  64                    .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
  65                    .one(&*tx)
  66                    .await?
  67                    .ok_or_else(|| anyhow!("no remote project"))?;
  68
  69                if project.room_id.is_some() {
  70                    return Err(anyhow!("project already shared"))?;
  71                };
  72
  73                let project = project::Entity::update(project::ActiveModel {
  74                    room_id: ActiveValue::Set(Some(room_id)),
  75                    ..project.into_active_model()
  76                })
  77                .exec(&*tx)
  78                .await?;
  79
  80                // todo! check user is a project-collaborator
  81                let room = self.get_room(room_id, &tx).await?;
  82                return Ok((project.id, room));
  83            }
  84
  85            let project = project::ActiveModel {
  86                room_id: ActiveValue::set(Some(participant.room_id)),
  87                host_user_id: ActiveValue::set(Some(participant.user_id)),
  88                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  89                host_connection_server_id: ActiveValue::set(Some(ServerId(
  90                    connection.owner_id as i32,
  91                ))),
  92                id: ActiveValue::NotSet,
  93                hosted_project_id: ActiveValue::Set(None),
  94                dev_server_project_id: ActiveValue::Set(None),
  95            }
  96            .insert(&*tx)
  97            .await?;
  98
  99            if !worktrees.is_empty() {
 100                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
 101                    worktree::ActiveModel {
 102                        id: ActiveValue::set(worktree.id as i64),
 103                        project_id: ActiveValue::set(project.id),
 104                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
 105                        root_name: ActiveValue::set(worktree.root_name.clone()),
 106                        visible: ActiveValue::set(worktree.visible),
 107                        scan_id: ActiveValue::set(0),
 108                        completed_scan_id: ActiveValue::set(0),
 109                    }
 110                }))
 111                .exec(&*tx)
 112                .await?;
 113            }
 114
 115            project_collaborator::ActiveModel {
 116                project_id: ActiveValue::set(project.id),
 117                connection_id: ActiveValue::set(connection.id as i32),
 118                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 119                user_id: ActiveValue::set(participant.user_id),
 120                replica_id: ActiveValue::set(ReplicaId(0)),
 121                is_host: ActiveValue::set(true),
 122                ..Default::default()
 123            }
 124            .insert(&*tx)
 125            .await?;
 126
 127            let room = self.get_room(room_id, &tx).await?;
 128            Ok((project.id, room))
 129        })
 130        .await
 131    }
 132
 133    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 134        self.weak_transaction(|tx| async move {
 135            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 136            Ok(())
 137        })
 138        .await
 139    }
 140
 141    /// Unshares the given project.
 142    pub async fn unshare_project(
 143        &self,
 144        project_id: ProjectId,
 145        connection: ConnectionId,
 146        user_id: Option<UserId>,
 147    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 148        self.project_transaction(project_id, |tx| async move {
 149            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 150            let project = project::Entity::find_by_id(project_id)
 151                .one(&*tx)
 152                .await?
 153                .ok_or_else(|| anyhow!("project not found"))?;
 154            let room = if let Some(room_id) = project.room_id {
 155                Some(self.get_room(room_id, &tx).await?)
 156            } else {
 157                None
 158            };
 159            if project.host_connection()? == connection {
 160                return Ok((true, room, guest_connection_ids));
 161            }
 162            if let Some(dev_server_project_id) = project.dev_server_project_id {
 163                if let Some(user_id) = user_id {
 164                    if user_id
 165                        != self
 166                            .owner_for_dev_server_project(dev_server_project_id, &tx)
 167                            .await?
 168                    {
 169                        Err(anyhow!("cannot unshare a project hosted by another user"))?
 170                    }
 171                    project::Entity::update(project::ActiveModel {
 172                        room_id: ActiveValue::Set(None),
 173                        ..project.into_active_model()
 174                    })
 175                    .exec(&*tx)
 176                    .await?;
 177                    return Ok((false, room, guest_connection_ids));
 178                }
 179            }
 180
 181            Err(anyhow!("cannot unshare a project hosted by another user"))?
 182        })
 183        .await
 184    }
 185
 186    /// Updates the worktrees associated with the given project.
 187    pub async fn update_project(
 188        &self,
 189        project_id: ProjectId,
 190        connection: ConnectionId,
 191        worktrees: &[proto::WorktreeMetadata],
 192    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 193        self.project_transaction(project_id, |tx| async move {
 194            let project = project::Entity::find_by_id(project_id)
 195                .filter(
 196                    Condition::all()
 197                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 198                        .add(
 199                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 200                        ),
 201                )
 202                .one(&*tx)
 203                .await?
 204                .ok_or_else(|| anyhow!("no such project"))?;
 205
 206            self.update_project_worktrees(project.id, worktrees, &tx)
 207                .await?;
 208
 209            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 210
 211            let room = if let Some(room_id) = project.room_id {
 212                Some(self.get_room(room_id, &tx).await?)
 213            } else {
 214                None
 215            };
 216
 217            Ok((room, guest_connection_ids))
 218        })
 219        .await
 220    }
 221
 222    pub(in crate::db) async fn update_project_worktrees(
 223        &self,
 224        project_id: ProjectId,
 225        worktrees: &[proto::WorktreeMetadata],
 226        tx: &DatabaseTransaction,
 227    ) -> Result<()> {
 228        if !worktrees.is_empty() {
 229            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 230                id: ActiveValue::set(worktree.id as i64),
 231                project_id: ActiveValue::set(project_id),
 232                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 233                root_name: ActiveValue::set(worktree.root_name.clone()),
 234                visible: ActiveValue::set(worktree.visible),
 235                scan_id: ActiveValue::set(0),
 236                completed_scan_id: ActiveValue::set(0),
 237            }))
 238            .on_conflict(
 239                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 240                    .update_column(worktree::Column::RootName)
 241                    .to_owned(),
 242            )
 243            .exec(tx)
 244            .await?;
 245        }
 246
 247        worktree::Entity::delete_many()
 248            .filter(worktree::Column::ProjectId.eq(project_id).and(
 249                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 250            ))
 251            .exec(tx)
 252            .await?;
 253
 254        Ok(())
 255    }
 256
 257    pub async fn update_worktree(
 258        &self,
 259        update: &proto::UpdateWorktree,
 260        connection: ConnectionId,
 261    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 262        let project_id = ProjectId::from_proto(update.project_id);
 263        let worktree_id = update.worktree_id as i64;
 264        self.project_transaction(project_id, |tx| async move {
 265            // Ensure the update comes from the host.
 266            let _project = project::Entity::find_by_id(project_id)
 267                .filter(
 268                    Condition::all()
 269                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 270                        .add(
 271                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 272                        ),
 273                )
 274                .one(&*tx)
 275                .await?
 276                .ok_or_else(|| anyhow!("no such project"))?;
 277
 278            // Update metadata.
 279            worktree::Entity::update(worktree::ActiveModel {
 280                id: ActiveValue::set(worktree_id),
 281                project_id: ActiveValue::set(project_id),
 282                root_name: ActiveValue::set(update.root_name.clone()),
 283                scan_id: ActiveValue::set(update.scan_id as i64),
 284                completed_scan_id: if update.is_last_update {
 285                    ActiveValue::set(update.scan_id as i64)
 286                } else {
 287                    ActiveValue::default()
 288                },
 289                abs_path: ActiveValue::set(update.abs_path.clone()),
 290                ..Default::default()
 291            })
 292            .exec(&*tx)
 293            .await?;
 294
 295            if !update.updated_entries.is_empty() {
 296                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 297                    let mtime = entry.mtime.clone().unwrap_or_default();
 298                    worktree_entry::ActiveModel {
 299                        project_id: ActiveValue::set(project_id),
 300                        worktree_id: ActiveValue::set(worktree_id),
 301                        id: ActiveValue::set(entry.id as i64),
 302                        is_dir: ActiveValue::set(entry.is_dir),
 303                        path: ActiveValue::set(entry.path.clone()),
 304                        inode: ActiveValue::set(entry.inode as i64),
 305                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 306                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 307                        is_symlink: ActiveValue::set(entry.is_symlink),
 308                        is_ignored: ActiveValue::set(entry.is_ignored),
 309                        is_external: ActiveValue::set(entry.is_external),
 310                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 311                        is_deleted: ActiveValue::set(false),
 312                        scan_id: ActiveValue::set(update.scan_id as i64),
 313                    }
 314                }))
 315                .on_conflict(
 316                    OnConflict::columns([
 317                        worktree_entry::Column::ProjectId,
 318                        worktree_entry::Column::WorktreeId,
 319                        worktree_entry::Column::Id,
 320                    ])
 321                    .update_columns([
 322                        worktree_entry::Column::IsDir,
 323                        worktree_entry::Column::Path,
 324                        worktree_entry::Column::Inode,
 325                        worktree_entry::Column::MtimeSeconds,
 326                        worktree_entry::Column::MtimeNanos,
 327                        worktree_entry::Column::IsSymlink,
 328                        worktree_entry::Column::IsIgnored,
 329                        worktree_entry::Column::GitStatus,
 330                        worktree_entry::Column::ScanId,
 331                    ])
 332                    .to_owned(),
 333                )
 334                .exec(&*tx)
 335                .await?;
 336            }
 337
 338            if !update.removed_entries.is_empty() {
 339                worktree_entry::Entity::update_many()
 340                    .filter(
 341                        worktree_entry::Column::ProjectId
 342                            .eq(project_id)
 343                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 344                            .and(
 345                                worktree_entry::Column::Id
 346                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 347                            ),
 348                    )
 349                    .set(worktree_entry::ActiveModel {
 350                        is_deleted: ActiveValue::Set(true),
 351                        scan_id: ActiveValue::Set(update.scan_id as i64),
 352                        ..Default::default()
 353                    })
 354                    .exec(&*tx)
 355                    .await?;
 356            }
 357
 358            if !update.updated_repositories.is_empty() {
 359                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 360                    |repository| worktree_repository::ActiveModel {
 361                        project_id: ActiveValue::set(project_id),
 362                        worktree_id: ActiveValue::set(worktree_id),
 363                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 364                        scan_id: ActiveValue::set(update.scan_id as i64),
 365                        branch: ActiveValue::set(repository.branch.clone()),
 366                        is_deleted: ActiveValue::set(false),
 367                    },
 368                ))
 369                .on_conflict(
 370                    OnConflict::columns([
 371                        worktree_repository::Column::ProjectId,
 372                        worktree_repository::Column::WorktreeId,
 373                        worktree_repository::Column::WorkDirectoryId,
 374                    ])
 375                    .update_columns([
 376                        worktree_repository::Column::ScanId,
 377                        worktree_repository::Column::Branch,
 378                    ])
 379                    .to_owned(),
 380                )
 381                .exec(&*tx)
 382                .await?;
 383            }
 384
 385            if !update.removed_repositories.is_empty() {
 386                worktree_repository::Entity::update_many()
 387                    .filter(
 388                        worktree_repository::Column::ProjectId
 389                            .eq(project_id)
 390                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 391                            .and(
 392                                worktree_repository::Column::WorkDirectoryId
 393                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 394                            ),
 395                    )
 396                    .set(worktree_repository::ActiveModel {
 397                        is_deleted: ActiveValue::Set(true),
 398                        scan_id: ActiveValue::Set(update.scan_id as i64),
 399                        ..Default::default()
 400                    })
 401                    .exec(&*tx)
 402                    .await?;
 403            }
 404
 405            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 406            Ok(connection_ids)
 407        })
 408        .await
 409    }
 410
 411    /// Updates the diagnostic summary for the given connection.
 412    pub async fn update_diagnostic_summary(
 413        &self,
 414        update: &proto::UpdateDiagnosticSummary,
 415        connection: ConnectionId,
 416    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 417        let project_id = ProjectId::from_proto(update.project_id);
 418        let worktree_id = update.worktree_id as i64;
 419        self.project_transaction(project_id, |tx| async move {
 420            let summary = update
 421                .summary
 422                .as_ref()
 423                .ok_or_else(|| anyhow!("invalid summary"))?;
 424
 425            // Ensure the update comes from the host.
 426            let project = project::Entity::find_by_id(project_id)
 427                .one(&*tx)
 428                .await?
 429                .ok_or_else(|| anyhow!("no such project"))?;
 430            if project.host_connection()? != connection {
 431                return Err(anyhow!("can't update a project hosted by someone else"))?;
 432            }
 433
 434            // Update summary.
 435            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 436                project_id: ActiveValue::set(project_id),
 437                worktree_id: ActiveValue::set(worktree_id),
 438                path: ActiveValue::set(summary.path.clone()),
 439                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 440                error_count: ActiveValue::set(summary.error_count as i32),
 441                warning_count: ActiveValue::set(summary.warning_count as i32),
 442            })
 443            .on_conflict(
 444                OnConflict::columns([
 445                    worktree_diagnostic_summary::Column::ProjectId,
 446                    worktree_diagnostic_summary::Column::WorktreeId,
 447                    worktree_diagnostic_summary::Column::Path,
 448                ])
 449                .update_columns([
 450                    worktree_diagnostic_summary::Column::LanguageServerId,
 451                    worktree_diagnostic_summary::Column::ErrorCount,
 452                    worktree_diagnostic_summary::Column::WarningCount,
 453                ])
 454                .to_owned(),
 455            )
 456            .exec(&*tx)
 457            .await?;
 458
 459            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 460            Ok(connection_ids)
 461        })
 462        .await
 463    }
 464
 465    /// Starts the language server for the given connection.
 466    pub async fn start_language_server(
 467        &self,
 468        update: &proto::StartLanguageServer,
 469        connection: ConnectionId,
 470    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 471        let project_id = ProjectId::from_proto(update.project_id);
 472        self.project_transaction(project_id, |tx| async move {
 473            let server = update
 474                .server
 475                .as_ref()
 476                .ok_or_else(|| anyhow!("invalid language server"))?;
 477
 478            // Ensure the update comes from the host.
 479            let project = project::Entity::find_by_id(project_id)
 480                .one(&*tx)
 481                .await?
 482                .ok_or_else(|| anyhow!("no such project"))?;
 483            if project.host_connection()? != connection {
 484                return Err(anyhow!("can't update a project hosted by someone else"))?;
 485            }
 486
 487            // Add the newly-started language server.
 488            language_server::Entity::insert(language_server::ActiveModel {
 489                project_id: ActiveValue::set(project_id),
 490                id: ActiveValue::set(server.id as i64),
 491                name: ActiveValue::set(server.name.clone()),
 492            })
 493            .on_conflict(
 494                OnConflict::columns([
 495                    language_server::Column::ProjectId,
 496                    language_server::Column::Id,
 497                ])
 498                .update_column(language_server::Column::Name)
 499                .to_owned(),
 500            )
 501            .exec(&*tx)
 502            .await?;
 503
 504            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 505            Ok(connection_ids)
 506        })
 507        .await
 508    }
 509
 510    /// Updates the worktree settings for the given connection.
 511    pub async fn update_worktree_settings(
 512        &self,
 513        update: &proto::UpdateWorktreeSettings,
 514        connection: ConnectionId,
 515    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 516        let project_id = ProjectId::from_proto(update.project_id);
 517        self.project_transaction(project_id, |tx| async move {
 518            // Ensure the update comes from the host.
 519            let project = project::Entity::find_by_id(project_id)
 520                .one(&*tx)
 521                .await?
 522                .ok_or_else(|| anyhow!("no such project"))?;
 523            if project.host_connection()? != connection {
 524                return Err(anyhow!("can't update a project hosted by someone else"))?;
 525            }
 526
 527            if let Some(content) = &update.content {
 528                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 529                    project_id: ActiveValue::Set(project_id),
 530                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 531                    path: ActiveValue::Set(update.path.clone()),
 532                    content: ActiveValue::Set(content.clone()),
 533                })
 534                .on_conflict(
 535                    OnConflict::columns([
 536                        worktree_settings_file::Column::ProjectId,
 537                        worktree_settings_file::Column::WorktreeId,
 538                        worktree_settings_file::Column::Path,
 539                    ])
 540                    .update_column(worktree_settings_file::Column::Content)
 541                    .to_owned(),
 542                )
 543                .exec(&*tx)
 544                .await?;
 545            } else {
 546                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 547                    project_id: ActiveValue::Set(project_id),
 548                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 549                    path: ActiveValue::Set(update.path.clone()),
 550                    ..Default::default()
 551                })
 552                .exec(&*tx)
 553                .await?;
 554            }
 555
 556            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 557            Ok(connection_ids)
 558        })
 559        .await
 560    }
 561
 562    /// Adds the given connection to the specified hosted project
 563    pub async fn join_hosted_project(
 564        &self,
 565        id: ProjectId,
 566        user_id: UserId,
 567        connection: ConnectionId,
 568    ) -> Result<(Project, ReplicaId)> {
 569        self.transaction(|tx| async move {
 570            let (project, hosted_project) = project::Entity::find_by_id(id)
 571                .find_also_related(hosted_project::Entity)
 572                .one(&*tx)
 573                .await?
 574                .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
 575
 576            let Some(hosted_project) = hosted_project else {
 577                return Err(anyhow!("project is not hosted"))?;
 578            };
 579
 580            let channel = channel::Entity::find_by_id(hosted_project.channel_id)
 581                .one(&*tx)
 582                .await?
 583                .ok_or_else(|| anyhow!("no such channel"))?;
 584
 585            let role = self
 586                .check_user_is_channel_participant(&channel, user_id, &tx)
 587                .await?;
 588
 589            self.join_project_internal(project, user_id, connection, role, &tx)
 590                .await
 591        })
 592        .await
 593    }
 594
 595    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 596        self.transaction(|tx| async move {
 597            Ok(project::Entity::find_by_id(id)
 598                .one(&*tx)
 599                .await?
 600                .ok_or_else(|| anyhow!("no such project"))?)
 601        })
 602        .await
 603    }
 604
 605    pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
 606        self.transaction(|tx| async move {
 607            Ok(project::Entity::find()
 608                .filter(project::Column::DevServerProjectId.eq(id))
 609                .one(&*tx)
 610                .await?
 611                .ok_or_else(|| anyhow!("no such project"))?)
 612        })
 613        .await
 614    }
 615
 616    /// Adds the given connection to the specified project
 617    /// in the current room.
 618    pub async fn join_project(
 619        &self,
 620        project_id: ProjectId,
 621        connection: ConnectionId,
 622        user_id: UserId,
 623    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 624        self.project_transaction(project_id, |tx| async move {
 625            let (project, role) = self
 626                .access_project(
 627                    project_id,
 628                    connection,
 629                    PrincipalId::UserId(user_id),
 630                    Capability::ReadOnly,
 631                    &tx,
 632                )
 633                .await?;
 634            self.join_project_internal(project, user_id, connection, role, &tx)
 635                .await
 636        })
 637        .await
 638    }
 639
 640    async fn join_project_internal(
 641        &self,
 642        project: project::Model,
 643        user_id: UserId,
 644        connection: ConnectionId,
 645        role: ChannelRole,
 646        tx: &DatabaseTransaction,
 647    ) -> Result<(Project, ReplicaId)> {
 648        let mut collaborators = project
 649            .find_related(project_collaborator::Entity)
 650            .all(tx)
 651            .await?;
 652        let replica_ids = collaborators
 653            .iter()
 654            .map(|c| c.replica_id)
 655            .collect::<HashSet<_>>();
 656        let mut replica_id = ReplicaId(1);
 657        while replica_ids.contains(&replica_id) {
 658            replica_id.0 += 1;
 659        }
 660        let new_collaborator = project_collaborator::ActiveModel {
 661            project_id: ActiveValue::set(project.id),
 662            connection_id: ActiveValue::set(connection.id as i32),
 663            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 664            user_id: ActiveValue::set(user_id),
 665            replica_id: ActiveValue::set(replica_id),
 666            is_host: ActiveValue::set(false),
 667            ..Default::default()
 668        }
 669        .insert(tx)
 670        .await?;
 671        collaborators.push(new_collaborator);
 672
 673        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 674        let mut worktrees = db_worktrees
 675            .into_iter()
 676            .map(|db_worktree| {
 677                (
 678                    db_worktree.id as u64,
 679                    Worktree {
 680                        id: db_worktree.id as u64,
 681                        abs_path: db_worktree.abs_path,
 682                        root_name: db_worktree.root_name,
 683                        visible: db_worktree.visible,
 684                        entries: Default::default(),
 685                        repository_entries: Default::default(),
 686                        diagnostic_summaries: Default::default(),
 687                        settings_files: Default::default(),
 688                        scan_id: db_worktree.scan_id as u64,
 689                        completed_scan_id: db_worktree.completed_scan_id as u64,
 690                    },
 691                )
 692            })
 693            .collect::<BTreeMap<_, _>>();
 694
 695        // Populate worktree entries.
 696        {
 697            let mut db_entries = worktree_entry::Entity::find()
 698                .filter(
 699                    Condition::all()
 700                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 701                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 702                )
 703                .stream(tx)
 704                .await?;
 705            while let Some(db_entry) = db_entries.next().await {
 706                let db_entry = db_entry?;
 707                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 708                    worktree.entries.push(proto::Entry {
 709                        id: db_entry.id as u64,
 710                        is_dir: db_entry.is_dir,
 711                        path: db_entry.path,
 712                        inode: db_entry.inode as u64,
 713                        mtime: Some(proto::Timestamp {
 714                            seconds: db_entry.mtime_seconds as u64,
 715                            nanos: db_entry.mtime_nanos as u32,
 716                        }),
 717                        is_symlink: db_entry.is_symlink,
 718                        is_ignored: db_entry.is_ignored,
 719                        is_external: db_entry.is_external,
 720                        git_status: db_entry.git_status.map(|status| status as i32),
 721                    });
 722                }
 723            }
 724        }
 725
 726        // Populate repository entries.
 727        {
 728            let mut db_repository_entries = worktree_repository::Entity::find()
 729                .filter(
 730                    Condition::all()
 731                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 732                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 733                )
 734                .stream(tx)
 735                .await?;
 736            while let Some(db_repository_entry) = db_repository_entries.next().await {
 737                let db_repository_entry = db_repository_entry?;
 738                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 739                {
 740                    worktree.repository_entries.insert(
 741                        db_repository_entry.work_directory_id as u64,
 742                        proto::RepositoryEntry {
 743                            work_directory_id: db_repository_entry.work_directory_id as u64,
 744                            branch: db_repository_entry.branch,
 745                        },
 746                    );
 747                }
 748            }
 749        }
 750
 751        // Populate worktree diagnostic summaries.
 752        {
 753            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 754                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 755                .stream(tx)
 756                .await?;
 757            while let Some(db_summary) = db_summaries.next().await {
 758                let db_summary = db_summary?;
 759                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 760                    worktree
 761                        .diagnostic_summaries
 762                        .push(proto::DiagnosticSummary {
 763                            path: db_summary.path,
 764                            language_server_id: db_summary.language_server_id as u64,
 765                            error_count: db_summary.error_count as u32,
 766                            warning_count: db_summary.warning_count as u32,
 767                        });
 768                }
 769            }
 770        }
 771
 772        // Populate worktree settings files
 773        {
 774            let mut db_settings_files = worktree_settings_file::Entity::find()
 775                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 776                .stream(tx)
 777                .await?;
 778            while let Some(db_settings_file) = db_settings_files.next().await {
 779                let db_settings_file = db_settings_file?;
 780                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 781                    worktree.settings_files.push(WorktreeSettingsFile {
 782                        path: db_settings_file.path,
 783                        content: db_settings_file.content,
 784                    });
 785                }
 786            }
 787        }
 788
 789        // Populate language servers.
 790        let language_servers = project
 791            .find_related(language_server::Entity)
 792            .all(tx)
 793            .await?;
 794
 795        let project = Project {
 796            id: project.id,
 797            role,
 798            collaborators: collaborators
 799                .into_iter()
 800                .map(|collaborator| ProjectCollaborator {
 801                    connection_id: collaborator.connection(),
 802                    user_id: collaborator.user_id,
 803                    replica_id: collaborator.replica_id,
 804                    is_host: collaborator.is_host,
 805                })
 806                .collect(),
 807            worktrees,
 808            language_servers: language_servers
 809                .into_iter()
 810                .map(|language_server| proto::LanguageServer {
 811                    id: language_server.id as u64,
 812                    name: language_server.name,
 813                })
 814                .collect(),
 815            dev_server_project_id: project.dev_server_project_id,
 816        };
 817        Ok((project, replica_id as ReplicaId))
 818    }
 819
 820    pub async fn leave_hosted_project(
 821        &self,
 822        project_id: ProjectId,
 823        connection: ConnectionId,
 824    ) -> Result<LeftProject> {
 825        self.transaction(|tx| async move {
 826            let result = project_collaborator::Entity::delete_many()
 827                .filter(
 828                    Condition::all()
 829                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 830                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 831                        .add(
 832                            project_collaborator::Column::ConnectionServerId
 833                                .eq(connection.owner_id as i32),
 834                        ),
 835                )
 836                .exec(&*tx)
 837                .await?;
 838            if result.rows_affected == 0 {
 839                return Err(anyhow!("not in the project"))?;
 840            }
 841
 842            let project = project::Entity::find_by_id(project_id)
 843                .one(&*tx)
 844                .await?
 845                .ok_or_else(|| anyhow!("no such project"))?;
 846            let collaborators = project
 847                .find_related(project_collaborator::Entity)
 848                .all(&*tx)
 849                .await?;
 850            let connection_ids = collaborators
 851                .into_iter()
 852                .map(|collaborator| collaborator.connection())
 853                .collect();
 854            Ok(LeftProject {
 855                id: project.id,
 856                connection_ids,
 857                should_unshare: false,
 858            })
 859        })
 860        .await
 861    }
 862
 863    /// Removes the given connection from the specified project.
 864    pub async fn leave_project(
 865        &self,
 866        project_id: ProjectId,
 867        connection: ConnectionId,
 868    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
 869        self.project_transaction(project_id, |tx| async move {
 870            let result = project_collaborator::Entity::delete_many()
 871                .filter(
 872                    Condition::all()
 873                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 874                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 875                        .add(
 876                            project_collaborator::Column::ConnectionServerId
 877                                .eq(connection.owner_id as i32),
 878                        ),
 879                )
 880                .exec(&*tx)
 881                .await?;
 882            if result.rows_affected == 0 {
 883                Err(anyhow!("not a collaborator on this project"))?;
 884            }
 885
 886            let project = project::Entity::find_by_id(project_id)
 887                .one(&*tx)
 888                .await?
 889                .ok_or_else(|| anyhow!("no such project"))?;
 890            let collaborators = project
 891                .find_related(project_collaborator::Entity)
 892                .all(&*tx)
 893                .await?;
 894            let connection_ids: Vec<ConnectionId> = collaborators
 895                .into_iter()
 896                .map(|collaborator| collaborator.connection())
 897                .collect();
 898
 899            follower::Entity::delete_many()
 900                .filter(
 901                    Condition::any()
 902                        .add(
 903                            Condition::all()
 904                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 905                                .add(
 906                                    follower::Column::LeaderConnectionServerId
 907                                        .eq(connection.owner_id),
 908                                )
 909                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 910                        )
 911                        .add(
 912                            Condition::all()
 913                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 914                                .add(
 915                                    follower::Column::FollowerConnectionServerId
 916                                        .eq(connection.owner_id),
 917                                )
 918                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 919                        ),
 920                )
 921                .exec(&*tx)
 922                .await?;
 923
 924            let room = if let Some(room_id) = project.room_id {
 925                Some(self.get_room(room_id, &tx).await?)
 926            } else {
 927                None
 928            };
 929
 930            let left_project = LeftProject {
 931                id: project_id,
 932                should_unshare: connection == project.host_connection()?,
 933                connection_ids,
 934            };
 935            Ok((room, left_project))
 936        })
 937        .await
 938    }
 939
 940    pub async fn check_user_is_project_host(
 941        &self,
 942        project_id: ProjectId,
 943        connection_id: ConnectionId,
 944    ) -> Result<()> {
 945        self.project_transaction(project_id, |tx| async move {
 946            project::Entity::find()
 947                .filter(
 948                    Condition::all()
 949                        .add(project::Column::Id.eq(project_id))
 950                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
 951                        .add(
 952                            project::Column::HostConnectionServerId
 953                                .eq(Some(connection_id.owner_id as i32)),
 954                        ),
 955                )
 956                .one(&*tx)
 957                .await?
 958                .ok_or_else(|| anyhow!("failed to read project host"))?;
 959
 960            Ok(())
 961        })
 962        .await
 963        .map(|guard| guard.into_inner())
 964    }
 965
 966    /// Returns the current project if the given user is authorized to access it with the specified capability.
 967    pub async fn access_project(
 968        &self,
 969        project_id: ProjectId,
 970        connection_id: ConnectionId,
 971        principal_id: PrincipalId,
 972        capability: Capability,
 973        tx: &DatabaseTransaction,
 974    ) -> Result<(project::Model, ChannelRole)> {
 975        let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
 976            .find_also_related(dev_server_project::Entity)
 977            .one(tx)
 978            .await?
 979            .ok_or_else(|| anyhow!("no such project"))?;
 980
 981        let user_id = match principal_id {
 982            PrincipalId::DevServerId(_) => {
 983                if project
 984                    .host_connection()
 985                    .is_ok_and(|connection| connection == connection_id)
 986                {
 987                    return Ok((project, ChannelRole::Admin));
 988                }
 989                return Err(anyhow!("not the project host"))?;
 990            }
 991            PrincipalId::UserId(user_id) => user_id,
 992        };
 993
 994        let role_from_room = if let Some(room_id) = project.room_id {
 995            room_participant::Entity::find()
 996                .filter(room_participant::Column::RoomId.eq(room_id))
 997                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 998                .one(tx)
 999                .await?
1000                .and_then(|participant| participant.role)
1001        } else {
1002            None
1003        };
1004        let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1005            let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1006                .one(tx)
1007                .await?
1008                .ok_or_else(|| anyhow!("no such channel"))?;
1009            if user_id == dev_server.user_id {
1010                // If the user left the room "uncleanly" they may rejoin the
1011                // remote project before leave_room runs. IN that case kick
1012                // the project out of the room pre-emptively.
1013                if role_from_room.is_none() {
1014                    project = project::Entity::update(project::ActiveModel {
1015                        room_id: ActiveValue::Set(None),
1016                        ..project.into_active_model()
1017                    })
1018                    .exec(tx)
1019                    .await?;
1020                }
1021                Some(ChannelRole::Admin)
1022            } else {
1023                None
1024            }
1025        } else {
1026            None
1027        };
1028
1029        let role = role_from_dev_server
1030            .or(role_from_room)
1031            .unwrap_or(ChannelRole::Banned);
1032
1033        match capability {
1034            Capability::ReadWrite => {
1035                if !role.can_edit_projects() {
1036                    return Err(anyhow!("not authorized to edit projects"))?;
1037                }
1038            }
1039            Capability::ReadOnly => {
1040                if !role.can_read_projects() {
1041                    return Err(anyhow!("not authorized to read projects"))?;
1042                }
1043            }
1044        }
1045
1046        Ok((project, role))
1047    }
1048
1049    /// Returns the host connection for a read-only request to join a shared project.
1050    pub async fn host_for_read_only_project_request(
1051        &self,
1052        project_id: ProjectId,
1053        connection_id: ConnectionId,
1054        user_id: UserId,
1055    ) -> Result<ConnectionId> {
1056        self.project_transaction(project_id, |tx| async move {
1057            let (project, _) = self
1058                .access_project(
1059                    project_id,
1060                    connection_id,
1061                    PrincipalId::UserId(user_id),
1062                    Capability::ReadOnly,
1063                    &tx,
1064                )
1065                .await?;
1066            project.host_connection()
1067        })
1068        .await
1069        .map(|guard| guard.into_inner())
1070    }
1071
1072    /// Returns the host connection for a request to join a shared project.
1073    pub async fn host_for_mutating_project_request(
1074        &self,
1075        project_id: ProjectId,
1076        connection_id: ConnectionId,
1077        user_id: UserId,
1078    ) -> Result<ConnectionId> {
1079        self.project_transaction(project_id, |tx| async move {
1080            let (project, _) = self
1081                .access_project(
1082                    project_id,
1083                    connection_id,
1084                    PrincipalId::UserId(user_id),
1085                    Capability::ReadWrite,
1086                    &tx,
1087                )
1088                .await?;
1089            project.host_connection()
1090        })
1091        .await
1092        .map(|guard| guard.into_inner())
1093    }
1094
1095    pub async fn connections_for_buffer_update(
1096        &self,
1097        project_id: ProjectId,
1098        principal_id: PrincipalId,
1099        connection_id: ConnectionId,
1100        capability: Capability,
1101    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1102        self.project_transaction(project_id, |tx| async move {
1103            // Authorize
1104            let (project, _) = self
1105                .access_project(project_id, connection_id, principal_id, capability, &tx)
1106                .await?;
1107
1108            let host_connection_id = project.host_connection()?;
1109
1110            let collaborators = project_collaborator::Entity::find()
1111                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1112                .all(&*tx)
1113                .await?;
1114
1115            let guest_connection_ids = collaborators
1116                .into_iter()
1117                .filter_map(|collaborator| {
1118                    if collaborator.is_host {
1119                        None
1120                    } else {
1121                        Some(collaborator.connection())
1122                    }
1123                })
1124                .collect();
1125
1126            Ok((host_connection_id, guest_connection_ids))
1127        })
1128        .await
1129    }
1130
1131    /// Returns the connection IDs in the given project.
1132    ///
1133    /// The provided `connection_id` must also be a collaborator in the project,
1134    /// otherwise an error will be returned.
1135    pub async fn project_connection_ids(
1136        &self,
1137        project_id: ProjectId,
1138        connection_id: ConnectionId,
1139        exclude_dev_server: bool,
1140    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1141        self.project_transaction(project_id, |tx| async move {
1142            let project = project::Entity::find_by_id(project_id)
1143                .one(&*tx)
1144                .await?
1145                .ok_or_else(|| anyhow!("no such project"))?;
1146
1147            let mut collaborators = project_collaborator::Entity::find()
1148                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1149                .stream(&*tx)
1150                .await?;
1151
1152            let mut connection_ids = HashSet::default();
1153            if let Some(host_connection) = project.host_connection().log_err() {
1154                if !exclude_dev_server {
1155                    connection_ids.insert(host_connection);
1156                }
1157            }
1158
1159            while let Some(collaborator) = collaborators.next().await {
1160                let collaborator = collaborator?;
1161                connection_ids.insert(collaborator.connection());
1162            }
1163
1164            if connection_ids.contains(&connection_id)
1165                || Some(connection_id) == project.host_connection().ok()
1166            {
1167                Ok(connection_ids)
1168            } else {
1169                Err(anyhow!(
1170                    "can only send project updates to a project you're in"
1171                ))?
1172            }
1173        })
1174        .await
1175    }
1176
1177    async fn project_guest_connection_ids(
1178        &self,
1179        project_id: ProjectId,
1180        tx: &DatabaseTransaction,
1181    ) -> Result<Vec<ConnectionId>> {
1182        let mut collaborators = project_collaborator::Entity::find()
1183            .filter(
1184                project_collaborator::Column::ProjectId
1185                    .eq(project_id)
1186                    .and(project_collaborator::Column::IsHost.eq(false)),
1187            )
1188            .stream(tx)
1189            .await?;
1190
1191        let mut guest_connection_ids = Vec::new();
1192        while let Some(collaborator) = collaborators.next().await {
1193            let collaborator = collaborator?;
1194            guest_connection_ids.push(collaborator.connection());
1195        }
1196        Ok(guest_connection_ids)
1197    }
1198
1199    /// Returns the [`RoomId`] for the given project.
1200    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1201        self.transaction(|tx| async move {
1202            Ok(project::Entity::find_by_id(project_id)
1203                .one(&*tx)
1204                .await?
1205                .and_then(|project| project.room_id))
1206        })
1207        .await
1208    }
1209
1210    pub async fn check_room_participants(
1211        &self,
1212        room_id: RoomId,
1213        leader_id: ConnectionId,
1214        follower_id: ConnectionId,
1215    ) -> Result<()> {
1216        self.transaction(|tx| async move {
1217            use room_participant::Column;
1218
1219            let count = room_participant::Entity::find()
1220                .filter(
1221                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1222                        Condition::any()
1223                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1224                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1225                            ))
1226                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1227                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1228                            )),
1229                    ),
1230                )
1231                .count(&*tx)
1232                .await?;
1233
1234            if count < 2 {
1235                Err(anyhow!("not room participants"))?;
1236            }
1237
1238            Ok(())
1239        })
1240        .await
1241    }
1242
1243    /// Adds the given follower connection as a follower of the given leader connection.
1244    pub async fn follow(
1245        &self,
1246        room_id: RoomId,
1247        project_id: ProjectId,
1248        leader_connection: ConnectionId,
1249        follower_connection: ConnectionId,
1250    ) -> Result<TransactionGuard<proto::Room>> {
1251        self.room_transaction(room_id, |tx| async move {
1252            follower::ActiveModel {
1253                room_id: ActiveValue::set(room_id),
1254                project_id: ActiveValue::set(project_id),
1255                leader_connection_server_id: ActiveValue::set(ServerId(
1256                    leader_connection.owner_id as i32,
1257                )),
1258                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1259                follower_connection_server_id: ActiveValue::set(ServerId(
1260                    follower_connection.owner_id as i32,
1261                )),
1262                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1263                ..Default::default()
1264            }
1265            .insert(&*tx)
1266            .await?;
1267
1268            let room = self.get_room(room_id, &tx).await?;
1269            Ok(room)
1270        })
1271        .await
1272    }
1273
1274    /// Removes the given follower connection as a follower of the given leader connection.
1275    pub async fn unfollow(
1276        &self,
1277        room_id: RoomId,
1278        project_id: ProjectId,
1279        leader_connection: ConnectionId,
1280        follower_connection: ConnectionId,
1281    ) -> Result<TransactionGuard<proto::Room>> {
1282        self.room_transaction(room_id, |tx| async move {
1283            follower::Entity::delete_many()
1284                .filter(
1285                    Condition::all()
1286                        .add(follower::Column::RoomId.eq(room_id))
1287                        .add(follower::Column::ProjectId.eq(project_id))
1288                        .add(
1289                            follower::Column::LeaderConnectionServerId
1290                                .eq(leader_connection.owner_id),
1291                        )
1292                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1293                        .add(
1294                            follower::Column::FollowerConnectionServerId
1295                                .eq(follower_connection.owner_id),
1296                        )
1297                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1298                )
1299                .exec(&*tx)
1300                .await?;
1301
1302            let room = self.get_room(room_id, &tx).await?;
1303            Ok(room)
1304        })
1305        .await
1306    }
1307}