projects.rs

   1use super::*;
   2
   3impl Database {
   4    /// Returns the count of all projects, excluding ones marked as admin.
   5    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   6        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
   7        enum QueryAs {
   8            Count,
   9        }
  10
  11        self.transaction(|tx| async move {
  12            Ok(project::Entity::find()
  13                .select_only()
  14                .column_as(project::Column::Id.count(), QueryAs::Count)
  15                .inner_join(user::Entity)
  16                .filter(user::Column::Admin.eq(false))
  17                .into_values::<_, QueryAs>()
  18                .one(&*tx)
  19                .await?
  20                .unwrap_or(0i64) as usize)
  21        })
  22        .await
  23    }
  24
  25    /// Shares a project with the given room.
  26    pub async fn share_project(
  27        &self,
  28        room_id: RoomId,
  29        connection: ConnectionId,
  30        worktrees: &[proto::WorktreeMetadata],
  31    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
  32        self.room_transaction(room_id, |tx| async move {
  33            let participant = room_participant::Entity::find()
  34                .filter(
  35                    Condition::all()
  36                        .add(
  37                            room_participant::Column::AnsweringConnectionId
  38                                .eq(connection.id as i32),
  39                        )
  40                        .add(
  41                            room_participant::Column::AnsweringConnectionServerId
  42                                .eq(connection.owner_id as i32),
  43                        ),
  44                )
  45                .one(&*tx)
  46                .await?
  47                .ok_or_else(|| anyhow!("could not find participant"))?;
  48            if participant.room_id != room_id {
  49                return Err(anyhow!("shared project on unexpected room"))?;
  50            }
  51            if !participant
  52                .role
  53                .unwrap_or(ChannelRole::Member)
  54                .can_edit_projects()
  55            {
  56                return Err(anyhow!("guests cannot share projects"))?;
  57            }
  58
  59            let project = project::ActiveModel {
  60                room_id: ActiveValue::set(Some(participant.room_id)),
  61                host_user_id: ActiveValue::set(Some(participant.user_id)),
  62                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  63                host_connection_server_id: ActiveValue::set(Some(ServerId(
  64                    connection.owner_id as i32,
  65                ))),
  66                id: ActiveValue::NotSet,
  67                hosted_project_id: ActiveValue::Set(None),
  68            }
  69            .insert(&*tx)
  70            .await?;
  71
  72            if !worktrees.is_empty() {
  73                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  74                    worktree::ActiveModel {
  75                        id: ActiveValue::set(worktree.id as i64),
  76                        project_id: ActiveValue::set(project.id),
  77                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  78                        root_name: ActiveValue::set(worktree.root_name.clone()),
  79                        visible: ActiveValue::set(worktree.visible),
  80                        scan_id: ActiveValue::set(0),
  81                        completed_scan_id: ActiveValue::set(0),
  82                    }
  83                }))
  84                .exec(&*tx)
  85                .await?;
  86            }
  87
  88            project_collaborator::ActiveModel {
  89                project_id: ActiveValue::set(project.id),
  90                connection_id: ActiveValue::set(connection.id as i32),
  91                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  92                user_id: ActiveValue::set(participant.user_id),
  93                replica_id: ActiveValue::set(ReplicaId(0)),
  94                is_host: ActiveValue::set(true),
  95                ..Default::default()
  96            }
  97            .insert(&*tx)
  98            .await?;
  99
 100            let room = self.get_room(room_id, &tx).await?;
 101            Ok((project.id, room))
 102        })
 103        .await
 104    }
 105
 106    /// Unshares the given project.
 107    pub async fn unshare_project(
 108        &self,
 109        project_id: ProjectId,
 110        connection: ConnectionId,
 111    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
 112        let room_id = self.room_id_for_project(project_id).await?;
 113        self.room_transaction(room_id, |tx| async move {
 114            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 115
 116            let project = project::Entity::find_by_id(project_id)
 117                .one(&*tx)
 118                .await?
 119                .ok_or_else(|| anyhow!("project not found"))?;
 120            if project.host_connection()? == connection {
 121                project::Entity::delete(project.into_active_model())
 122                    .exec(&*tx)
 123                    .await?;
 124                let room = self.get_room(room_id, &tx).await?;
 125                Ok((room, guest_connection_ids))
 126            } else {
 127                Err(anyhow!("cannot unshare a project hosted by another user"))?
 128            }
 129        })
 130        .await
 131    }
 132
 133    /// Updates the worktrees associated with the given project.
 134    pub async fn update_project(
 135        &self,
 136        project_id: ProjectId,
 137        connection: ConnectionId,
 138        worktrees: &[proto::WorktreeMetadata],
 139    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
 140        let room_id = self.room_id_for_project(project_id).await?;
 141        self.room_transaction(room_id, |tx| async move {
 142            let project = project::Entity::find_by_id(project_id)
 143                .filter(
 144                    Condition::all()
 145                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 146                        .add(
 147                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 148                        ),
 149                )
 150                .one(&*tx)
 151                .await?
 152                .ok_or_else(|| anyhow!("no such project"))?;
 153
 154            self.update_project_worktrees(project.id, worktrees, &tx)
 155                .await?;
 156
 157            let room_id = project
 158                .room_id
 159                .ok_or_else(|| anyhow!("project not in a room"))?;
 160
 161            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 162            let room = self.get_room(room_id, &tx).await?;
 163            Ok((room, guest_connection_ids))
 164        })
 165        .await
 166    }
 167
 168    pub(in crate::db) async fn update_project_worktrees(
 169        &self,
 170        project_id: ProjectId,
 171        worktrees: &[proto::WorktreeMetadata],
 172        tx: &DatabaseTransaction,
 173    ) -> Result<()> {
 174        if !worktrees.is_empty() {
 175            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 176                id: ActiveValue::set(worktree.id as i64),
 177                project_id: ActiveValue::set(project_id),
 178                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 179                root_name: ActiveValue::set(worktree.root_name.clone()),
 180                visible: ActiveValue::set(worktree.visible),
 181                scan_id: ActiveValue::set(0),
 182                completed_scan_id: ActiveValue::set(0),
 183            }))
 184            .on_conflict(
 185                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 186                    .update_column(worktree::Column::RootName)
 187                    .to_owned(),
 188            )
 189            .exec(tx)
 190            .await?;
 191        }
 192
 193        worktree::Entity::delete_many()
 194            .filter(worktree::Column::ProjectId.eq(project_id).and(
 195                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 196            ))
 197            .exec(tx)
 198            .await?;
 199
 200        Ok(())
 201    }
 202
 203    pub async fn update_worktree(
 204        &self,
 205        update: &proto::UpdateWorktree,
 206        connection: ConnectionId,
 207    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 208        let project_id = ProjectId::from_proto(update.project_id);
 209        let worktree_id = update.worktree_id as i64;
 210        let room_id = self.room_id_for_project(project_id).await?;
 211        self.room_transaction(room_id, |tx| async move {
 212            // Ensure the update comes from the host.
 213            let _project = project::Entity::find_by_id(project_id)
 214                .filter(
 215                    Condition::all()
 216                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 217                        .add(
 218                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 219                        ),
 220                )
 221                .one(&*tx)
 222                .await?
 223                .ok_or_else(|| anyhow!("no such project"))?;
 224
 225            // Update metadata.
 226            worktree::Entity::update(worktree::ActiveModel {
 227                id: ActiveValue::set(worktree_id),
 228                project_id: ActiveValue::set(project_id),
 229                root_name: ActiveValue::set(update.root_name.clone()),
 230                scan_id: ActiveValue::set(update.scan_id as i64),
 231                completed_scan_id: if update.is_last_update {
 232                    ActiveValue::set(update.scan_id as i64)
 233                } else {
 234                    ActiveValue::default()
 235                },
 236                abs_path: ActiveValue::set(update.abs_path.clone()),
 237                ..Default::default()
 238            })
 239            .exec(&*tx)
 240            .await?;
 241
 242            if !update.updated_entries.is_empty() {
 243                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 244                    let mtime = entry.mtime.clone().unwrap_or_default();
 245                    worktree_entry::ActiveModel {
 246                        project_id: ActiveValue::set(project_id),
 247                        worktree_id: ActiveValue::set(worktree_id),
 248                        id: ActiveValue::set(entry.id as i64),
 249                        is_dir: ActiveValue::set(entry.is_dir),
 250                        path: ActiveValue::set(entry.path.clone()),
 251                        inode: ActiveValue::set(entry.inode as i64),
 252                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 253                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 254                        is_symlink: ActiveValue::set(entry.is_symlink),
 255                        is_ignored: ActiveValue::set(entry.is_ignored),
 256                        is_external: ActiveValue::set(entry.is_external),
 257                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 258                        is_deleted: ActiveValue::set(false),
 259                        scan_id: ActiveValue::set(update.scan_id as i64),
 260                    }
 261                }))
 262                .on_conflict(
 263                    OnConflict::columns([
 264                        worktree_entry::Column::ProjectId,
 265                        worktree_entry::Column::WorktreeId,
 266                        worktree_entry::Column::Id,
 267                    ])
 268                    .update_columns([
 269                        worktree_entry::Column::IsDir,
 270                        worktree_entry::Column::Path,
 271                        worktree_entry::Column::Inode,
 272                        worktree_entry::Column::MtimeSeconds,
 273                        worktree_entry::Column::MtimeNanos,
 274                        worktree_entry::Column::IsSymlink,
 275                        worktree_entry::Column::IsIgnored,
 276                        worktree_entry::Column::GitStatus,
 277                        worktree_entry::Column::ScanId,
 278                    ])
 279                    .to_owned(),
 280                )
 281                .exec(&*tx)
 282                .await?;
 283            }
 284
 285            if !update.removed_entries.is_empty() {
 286                worktree_entry::Entity::update_many()
 287                    .filter(
 288                        worktree_entry::Column::ProjectId
 289                            .eq(project_id)
 290                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 291                            .and(
 292                                worktree_entry::Column::Id
 293                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 294                            ),
 295                    )
 296                    .set(worktree_entry::ActiveModel {
 297                        is_deleted: ActiveValue::Set(true),
 298                        scan_id: ActiveValue::Set(update.scan_id as i64),
 299                        ..Default::default()
 300                    })
 301                    .exec(&*tx)
 302                    .await?;
 303            }
 304
 305            if !update.updated_repositories.is_empty() {
 306                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 307                    |repository| worktree_repository::ActiveModel {
 308                        project_id: ActiveValue::set(project_id),
 309                        worktree_id: ActiveValue::set(worktree_id),
 310                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 311                        scan_id: ActiveValue::set(update.scan_id as i64),
 312                        branch: ActiveValue::set(repository.branch.clone()),
 313                        is_deleted: ActiveValue::set(false),
 314                    },
 315                ))
 316                .on_conflict(
 317                    OnConflict::columns([
 318                        worktree_repository::Column::ProjectId,
 319                        worktree_repository::Column::WorktreeId,
 320                        worktree_repository::Column::WorkDirectoryId,
 321                    ])
 322                    .update_columns([
 323                        worktree_repository::Column::ScanId,
 324                        worktree_repository::Column::Branch,
 325                    ])
 326                    .to_owned(),
 327                )
 328                .exec(&*tx)
 329                .await?;
 330            }
 331
 332            if !update.removed_repositories.is_empty() {
 333                worktree_repository::Entity::update_many()
 334                    .filter(
 335                        worktree_repository::Column::ProjectId
 336                            .eq(project_id)
 337                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 338                            .and(
 339                                worktree_repository::Column::WorkDirectoryId
 340                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 341                            ),
 342                    )
 343                    .set(worktree_repository::ActiveModel {
 344                        is_deleted: ActiveValue::Set(true),
 345                        scan_id: ActiveValue::Set(update.scan_id as i64),
 346                        ..Default::default()
 347                    })
 348                    .exec(&*tx)
 349                    .await?;
 350            }
 351
 352            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 353            Ok(connection_ids)
 354        })
 355        .await
 356    }
 357
 358    /// Updates the diagnostic summary for the given connection.
 359    pub async fn update_diagnostic_summary(
 360        &self,
 361        update: &proto::UpdateDiagnosticSummary,
 362        connection: ConnectionId,
 363    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 364        let project_id = ProjectId::from_proto(update.project_id);
 365        let worktree_id = update.worktree_id as i64;
 366        let room_id = self.room_id_for_project(project_id).await?;
 367        self.room_transaction(room_id, |tx| async move {
 368            let summary = update
 369                .summary
 370                .as_ref()
 371                .ok_or_else(|| anyhow!("invalid summary"))?;
 372
 373            // Ensure the update comes from the host.
 374            let project = project::Entity::find_by_id(project_id)
 375                .one(&*tx)
 376                .await?
 377                .ok_or_else(|| anyhow!("no such project"))?;
 378            if project.host_connection()? != connection {
 379                return Err(anyhow!("can't update a project hosted by someone else"))?;
 380            }
 381
 382            // Update summary.
 383            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 384                project_id: ActiveValue::set(project_id),
 385                worktree_id: ActiveValue::set(worktree_id),
 386                path: ActiveValue::set(summary.path.clone()),
 387                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 388                error_count: ActiveValue::set(summary.error_count as i32),
 389                warning_count: ActiveValue::set(summary.warning_count as i32),
 390            })
 391            .on_conflict(
 392                OnConflict::columns([
 393                    worktree_diagnostic_summary::Column::ProjectId,
 394                    worktree_diagnostic_summary::Column::WorktreeId,
 395                    worktree_diagnostic_summary::Column::Path,
 396                ])
 397                .update_columns([
 398                    worktree_diagnostic_summary::Column::LanguageServerId,
 399                    worktree_diagnostic_summary::Column::ErrorCount,
 400                    worktree_diagnostic_summary::Column::WarningCount,
 401                ])
 402                .to_owned(),
 403            )
 404            .exec(&*tx)
 405            .await?;
 406
 407            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 408            Ok(connection_ids)
 409        })
 410        .await
 411    }
 412
 413    /// Starts the language server for the given connection.
 414    pub async fn start_language_server(
 415        &self,
 416        update: &proto::StartLanguageServer,
 417        connection: ConnectionId,
 418    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 419        let project_id = ProjectId::from_proto(update.project_id);
 420        let room_id = self.room_id_for_project(project_id).await?;
 421        self.room_transaction(room_id, |tx| async move {
 422            let server = update
 423                .server
 424                .as_ref()
 425                .ok_or_else(|| anyhow!("invalid language server"))?;
 426
 427            // Ensure the update comes from the host.
 428            let project = project::Entity::find_by_id(project_id)
 429                .one(&*tx)
 430                .await?
 431                .ok_or_else(|| anyhow!("no such project"))?;
 432            if project.host_connection()? != connection {
 433                return Err(anyhow!("can't update a project hosted by someone else"))?;
 434            }
 435
 436            // Add the newly-started language server.
 437            language_server::Entity::insert(language_server::ActiveModel {
 438                project_id: ActiveValue::set(project_id),
 439                id: ActiveValue::set(server.id as i64),
 440                name: ActiveValue::set(server.name.clone()),
 441            })
 442            .on_conflict(
 443                OnConflict::columns([
 444                    language_server::Column::ProjectId,
 445                    language_server::Column::Id,
 446                ])
 447                .update_column(language_server::Column::Name)
 448                .to_owned(),
 449            )
 450            .exec(&*tx)
 451            .await?;
 452
 453            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 454            Ok(connection_ids)
 455        })
 456        .await
 457    }
 458
 459    /// Updates the worktree settings for the given connection.
 460    pub async fn update_worktree_settings(
 461        &self,
 462        update: &proto::UpdateWorktreeSettings,
 463        connection: ConnectionId,
 464    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 465        let project_id = ProjectId::from_proto(update.project_id);
 466        let room_id = self.room_id_for_project(project_id).await?;
 467        self.room_transaction(room_id, |tx| async move {
 468            // Ensure the update comes from the host.
 469            let project = project::Entity::find_by_id(project_id)
 470                .one(&*tx)
 471                .await?
 472                .ok_or_else(|| anyhow!("no such project"))?;
 473            if project.host_connection()? != connection {
 474                return Err(anyhow!("can't update a project hosted by someone else"))?;
 475            }
 476
 477            if let Some(content) = &update.content {
 478                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 479                    project_id: ActiveValue::Set(project_id),
 480                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 481                    path: ActiveValue::Set(update.path.clone()),
 482                    content: ActiveValue::Set(content.clone()),
 483                })
 484                .on_conflict(
 485                    OnConflict::columns([
 486                        worktree_settings_file::Column::ProjectId,
 487                        worktree_settings_file::Column::WorktreeId,
 488                        worktree_settings_file::Column::Path,
 489                    ])
 490                    .update_column(worktree_settings_file::Column::Content)
 491                    .to_owned(),
 492                )
 493                .exec(&*tx)
 494                .await?;
 495            } else {
 496                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 497                    project_id: ActiveValue::Set(project_id),
 498                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 499                    path: ActiveValue::Set(update.path.clone()),
 500                    ..Default::default()
 501                })
 502                .exec(&*tx)
 503                .await?;
 504            }
 505
 506            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 507            Ok(connection_ids)
 508        })
 509        .await
 510    }
 511
 512    /// Adds the given connection to the specified hosted project
 513    pub async fn join_hosted_project(
 514        &self,
 515        id: ProjectId,
 516        user_id: UserId,
 517        connection: ConnectionId,
 518    ) -> Result<(Project, ReplicaId)> {
 519        self.transaction(|tx| async move {
 520            let (project, hosted_project) = project::Entity::find_by_id(id)
 521                .find_also_related(hosted_project::Entity)
 522                .one(&*tx)
 523                .await?
 524                .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
 525
 526            let Some(hosted_project) = hosted_project else {
 527                return Err(anyhow!("project is not hosted"))?;
 528            };
 529
 530            let channel = channel::Entity::find_by_id(hosted_project.channel_id)
 531                .one(&*tx)
 532                .await?
 533                .ok_or_else(|| anyhow!("no such channel"))?;
 534
 535            let role = self
 536                .check_user_is_channel_participant(&channel, user_id, &tx)
 537                .await?;
 538
 539            self.join_project_internal(project, user_id, connection, role, &tx)
 540                .await
 541        })
 542        .await
 543    }
 544
 545    /// Adds the given connection to the specified project
 546    /// in the current room.
 547    pub async fn join_project_in_room(
 548        &self,
 549        project_id: ProjectId,
 550        connection: ConnectionId,
 551    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
 552        let room_id = self.room_id_for_project(project_id).await?;
 553        self.room_transaction(room_id, |tx| async move {
 554            let participant = room_participant::Entity::find()
 555                .filter(
 556                    Condition::all()
 557                        .add(
 558                            room_participant::Column::AnsweringConnectionId
 559                                .eq(connection.id as i32),
 560                        )
 561                        .add(
 562                            room_participant::Column::AnsweringConnectionServerId
 563                                .eq(connection.owner_id as i32),
 564                        ),
 565                )
 566                .one(&*tx)
 567                .await?
 568                .ok_or_else(|| anyhow!("must join a room first"))?;
 569
 570            let project = project::Entity::find_by_id(project_id)
 571                .one(&*tx)
 572                .await?
 573                .ok_or_else(|| anyhow!("no such project"))?;
 574            if project.room_id != Some(participant.room_id) {
 575                return Err(anyhow!("no such project"))?;
 576            }
 577            self.join_project_internal(
 578                project,
 579                participant.user_id,
 580                connection,
 581                participant.role.unwrap_or(ChannelRole::Member),
 582                &tx,
 583            )
 584            .await
 585        })
 586        .await
 587    }
 588
 589    async fn join_project_internal(
 590        &self,
 591        project: project::Model,
 592        user_id: UserId,
 593        connection: ConnectionId,
 594        role: ChannelRole,
 595        tx: &DatabaseTransaction,
 596    ) -> Result<(Project, ReplicaId)> {
 597        let mut collaborators = project
 598            .find_related(project_collaborator::Entity)
 599            .all(tx)
 600            .await?;
 601        let replica_ids = collaborators
 602            .iter()
 603            .map(|c| c.replica_id)
 604            .collect::<HashSet<_>>();
 605        let mut replica_id = ReplicaId(1);
 606        while replica_ids.contains(&replica_id) {
 607            replica_id.0 += 1;
 608        }
 609        let new_collaborator = project_collaborator::ActiveModel {
 610            project_id: ActiveValue::set(project.id),
 611            connection_id: ActiveValue::set(connection.id as i32),
 612            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 613            user_id: ActiveValue::set(user_id),
 614            replica_id: ActiveValue::set(replica_id),
 615            is_host: ActiveValue::set(false),
 616            ..Default::default()
 617        }
 618        .insert(tx)
 619        .await?;
 620        collaborators.push(new_collaborator);
 621
 622        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 623        let mut worktrees = db_worktrees
 624            .into_iter()
 625            .map(|db_worktree| {
 626                (
 627                    db_worktree.id as u64,
 628                    Worktree {
 629                        id: db_worktree.id as u64,
 630                        abs_path: db_worktree.abs_path,
 631                        root_name: db_worktree.root_name,
 632                        visible: db_worktree.visible,
 633                        entries: Default::default(),
 634                        repository_entries: Default::default(),
 635                        diagnostic_summaries: Default::default(),
 636                        settings_files: Default::default(),
 637                        scan_id: db_worktree.scan_id as u64,
 638                        completed_scan_id: db_worktree.completed_scan_id as u64,
 639                    },
 640                )
 641            })
 642            .collect::<BTreeMap<_, _>>();
 643
 644        // Populate worktree entries.
 645        {
 646            let mut db_entries = worktree_entry::Entity::find()
 647                .filter(
 648                    Condition::all()
 649                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 650                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 651                )
 652                .stream(tx)
 653                .await?;
 654            while let Some(db_entry) = db_entries.next().await {
 655                let db_entry = db_entry?;
 656                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 657                    worktree.entries.push(proto::Entry {
 658                        id: db_entry.id as u64,
 659                        is_dir: db_entry.is_dir,
 660                        path: db_entry.path,
 661                        inode: db_entry.inode as u64,
 662                        mtime: Some(proto::Timestamp {
 663                            seconds: db_entry.mtime_seconds as u64,
 664                            nanos: db_entry.mtime_nanos as u32,
 665                        }),
 666                        is_symlink: db_entry.is_symlink,
 667                        is_ignored: db_entry.is_ignored,
 668                        is_external: db_entry.is_external,
 669                        git_status: db_entry.git_status.map(|status| status as i32),
 670                    });
 671                }
 672            }
 673        }
 674
 675        // Populate repository entries.
 676        {
 677            let mut db_repository_entries = worktree_repository::Entity::find()
 678                .filter(
 679                    Condition::all()
 680                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 681                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 682                )
 683                .stream(tx)
 684                .await?;
 685            while let Some(db_repository_entry) = db_repository_entries.next().await {
 686                let db_repository_entry = db_repository_entry?;
 687                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 688                {
 689                    worktree.repository_entries.insert(
 690                        db_repository_entry.work_directory_id as u64,
 691                        proto::RepositoryEntry {
 692                            work_directory_id: db_repository_entry.work_directory_id as u64,
 693                            branch: db_repository_entry.branch,
 694                        },
 695                    );
 696                }
 697            }
 698        }
 699
 700        // Populate worktree diagnostic summaries.
 701        {
 702            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 703                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 704                .stream(tx)
 705                .await?;
 706            while let Some(db_summary) = db_summaries.next().await {
 707                let db_summary = db_summary?;
 708                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 709                    worktree
 710                        .diagnostic_summaries
 711                        .push(proto::DiagnosticSummary {
 712                            path: db_summary.path,
 713                            language_server_id: db_summary.language_server_id as u64,
 714                            error_count: db_summary.error_count as u32,
 715                            warning_count: db_summary.warning_count as u32,
 716                        });
 717                }
 718            }
 719        }
 720
 721        // Populate worktree settings files
 722        {
 723            let mut db_settings_files = worktree_settings_file::Entity::find()
 724                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 725                .stream(tx)
 726                .await?;
 727            while let Some(db_settings_file) = db_settings_files.next().await {
 728                let db_settings_file = db_settings_file?;
 729                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 730                    worktree.settings_files.push(WorktreeSettingsFile {
 731                        path: db_settings_file.path,
 732                        content: db_settings_file.content,
 733                    });
 734                }
 735            }
 736        }
 737
 738        // Populate language servers.
 739        let language_servers = project
 740            .find_related(language_server::Entity)
 741            .all(tx)
 742            .await?;
 743
 744        let project = Project {
 745            id: project.id,
 746            role,
 747            collaborators: collaborators
 748                .into_iter()
 749                .map(|collaborator| ProjectCollaborator {
 750                    connection_id: collaborator.connection(),
 751                    user_id: collaborator.user_id,
 752                    replica_id: collaborator.replica_id,
 753                    is_host: collaborator.is_host,
 754                })
 755                .collect(),
 756            worktrees,
 757            language_servers: language_servers
 758                .into_iter()
 759                .map(|language_server| proto::LanguageServer {
 760                    id: language_server.id as u64,
 761                    name: language_server.name,
 762                })
 763                .collect(),
 764        };
 765        Ok((project, replica_id as ReplicaId))
 766    }
 767
 768    pub async fn leave_hosted_project(
 769        &self,
 770        project_id: ProjectId,
 771        connection: ConnectionId,
 772    ) -> Result<LeftProject> {
 773        self.transaction(|tx| async move {
 774            let result = project_collaborator::Entity::delete_many()
 775                .filter(
 776                    Condition::all()
 777                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 778                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 779                        .add(
 780                            project_collaborator::Column::ConnectionServerId
 781                                .eq(connection.owner_id as i32),
 782                        ),
 783                )
 784                .exec(&*tx)
 785                .await?;
 786            if result.rows_affected == 0 {
 787                return Err(anyhow!("not in the project"))?;
 788            }
 789
 790            let project = project::Entity::find_by_id(project_id)
 791                .one(&*tx)
 792                .await?
 793                .ok_or_else(|| anyhow!("no such project"))?;
 794            let collaborators = project
 795                .find_related(project_collaborator::Entity)
 796                .all(&*tx)
 797                .await?;
 798            let connection_ids = collaborators
 799                .into_iter()
 800                .map(|collaborator| collaborator.connection())
 801                .collect();
 802            Ok(LeftProject {
 803                id: project.id,
 804                connection_ids,
 805                host_user_id: None,
 806                host_connection_id: None,
 807            })
 808        })
 809        .await
 810    }
 811
 812    /// Removes the given connection from the specified project.
 813    pub async fn leave_project(
 814        &self,
 815        project_id: ProjectId,
 816        connection: ConnectionId,
 817    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
 818        let room_id = self.room_id_for_project(project_id).await?;
 819        self.room_transaction(room_id, |tx| async move {
 820            let result = project_collaborator::Entity::delete_many()
 821                .filter(
 822                    Condition::all()
 823                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 824                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 825                        .add(
 826                            project_collaborator::Column::ConnectionServerId
 827                                .eq(connection.owner_id as i32),
 828                        ),
 829                )
 830                .exec(&*tx)
 831                .await?;
 832            if result.rows_affected == 0 {
 833                Err(anyhow!("not a collaborator on this project"))?;
 834            }
 835
 836            let project = project::Entity::find_by_id(project_id)
 837                .one(&*tx)
 838                .await?
 839                .ok_or_else(|| anyhow!("no such project"))?;
 840            let collaborators = project
 841                .find_related(project_collaborator::Entity)
 842                .all(&*tx)
 843                .await?;
 844            let connection_ids = collaborators
 845                .into_iter()
 846                .map(|collaborator| collaborator.connection())
 847                .collect();
 848
 849            follower::Entity::delete_many()
 850                .filter(
 851                    Condition::any()
 852                        .add(
 853                            Condition::all()
 854                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 855                                .add(
 856                                    follower::Column::LeaderConnectionServerId
 857                                        .eq(connection.owner_id),
 858                                )
 859                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 860                        )
 861                        .add(
 862                            Condition::all()
 863                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 864                                .add(
 865                                    follower::Column::FollowerConnectionServerId
 866                                        .eq(connection.owner_id),
 867                                )
 868                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 869                        ),
 870                )
 871                .exec(&*tx)
 872                .await?;
 873
 874            let room = self.get_room(room_id, &tx).await?;
 875            let left_project = LeftProject {
 876                id: project_id,
 877                host_user_id: project.host_user_id,
 878                host_connection_id: Some(project.host_connection()?),
 879                connection_ids,
 880            };
 881            Ok((room, left_project))
 882        })
 883        .await
 884    }
 885
 886    pub async fn check_user_is_project_host(
 887        &self,
 888        project_id: ProjectId,
 889        connection_id: ConnectionId,
 890    ) -> Result<()> {
 891        let room_id = self.room_id_for_project(project_id).await?;
 892        self.room_transaction(room_id, |tx| async move {
 893            project_collaborator::Entity::find()
 894                .filter(
 895                    Condition::all()
 896                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 897                        .add(project_collaborator::Column::IsHost.eq(true))
 898                        .add(project_collaborator::Column::ConnectionId.eq(connection_id.id))
 899                        .add(
 900                            project_collaborator::Column::ConnectionServerId
 901                                .eq(connection_id.owner_id),
 902                        ),
 903                )
 904                .one(&*tx)
 905                .await?
 906                .ok_or_else(|| anyhow!("failed to read project host"))?;
 907
 908            Ok(())
 909        })
 910        .await
 911        .map(|guard| guard.into_inner())
 912    }
 913
 914    /// Returns the host connection for a read-only request to join a shared project.
 915    pub async fn host_for_read_only_project_request(
 916        &self,
 917        project_id: ProjectId,
 918        connection_id: ConnectionId,
 919    ) -> Result<ConnectionId> {
 920        let room_id = self.room_id_for_project(project_id).await?;
 921        self.room_transaction(room_id, |tx| async move {
 922            let current_participant = room_participant::Entity::find()
 923                .filter(room_participant::Column::RoomId.eq(room_id))
 924                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 925                .one(&*tx)
 926                .await?
 927                .ok_or_else(|| anyhow!("no such room"))?;
 928
 929            if !current_participant
 930                .role
 931                .map_or(false, |role| role.can_read_projects())
 932            {
 933                Err(anyhow!("not authorized to read projects"))?;
 934            }
 935
 936            let host = project_collaborator::Entity::find()
 937                .filter(
 938                    project_collaborator::Column::ProjectId
 939                        .eq(project_id)
 940                        .and(project_collaborator::Column::IsHost.eq(true)),
 941                )
 942                .one(&*tx)
 943                .await?
 944                .ok_or_else(|| anyhow!("failed to read project host"))?;
 945
 946            Ok(host.connection())
 947        })
 948        .await
 949        .map(|guard| guard.into_inner())
 950    }
 951
 952    /// Returns the host connection for a request to join a shared project.
 953    pub async fn host_for_mutating_project_request(
 954        &self,
 955        project_id: ProjectId,
 956        connection_id: ConnectionId,
 957    ) -> Result<ConnectionId> {
 958        let room_id = self.room_id_for_project(project_id).await?;
 959        self.room_transaction(room_id, |tx| async move {
 960            let current_participant = room_participant::Entity::find()
 961                .filter(room_participant::Column::RoomId.eq(room_id))
 962                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 963                .one(&*tx)
 964                .await?
 965                .ok_or_else(|| anyhow!("no such room"))?;
 966
 967            if !current_participant
 968                .role
 969                .map_or(false, |role| role.can_edit_projects())
 970            {
 971                Err(anyhow!("not authorized to edit projects"))?;
 972            }
 973
 974            let host = project_collaborator::Entity::find()
 975                .filter(
 976                    project_collaborator::Column::ProjectId
 977                        .eq(project_id)
 978                        .and(project_collaborator::Column::IsHost.eq(true)),
 979                )
 980                .one(&*tx)
 981                .await?
 982                .ok_or_else(|| anyhow!("failed to read project host"))?;
 983
 984            Ok(host.connection())
 985        })
 986        .await
 987        .map(|guard| guard.into_inner())
 988    }
 989
 990    pub async fn project_collaborators_for_buffer_update(
 991        &self,
 992        project_id: ProjectId,
 993        connection_id: ConnectionId,
 994        requires_write: bool,
 995    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
 996        let room_id = self.room_id_for_project(project_id).await?;
 997        self.room_transaction(room_id, |tx| async move {
 998            let current_participant = room_participant::Entity::find()
 999                .filter(room_participant::Column::RoomId.eq(room_id))
1000                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1001                .one(&*tx)
1002                .await?
1003                .ok_or_else(|| anyhow!("no such room"))?;
1004
1005            if requires_write
1006                && !current_participant
1007                    .role
1008                    .map_or(false, |role| role.can_edit_projects())
1009            {
1010                Err(anyhow!("not authorized to edit projects"))?;
1011            }
1012
1013            let collaborators = project_collaborator::Entity::find()
1014                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1015                .all(&*tx)
1016                .await?
1017                .into_iter()
1018                .map(|collaborator| ProjectCollaborator {
1019                    connection_id: collaborator.connection(),
1020                    user_id: collaborator.user_id,
1021                    replica_id: collaborator.replica_id,
1022                    is_host: collaborator.is_host,
1023                })
1024                .collect::<Vec<_>>();
1025
1026            if collaborators
1027                .iter()
1028                .any(|collaborator| collaborator.connection_id == connection_id)
1029            {
1030                Ok(collaborators)
1031            } else {
1032                Err(anyhow!("no such project"))?
1033            }
1034        })
1035        .await
1036    }
1037
1038    /// Returns the connection IDs in the given project.
1039    ///
1040    /// The provided `connection_id` must also be a collaborator in the project,
1041    /// otherwise an error will be returned.
1042    pub async fn project_connection_ids(
1043        &self,
1044        project_id: ProjectId,
1045        connection_id: ConnectionId,
1046    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1047        let room_id = self.room_id_for_project(project_id).await?;
1048        self.room_transaction(room_id, |tx| async move {
1049            let mut collaborators = project_collaborator::Entity::find()
1050                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1051                .stream(&*tx)
1052                .await?;
1053
1054            let mut connection_ids = HashSet::default();
1055            while let Some(collaborator) = collaborators.next().await {
1056                let collaborator = collaborator?;
1057                connection_ids.insert(collaborator.connection());
1058            }
1059
1060            if connection_ids.contains(&connection_id) {
1061                Ok(connection_ids)
1062            } else {
1063                Err(anyhow!("no such project"))?
1064            }
1065        })
1066        .await
1067    }
1068
1069    async fn project_guest_connection_ids(
1070        &self,
1071        project_id: ProjectId,
1072        tx: &DatabaseTransaction,
1073    ) -> Result<Vec<ConnectionId>> {
1074        let mut collaborators = project_collaborator::Entity::find()
1075            .filter(
1076                project_collaborator::Column::ProjectId
1077                    .eq(project_id)
1078                    .and(project_collaborator::Column::IsHost.eq(false)),
1079            )
1080            .stream(tx)
1081            .await?;
1082
1083        let mut guest_connection_ids = Vec::new();
1084        while let Some(collaborator) = collaborators.next().await {
1085            let collaborator = collaborator?;
1086            guest_connection_ids.push(collaborator.connection());
1087        }
1088        Ok(guest_connection_ids)
1089    }
1090
1091    /// Returns the [`RoomId`] for the given project.
1092    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
1093        self.transaction(|tx| async move {
1094            let project = project::Entity::find_by_id(project_id)
1095                .one(&*tx)
1096                .await?
1097                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
1098            Ok(project
1099                .room_id
1100                .ok_or_else(|| anyhow!("project not in room"))?)
1101        })
1102        .await
1103    }
1104
1105    pub async fn check_room_participants(
1106        &self,
1107        room_id: RoomId,
1108        leader_id: ConnectionId,
1109        follower_id: ConnectionId,
1110    ) -> Result<()> {
1111        self.transaction(|tx| async move {
1112            use room_participant::Column;
1113
1114            let count = room_participant::Entity::find()
1115                .filter(
1116                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1117                        Condition::any()
1118                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1119                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1120                            ))
1121                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1122                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1123                            )),
1124                    ),
1125                )
1126                .count(&*tx)
1127                .await?;
1128
1129            if count < 2 {
1130                Err(anyhow!("not room participants"))?;
1131            }
1132
1133            Ok(())
1134        })
1135        .await
1136    }
1137
1138    /// Adds the given follower connection as a follower of the given leader connection.
1139    pub async fn follow(
1140        &self,
1141        room_id: RoomId,
1142        project_id: ProjectId,
1143        leader_connection: ConnectionId,
1144        follower_connection: ConnectionId,
1145    ) -> Result<RoomGuard<proto::Room>> {
1146        self.room_transaction(room_id, |tx| async move {
1147            follower::ActiveModel {
1148                room_id: ActiveValue::set(room_id),
1149                project_id: ActiveValue::set(project_id),
1150                leader_connection_server_id: ActiveValue::set(ServerId(
1151                    leader_connection.owner_id as i32,
1152                )),
1153                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1154                follower_connection_server_id: ActiveValue::set(ServerId(
1155                    follower_connection.owner_id as i32,
1156                )),
1157                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1158                ..Default::default()
1159            }
1160            .insert(&*tx)
1161            .await?;
1162
1163            let room = self.get_room(room_id, &tx).await?;
1164            Ok(room)
1165        })
1166        .await
1167    }
1168
1169    /// Removes the given follower connection as a follower of the given leader connection.
1170    pub async fn unfollow(
1171        &self,
1172        room_id: RoomId,
1173        project_id: ProjectId,
1174        leader_connection: ConnectionId,
1175        follower_connection: ConnectionId,
1176    ) -> Result<RoomGuard<proto::Room>> {
1177        self.room_transaction(room_id, |tx| async move {
1178            follower::Entity::delete_many()
1179                .filter(
1180                    Condition::all()
1181                        .add(follower::Column::RoomId.eq(room_id))
1182                        .add(follower::Column::ProjectId.eq(project_id))
1183                        .add(
1184                            follower::Column::LeaderConnectionServerId
1185                                .eq(leader_connection.owner_id),
1186                        )
1187                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1188                        .add(
1189                            follower::Column::FollowerConnectionServerId
1190                                .eq(follower_connection.owner_id),
1191                        )
1192                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1193                )
1194                .exec(&*tx)
1195                .await?;
1196
1197            let room = self.get_room(room_id, &tx).await?;
1198            Ok(room)
1199        })
1200        .await
1201    }
1202}