projects.rs

   1use super::*;
   2
   3impl Database {
   4    /// Returns the count of all projects, excluding ones marked as admin.
   5    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   6        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
   7        enum QueryAs {
   8            Count,
   9        }
  10
  11        self.transaction(|tx| async move {
  12            Ok(project::Entity::find()
  13                .select_only()
  14                .column_as(project::Column::Id.count(), QueryAs::Count)
  15                .inner_join(user::Entity)
  16                .filter(user::Column::Admin.eq(false))
  17                .into_values::<_, QueryAs>()
  18                .one(&*tx)
  19                .await?
  20                .unwrap_or(0i64) as usize)
  21        })
  22        .await
  23    }
  24
  25    /// Shares a project with the given room.
  26    pub async fn share_project(
  27        &self,
  28        room_id: RoomId,
  29        connection: ConnectionId,
  30        worktrees: &[proto::WorktreeMetadata],
  31    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
  32        self.room_transaction(room_id, |tx| async move {
  33            let participant = room_participant::Entity::find()
  34                .filter(
  35                    Condition::all()
  36                        .add(
  37                            room_participant::Column::AnsweringConnectionId
  38                                .eq(connection.id as i32),
  39                        )
  40                        .add(
  41                            room_participant::Column::AnsweringConnectionServerId
  42                                .eq(connection.owner_id as i32),
  43                        ),
  44                )
  45                .one(&*tx)
  46                .await?
  47                .ok_or_else(|| anyhow!("could not find participant"))?;
  48            if participant.room_id != room_id {
  49                return Err(anyhow!("shared project on unexpected room"))?;
  50            }
  51            if !participant
  52                .role
  53                .unwrap_or(ChannelRole::Member)
  54                .can_edit_projects()
  55            {
  56                return Err(anyhow!("guests cannot share projects"))?;
  57            }
  58
  59            let project = project::ActiveModel {
  60                room_id: ActiveValue::set(Some(participant.room_id)),
  61                host_user_id: ActiveValue::set(Some(participant.user_id)),
  62                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  63                host_connection_server_id: ActiveValue::set(Some(ServerId(
  64                    connection.owner_id as i32,
  65                ))),
  66                id: ActiveValue::NotSet,
  67                hosted_project_id: ActiveValue::Set(None),
  68            }
  69            .insert(&*tx)
  70            .await?;
  71
  72            if !worktrees.is_empty() {
  73                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  74                    worktree::ActiveModel {
  75                        id: ActiveValue::set(worktree.id as i64),
  76                        project_id: ActiveValue::set(project.id),
  77                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  78                        root_name: ActiveValue::set(worktree.root_name.clone()),
  79                        visible: ActiveValue::set(worktree.visible),
  80                        scan_id: ActiveValue::set(0),
  81                        completed_scan_id: ActiveValue::set(0),
  82                    }
  83                }))
  84                .exec(&*tx)
  85                .await?;
  86            }
  87
  88            project_collaborator::ActiveModel {
  89                project_id: ActiveValue::set(project.id),
  90                connection_id: ActiveValue::set(connection.id as i32),
  91                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  92                user_id: ActiveValue::set(participant.user_id),
  93                replica_id: ActiveValue::set(ReplicaId(0)),
  94                is_host: ActiveValue::set(true),
  95                ..Default::default()
  96            }
  97            .insert(&*tx)
  98            .await?;
  99
 100            let room = self.get_room(room_id, &tx).await?;
 101            Ok((project.id, room))
 102        })
 103        .await
 104    }
 105
 106    /// Unshares the given project.
 107    pub async fn unshare_project(
 108        &self,
 109        project_id: ProjectId,
 110        connection: ConnectionId,
 111    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
 112        let room_id = self.room_id_for_project(project_id).await?;
 113        self.room_transaction(room_id, |tx| async move {
 114            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 115
 116            let project = project::Entity::find_by_id(project_id)
 117                .one(&*tx)
 118                .await?
 119                .ok_or_else(|| anyhow!("project not found"))?;
 120            if project.host_connection()? == connection {
 121                project::Entity::delete(project.into_active_model())
 122                    .exec(&*tx)
 123                    .await?;
 124                let room = self.get_room(room_id, &tx).await?;
 125                Ok((room, guest_connection_ids))
 126            } else {
 127                Err(anyhow!("cannot unshare a project hosted by another user"))?
 128            }
 129        })
 130        .await
 131    }
 132
 133    /// Updates the worktrees associated with the given project.
 134    pub async fn update_project(
 135        &self,
 136        project_id: ProjectId,
 137        connection: ConnectionId,
 138        worktrees: &[proto::WorktreeMetadata],
 139    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
 140        let room_id = self.room_id_for_project(project_id).await?;
 141        self.room_transaction(room_id, |tx| async move {
 142            let project = project::Entity::find_by_id(project_id)
 143                .filter(
 144                    Condition::all()
 145                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 146                        .add(
 147                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 148                        ),
 149                )
 150                .one(&*tx)
 151                .await?
 152                .ok_or_else(|| anyhow!("no such project"))?;
 153
 154            self.update_project_worktrees(project.id, worktrees, &tx)
 155                .await?;
 156
 157            let room_id = project
 158                .room_id
 159                .ok_or_else(|| anyhow!("project not in a room"))?;
 160
 161            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 162            let room = self.get_room(room_id, &tx).await?;
 163            Ok((room, guest_connection_ids))
 164        })
 165        .await
 166    }
 167
 168    pub(in crate::db) async fn update_project_worktrees(
 169        &self,
 170        project_id: ProjectId,
 171        worktrees: &[proto::WorktreeMetadata],
 172        tx: &DatabaseTransaction,
 173    ) -> Result<()> {
 174        if !worktrees.is_empty() {
 175            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 176                id: ActiveValue::set(worktree.id as i64),
 177                project_id: ActiveValue::set(project_id),
 178                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 179                root_name: ActiveValue::set(worktree.root_name.clone()),
 180                visible: ActiveValue::set(worktree.visible),
 181                scan_id: ActiveValue::set(0),
 182                completed_scan_id: ActiveValue::set(0),
 183            }))
 184            .on_conflict(
 185                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 186                    .update_column(worktree::Column::RootName)
 187                    .to_owned(),
 188            )
 189            .exec(tx)
 190            .await?;
 191        }
 192
 193        worktree::Entity::delete_many()
 194            .filter(worktree::Column::ProjectId.eq(project_id).and(
 195                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 196            ))
 197            .exec(tx)
 198            .await?;
 199
 200        Ok(())
 201    }
 202
 203    pub async fn update_worktree(
 204        &self,
 205        update: &proto::UpdateWorktree,
 206        connection: ConnectionId,
 207    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 208        let project_id = ProjectId::from_proto(update.project_id);
 209        let worktree_id = update.worktree_id as i64;
 210        let room_id = self.room_id_for_project(project_id).await?;
 211        self.room_transaction(room_id, |tx| async move {
 212            // Ensure the update comes from the host.
 213            let _project = project::Entity::find_by_id(project_id)
 214                .filter(
 215                    Condition::all()
 216                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 217                        .add(
 218                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 219                        ),
 220                )
 221                .one(&*tx)
 222                .await?
 223                .ok_or_else(|| anyhow!("no such project"))?;
 224
 225            // Update metadata.
 226            worktree::Entity::update(worktree::ActiveModel {
 227                id: ActiveValue::set(worktree_id),
 228                project_id: ActiveValue::set(project_id),
 229                root_name: ActiveValue::set(update.root_name.clone()),
 230                scan_id: ActiveValue::set(update.scan_id as i64),
 231                completed_scan_id: if update.is_last_update {
 232                    ActiveValue::set(update.scan_id as i64)
 233                } else {
 234                    ActiveValue::default()
 235                },
 236                abs_path: ActiveValue::set(update.abs_path.clone()),
 237                ..Default::default()
 238            })
 239            .exec(&*tx)
 240            .await?;
 241
 242            if !update.updated_entries.is_empty() {
 243                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 244                    let mtime = entry.mtime.clone().unwrap_or_default();
 245                    worktree_entry::ActiveModel {
 246                        project_id: ActiveValue::set(project_id),
 247                        worktree_id: ActiveValue::set(worktree_id),
 248                        id: ActiveValue::set(entry.id as i64),
 249                        is_dir: ActiveValue::set(entry.is_dir),
 250                        path: ActiveValue::set(entry.path.clone()),
 251                        inode: ActiveValue::set(entry.inode as i64),
 252                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 253                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 254                        is_symlink: ActiveValue::set(entry.is_symlink),
 255                        is_ignored: ActiveValue::set(entry.is_ignored),
 256                        is_external: ActiveValue::set(entry.is_external),
 257                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 258                        is_deleted: ActiveValue::set(false),
 259                        scan_id: ActiveValue::set(update.scan_id as i64),
 260                    }
 261                }))
 262                .on_conflict(
 263                    OnConflict::columns([
 264                        worktree_entry::Column::ProjectId,
 265                        worktree_entry::Column::WorktreeId,
 266                        worktree_entry::Column::Id,
 267                    ])
 268                    .update_columns([
 269                        worktree_entry::Column::IsDir,
 270                        worktree_entry::Column::Path,
 271                        worktree_entry::Column::Inode,
 272                        worktree_entry::Column::MtimeSeconds,
 273                        worktree_entry::Column::MtimeNanos,
 274                        worktree_entry::Column::IsSymlink,
 275                        worktree_entry::Column::IsIgnored,
 276                        worktree_entry::Column::GitStatus,
 277                        worktree_entry::Column::ScanId,
 278                    ])
 279                    .to_owned(),
 280                )
 281                .exec(&*tx)
 282                .await?;
 283            }
 284
 285            if !update.removed_entries.is_empty() {
 286                worktree_entry::Entity::update_many()
 287                    .filter(
 288                        worktree_entry::Column::ProjectId
 289                            .eq(project_id)
 290                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 291                            .and(
 292                                worktree_entry::Column::Id
 293                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 294                            ),
 295                    )
 296                    .set(worktree_entry::ActiveModel {
 297                        is_deleted: ActiveValue::Set(true),
 298                        scan_id: ActiveValue::Set(update.scan_id as i64),
 299                        ..Default::default()
 300                    })
 301                    .exec(&*tx)
 302                    .await?;
 303            }
 304
 305            if !update.updated_repositories.is_empty() {
 306                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 307                    |repository| worktree_repository::ActiveModel {
 308                        project_id: ActiveValue::set(project_id),
 309                        worktree_id: ActiveValue::set(worktree_id),
 310                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 311                        scan_id: ActiveValue::set(update.scan_id as i64),
 312                        branch: ActiveValue::set(repository.branch.clone()),
 313                        is_deleted: ActiveValue::set(false),
 314                    },
 315                ))
 316                .on_conflict(
 317                    OnConflict::columns([
 318                        worktree_repository::Column::ProjectId,
 319                        worktree_repository::Column::WorktreeId,
 320                        worktree_repository::Column::WorkDirectoryId,
 321                    ])
 322                    .update_columns([
 323                        worktree_repository::Column::ScanId,
 324                        worktree_repository::Column::Branch,
 325                    ])
 326                    .to_owned(),
 327                )
 328                .exec(&*tx)
 329                .await?;
 330            }
 331
 332            if !update.removed_repositories.is_empty() {
 333                worktree_repository::Entity::update_many()
 334                    .filter(
 335                        worktree_repository::Column::ProjectId
 336                            .eq(project_id)
 337                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 338                            .and(
 339                                worktree_repository::Column::WorkDirectoryId
 340                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 341                            ),
 342                    )
 343                    .set(worktree_repository::ActiveModel {
 344                        is_deleted: ActiveValue::Set(true),
 345                        scan_id: ActiveValue::Set(update.scan_id as i64),
 346                        ..Default::default()
 347                    })
 348                    .exec(&*tx)
 349                    .await?;
 350            }
 351
 352            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 353            Ok(connection_ids)
 354        })
 355        .await
 356    }
 357
 358    /// Updates the diagnostic summary for the given connection.
 359    pub async fn update_diagnostic_summary(
 360        &self,
 361        update: &proto::UpdateDiagnosticSummary,
 362        connection: ConnectionId,
 363    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 364        let project_id = ProjectId::from_proto(update.project_id);
 365        let worktree_id = update.worktree_id as i64;
 366        let room_id = self.room_id_for_project(project_id).await?;
 367        self.room_transaction(room_id, |tx| async move {
 368            let summary = update
 369                .summary
 370                .as_ref()
 371                .ok_or_else(|| anyhow!("invalid summary"))?;
 372
 373            // Ensure the update comes from the host.
 374            let project = project::Entity::find_by_id(project_id)
 375                .one(&*tx)
 376                .await?
 377                .ok_or_else(|| anyhow!("no such project"))?;
 378            if project.host_connection()? != connection {
 379                return Err(anyhow!("can't update a project hosted by someone else"))?;
 380            }
 381
 382            // Update summary.
 383            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 384                project_id: ActiveValue::set(project_id),
 385                worktree_id: ActiveValue::set(worktree_id),
 386                path: ActiveValue::set(summary.path.clone()),
 387                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 388                error_count: ActiveValue::set(summary.error_count as i32),
 389                warning_count: ActiveValue::set(summary.warning_count as i32),
 390            })
 391            .on_conflict(
 392                OnConflict::columns([
 393                    worktree_diagnostic_summary::Column::ProjectId,
 394                    worktree_diagnostic_summary::Column::WorktreeId,
 395                    worktree_diagnostic_summary::Column::Path,
 396                ])
 397                .update_columns([
 398                    worktree_diagnostic_summary::Column::LanguageServerId,
 399                    worktree_diagnostic_summary::Column::ErrorCount,
 400                    worktree_diagnostic_summary::Column::WarningCount,
 401                ])
 402                .to_owned(),
 403            )
 404            .exec(&*tx)
 405            .await?;
 406
 407            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 408            Ok(connection_ids)
 409        })
 410        .await
 411    }
 412
 413    /// Starts the language server for the given connection.
 414    pub async fn start_language_server(
 415        &self,
 416        update: &proto::StartLanguageServer,
 417        connection: ConnectionId,
 418    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 419        let project_id = ProjectId::from_proto(update.project_id);
 420        let room_id = self.room_id_for_project(project_id).await?;
 421        self.room_transaction(room_id, |tx| async move {
 422            let server = update
 423                .server
 424                .as_ref()
 425                .ok_or_else(|| anyhow!("invalid language server"))?;
 426
 427            // Ensure the update comes from the host.
 428            let project = project::Entity::find_by_id(project_id)
 429                .one(&*tx)
 430                .await?
 431                .ok_or_else(|| anyhow!("no such project"))?;
 432            if project.host_connection()? != connection {
 433                return Err(anyhow!("can't update a project hosted by someone else"))?;
 434            }
 435
 436            // Add the newly-started language server.
 437            language_server::Entity::insert(language_server::ActiveModel {
 438                project_id: ActiveValue::set(project_id),
 439                id: ActiveValue::set(server.id as i64),
 440                name: ActiveValue::set(server.name.clone()),
 441            })
 442            .on_conflict(
 443                OnConflict::columns([
 444                    language_server::Column::ProjectId,
 445                    language_server::Column::Id,
 446                ])
 447                .update_column(language_server::Column::Name)
 448                .to_owned(),
 449            )
 450            .exec(&*tx)
 451            .await?;
 452
 453            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 454            Ok(connection_ids)
 455        })
 456        .await
 457    }
 458
 459    /// Updates the worktree settings for the given connection.
 460    pub async fn update_worktree_settings(
 461        &self,
 462        update: &proto::UpdateWorktreeSettings,
 463        connection: ConnectionId,
 464    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 465        let project_id = ProjectId::from_proto(update.project_id);
 466        let room_id = self.room_id_for_project(project_id).await?;
 467        self.room_transaction(room_id, |tx| async move {
 468            // Ensure the update comes from the host.
 469            let project = project::Entity::find_by_id(project_id)
 470                .one(&*tx)
 471                .await?
 472                .ok_or_else(|| anyhow!("no such project"))?;
 473            if project.host_connection()? != connection {
 474                return Err(anyhow!("can't update a project hosted by someone else"))?;
 475            }
 476
 477            if let Some(content) = &update.content {
 478                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 479                    project_id: ActiveValue::Set(project_id),
 480                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 481                    path: ActiveValue::Set(update.path.clone()),
 482                    content: ActiveValue::Set(content.clone()),
 483                })
 484                .on_conflict(
 485                    OnConflict::columns([
 486                        worktree_settings_file::Column::ProjectId,
 487                        worktree_settings_file::Column::WorktreeId,
 488                        worktree_settings_file::Column::Path,
 489                    ])
 490                    .update_column(worktree_settings_file::Column::Content)
 491                    .to_owned(),
 492                )
 493                .exec(&*tx)
 494                .await?;
 495            } else {
 496                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 497                    project_id: ActiveValue::Set(project_id),
 498                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 499                    path: ActiveValue::Set(update.path.clone()),
 500                    ..Default::default()
 501                })
 502                .exec(&*tx)
 503                .await?;
 504            }
 505
 506            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 507            Ok(connection_ids)
 508        })
 509        .await
 510    }
 511
 512    /// Adds the given connection to the specified hosted project
 513    pub async fn join_hosted_project(
 514        &self,
 515        id: HostedProjectId,
 516        user_id: UserId,
 517        connection: ConnectionId,
 518    ) -> Result<(Project, ReplicaId)> {
 519        self.transaction(|tx| async move {
 520            let (hosted_project, role) = self.get_hosted_project(id, user_id, &tx).await?;
 521            let project = project::Entity::find()
 522                .filter(project::Column::HostedProjectId.eq(hosted_project.id))
 523                .one(&*tx)
 524                .await?
 525                .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
 526
 527            self.join_project_internal(project, user_id, connection, role, &tx)
 528                .await
 529        })
 530        .await
 531    }
 532
 533    /// Adds the given connection to the specified project
 534    /// in the current room.
 535    pub async fn join_project_in_room(
 536        &self,
 537        project_id: ProjectId,
 538        connection: ConnectionId,
 539    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
 540        let room_id = self.room_id_for_project(project_id).await?;
 541        self.room_transaction(room_id, |tx| async move {
 542            let participant = room_participant::Entity::find()
 543                .filter(
 544                    Condition::all()
 545                        .add(
 546                            room_participant::Column::AnsweringConnectionId
 547                                .eq(connection.id as i32),
 548                        )
 549                        .add(
 550                            room_participant::Column::AnsweringConnectionServerId
 551                                .eq(connection.owner_id as i32),
 552                        ),
 553                )
 554                .one(&*tx)
 555                .await?
 556                .ok_or_else(|| anyhow!("must join a room first"))?;
 557
 558            let project = project::Entity::find_by_id(project_id)
 559                .one(&*tx)
 560                .await?
 561                .ok_or_else(|| anyhow!("no such project"))?;
 562            if project.room_id != Some(participant.room_id) {
 563                return Err(anyhow!("no such project"))?;
 564            }
 565            self.join_project_internal(
 566                project,
 567                participant.user_id,
 568                connection,
 569                participant.role.unwrap_or(ChannelRole::Member),
 570                &tx,
 571            )
 572            .await
 573        })
 574        .await
 575    }
 576
 577    async fn join_project_internal(
 578        &self,
 579        project: project::Model,
 580        user_id: UserId,
 581        connection: ConnectionId,
 582        role: ChannelRole,
 583        tx: &DatabaseTransaction,
 584    ) -> Result<(Project, ReplicaId)> {
 585        let mut collaborators = project
 586            .find_related(project_collaborator::Entity)
 587            .all(tx)
 588            .await?;
 589        let replica_ids = collaborators
 590            .iter()
 591            .map(|c| c.replica_id)
 592            .collect::<HashSet<_>>();
 593        let mut replica_id = ReplicaId(1);
 594        while replica_ids.contains(&replica_id) {
 595            replica_id.0 += 1;
 596        }
 597        let new_collaborator = project_collaborator::ActiveModel {
 598            project_id: ActiveValue::set(project.id),
 599            connection_id: ActiveValue::set(connection.id as i32),
 600            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 601            user_id: ActiveValue::set(user_id),
 602            replica_id: ActiveValue::set(replica_id),
 603            is_host: ActiveValue::set(false),
 604            ..Default::default()
 605        }
 606        .insert(tx)
 607        .await?;
 608        collaborators.push(new_collaborator);
 609
 610        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 611        let mut worktrees = db_worktrees
 612            .into_iter()
 613            .map(|db_worktree| {
 614                (
 615                    db_worktree.id as u64,
 616                    Worktree {
 617                        id: db_worktree.id as u64,
 618                        abs_path: db_worktree.abs_path,
 619                        root_name: db_worktree.root_name,
 620                        visible: db_worktree.visible,
 621                        entries: Default::default(),
 622                        repository_entries: Default::default(),
 623                        diagnostic_summaries: Default::default(),
 624                        settings_files: Default::default(),
 625                        scan_id: db_worktree.scan_id as u64,
 626                        completed_scan_id: db_worktree.completed_scan_id as u64,
 627                    },
 628                )
 629            })
 630            .collect::<BTreeMap<_, _>>();
 631
 632        // Populate worktree entries.
 633        {
 634            let mut db_entries = worktree_entry::Entity::find()
 635                .filter(
 636                    Condition::all()
 637                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 638                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 639                )
 640                .stream(tx)
 641                .await?;
 642            while let Some(db_entry) = db_entries.next().await {
 643                let db_entry = db_entry?;
 644                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 645                    worktree.entries.push(proto::Entry {
 646                        id: db_entry.id as u64,
 647                        is_dir: db_entry.is_dir,
 648                        path: db_entry.path,
 649                        inode: db_entry.inode as u64,
 650                        mtime: Some(proto::Timestamp {
 651                            seconds: db_entry.mtime_seconds as u64,
 652                            nanos: db_entry.mtime_nanos as u32,
 653                        }),
 654                        is_symlink: db_entry.is_symlink,
 655                        is_ignored: db_entry.is_ignored,
 656                        is_external: db_entry.is_external,
 657                        git_status: db_entry.git_status.map(|status| status as i32),
 658                    });
 659                }
 660            }
 661        }
 662
 663        // Populate repository entries.
 664        {
 665            let mut db_repository_entries = worktree_repository::Entity::find()
 666                .filter(
 667                    Condition::all()
 668                        .add(worktree_repository::Column::ProjectId.eq(project.id))
 669                        .add(worktree_repository::Column::IsDeleted.eq(false)),
 670                )
 671                .stream(tx)
 672                .await?;
 673            while let Some(db_repository_entry) = db_repository_entries.next().await {
 674                let db_repository_entry = db_repository_entry?;
 675                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 676                {
 677                    worktree.repository_entries.insert(
 678                        db_repository_entry.work_directory_id as u64,
 679                        proto::RepositoryEntry {
 680                            work_directory_id: db_repository_entry.work_directory_id as u64,
 681                            branch: db_repository_entry.branch,
 682                        },
 683                    );
 684                }
 685            }
 686        }
 687
 688        // Populate worktree diagnostic summaries.
 689        {
 690            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 691                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 692                .stream(tx)
 693                .await?;
 694            while let Some(db_summary) = db_summaries.next().await {
 695                let db_summary = db_summary?;
 696                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 697                    worktree
 698                        .diagnostic_summaries
 699                        .push(proto::DiagnosticSummary {
 700                            path: db_summary.path,
 701                            language_server_id: db_summary.language_server_id as u64,
 702                            error_count: db_summary.error_count as u32,
 703                            warning_count: db_summary.warning_count as u32,
 704                        });
 705                }
 706            }
 707        }
 708
 709        // Populate worktree settings files
 710        {
 711            let mut db_settings_files = worktree_settings_file::Entity::find()
 712                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 713                .stream(tx)
 714                .await?;
 715            while let Some(db_settings_file) = db_settings_files.next().await {
 716                let db_settings_file = db_settings_file?;
 717                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 718                    worktree.settings_files.push(WorktreeSettingsFile {
 719                        path: db_settings_file.path,
 720                        content: db_settings_file.content,
 721                    });
 722                }
 723            }
 724        }
 725
 726        // Populate language servers.
 727        let language_servers = project
 728            .find_related(language_server::Entity)
 729            .all(tx)
 730            .await?;
 731
 732        let project = Project {
 733            id: project.id,
 734            role,
 735            collaborators: collaborators
 736                .into_iter()
 737                .map(|collaborator| ProjectCollaborator {
 738                    connection_id: collaborator.connection(),
 739                    user_id: collaborator.user_id,
 740                    replica_id: collaborator.replica_id,
 741                    is_host: collaborator.is_host,
 742                })
 743                .collect(),
 744            worktrees,
 745            language_servers: language_servers
 746                .into_iter()
 747                .map(|language_server| proto::LanguageServer {
 748                    id: language_server.id as u64,
 749                    name: language_server.name,
 750                })
 751                .collect(),
 752        };
 753        Ok((project, replica_id as ReplicaId))
 754    }
 755
 756    pub async fn leave_hosted_project(
 757        &self,
 758        project_id: ProjectId,
 759        connection: ConnectionId,
 760    ) -> Result<LeftProject> {
 761        self.transaction(|tx| async move {
 762            let result = project_collaborator::Entity::delete_many()
 763                .filter(
 764                    Condition::all()
 765                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 766                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 767                        .add(
 768                            project_collaborator::Column::ConnectionServerId
 769                                .eq(connection.owner_id as i32),
 770                        ),
 771                )
 772                .exec(&*tx)
 773                .await?;
 774            if result.rows_affected == 0 {
 775                return Err(anyhow!("not in the project"))?;
 776            }
 777
 778            let project = project::Entity::find_by_id(project_id)
 779                .one(&*tx)
 780                .await?
 781                .ok_or_else(|| anyhow!("no such project"))?;
 782            let collaborators = project
 783                .find_related(project_collaborator::Entity)
 784                .all(&*tx)
 785                .await?;
 786            let connection_ids = collaborators
 787                .into_iter()
 788                .map(|collaborator| collaborator.connection())
 789                .collect();
 790            Ok(LeftProject {
 791                id: project.id,
 792                connection_ids,
 793                host_user_id: None,
 794                host_connection_id: None,
 795            })
 796        })
 797        .await
 798    }
 799
 800    /// Removes the given connection from the specified project.
 801    pub async fn leave_project(
 802        &self,
 803        project_id: ProjectId,
 804        connection: ConnectionId,
 805    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
 806        let room_id = self.room_id_for_project(project_id).await?;
 807        self.room_transaction(room_id, |tx| async move {
 808            let result = project_collaborator::Entity::delete_many()
 809                .filter(
 810                    Condition::all()
 811                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 812                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 813                        .add(
 814                            project_collaborator::Column::ConnectionServerId
 815                                .eq(connection.owner_id as i32),
 816                        ),
 817                )
 818                .exec(&*tx)
 819                .await?;
 820            if result.rows_affected == 0 {
 821                Err(anyhow!("not a collaborator on this project"))?;
 822            }
 823
 824            let project = project::Entity::find_by_id(project_id)
 825                .one(&*tx)
 826                .await?
 827                .ok_or_else(|| anyhow!("no such project"))?;
 828            let collaborators = project
 829                .find_related(project_collaborator::Entity)
 830                .all(&*tx)
 831                .await?;
 832            let connection_ids = collaborators
 833                .into_iter()
 834                .map(|collaborator| collaborator.connection())
 835                .collect();
 836
 837            follower::Entity::delete_many()
 838                .filter(
 839                    Condition::any()
 840                        .add(
 841                            Condition::all()
 842                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 843                                .add(
 844                                    follower::Column::LeaderConnectionServerId
 845                                        .eq(connection.owner_id),
 846                                )
 847                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 848                        )
 849                        .add(
 850                            Condition::all()
 851                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 852                                .add(
 853                                    follower::Column::FollowerConnectionServerId
 854                                        .eq(connection.owner_id),
 855                                )
 856                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 857                        ),
 858                )
 859                .exec(&*tx)
 860                .await?;
 861
 862            let room = self.get_room(room_id, &tx).await?;
 863            let left_project = LeftProject {
 864                id: project_id,
 865                host_user_id: project.host_user_id,
 866                host_connection_id: Some(project.host_connection()?),
 867                connection_ids,
 868            };
 869            Ok((room, left_project))
 870        })
 871        .await
 872    }
 873
 874    pub async fn check_user_is_project_host(
 875        &self,
 876        project_id: ProjectId,
 877        connection_id: ConnectionId,
 878    ) -> Result<()> {
 879        let room_id = self.room_id_for_project(project_id).await?;
 880        self.room_transaction(room_id, |tx| async move {
 881            project_collaborator::Entity::find()
 882                .filter(
 883                    Condition::all()
 884                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 885                        .add(project_collaborator::Column::IsHost.eq(true))
 886                        .add(project_collaborator::Column::ConnectionId.eq(connection_id.id))
 887                        .add(
 888                            project_collaborator::Column::ConnectionServerId
 889                                .eq(connection_id.owner_id),
 890                        ),
 891                )
 892                .one(&*tx)
 893                .await?
 894                .ok_or_else(|| anyhow!("failed to read project host"))?;
 895
 896            Ok(())
 897        })
 898        .await
 899        .map(|guard| guard.into_inner())
 900    }
 901
 902    /// Returns the host connection for a read-only request to join a shared project.
 903    pub async fn host_for_read_only_project_request(
 904        &self,
 905        project_id: ProjectId,
 906        connection_id: ConnectionId,
 907    ) -> Result<ConnectionId> {
 908        let room_id = self.room_id_for_project(project_id).await?;
 909        self.room_transaction(room_id, |tx| async move {
 910            let current_participant = room_participant::Entity::find()
 911                .filter(room_participant::Column::RoomId.eq(room_id))
 912                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 913                .one(&*tx)
 914                .await?
 915                .ok_or_else(|| anyhow!("no such room"))?;
 916
 917            if !current_participant
 918                .role
 919                .map_or(false, |role| role.can_read_projects())
 920            {
 921                Err(anyhow!("not authorized to read projects"))?;
 922            }
 923
 924            let host = project_collaborator::Entity::find()
 925                .filter(
 926                    project_collaborator::Column::ProjectId
 927                        .eq(project_id)
 928                        .and(project_collaborator::Column::IsHost.eq(true)),
 929                )
 930                .one(&*tx)
 931                .await?
 932                .ok_or_else(|| anyhow!("failed to read project host"))?;
 933
 934            Ok(host.connection())
 935        })
 936        .await
 937        .map(|guard| guard.into_inner())
 938    }
 939
 940    /// Returns the host connection for a request to join a shared project.
 941    pub async fn host_for_mutating_project_request(
 942        &self,
 943        project_id: ProjectId,
 944        connection_id: ConnectionId,
 945    ) -> Result<ConnectionId> {
 946        let room_id = self.room_id_for_project(project_id).await?;
 947        self.room_transaction(room_id, |tx| async move {
 948            let current_participant = room_participant::Entity::find()
 949                .filter(room_participant::Column::RoomId.eq(room_id))
 950                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 951                .one(&*tx)
 952                .await?
 953                .ok_or_else(|| anyhow!("no such room"))?;
 954
 955            if !current_participant
 956                .role
 957                .map_or(false, |role| role.can_edit_projects())
 958            {
 959                Err(anyhow!("not authorized to edit projects"))?;
 960            }
 961
 962            let host = project_collaborator::Entity::find()
 963                .filter(
 964                    project_collaborator::Column::ProjectId
 965                        .eq(project_id)
 966                        .and(project_collaborator::Column::IsHost.eq(true)),
 967                )
 968                .one(&*tx)
 969                .await?
 970                .ok_or_else(|| anyhow!("failed to read project host"))?;
 971
 972            Ok(host.connection())
 973        })
 974        .await
 975        .map(|guard| guard.into_inner())
 976    }
 977
 978    pub async fn project_collaborators_for_buffer_update(
 979        &self,
 980        project_id: ProjectId,
 981        connection_id: ConnectionId,
 982        requires_write: bool,
 983    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
 984        let room_id = self.room_id_for_project(project_id).await?;
 985        self.room_transaction(room_id, |tx| async move {
 986            let current_participant = room_participant::Entity::find()
 987                .filter(room_participant::Column::RoomId.eq(room_id))
 988                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
 989                .one(&*tx)
 990                .await?
 991                .ok_or_else(|| anyhow!("no such room"))?;
 992
 993            if requires_write
 994                && !current_participant
 995                    .role
 996                    .map_or(false, |role| role.can_edit_projects())
 997            {
 998                Err(anyhow!("not authorized to edit projects"))?;
 999            }
1000
1001            let collaborators = project_collaborator::Entity::find()
1002                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1003                .all(&*tx)
1004                .await?
1005                .into_iter()
1006                .map(|collaborator| ProjectCollaborator {
1007                    connection_id: collaborator.connection(),
1008                    user_id: collaborator.user_id,
1009                    replica_id: collaborator.replica_id,
1010                    is_host: collaborator.is_host,
1011                })
1012                .collect::<Vec<_>>();
1013
1014            if collaborators
1015                .iter()
1016                .any(|collaborator| collaborator.connection_id == connection_id)
1017            {
1018                Ok(collaborators)
1019            } else {
1020                Err(anyhow!("no such project"))?
1021            }
1022        })
1023        .await
1024    }
1025
1026    /// Returns the connection IDs in the given project.
1027    ///
1028    /// The provided `connection_id` must also be a collaborator in the project,
1029    /// otherwise an error will be returned.
1030    pub async fn project_connection_ids(
1031        &self,
1032        project_id: ProjectId,
1033        connection_id: ConnectionId,
1034    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1035        let room_id = self.room_id_for_project(project_id).await?;
1036        self.room_transaction(room_id, |tx| async move {
1037            let mut collaborators = project_collaborator::Entity::find()
1038                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1039                .stream(&*tx)
1040                .await?;
1041
1042            let mut connection_ids = HashSet::default();
1043            while let Some(collaborator) = collaborators.next().await {
1044                let collaborator = collaborator?;
1045                connection_ids.insert(collaborator.connection());
1046            }
1047
1048            if connection_ids.contains(&connection_id) {
1049                Ok(connection_ids)
1050            } else {
1051                Err(anyhow!("no such project"))?
1052            }
1053        })
1054        .await
1055    }
1056
1057    async fn project_guest_connection_ids(
1058        &self,
1059        project_id: ProjectId,
1060        tx: &DatabaseTransaction,
1061    ) -> Result<Vec<ConnectionId>> {
1062        let mut collaborators = project_collaborator::Entity::find()
1063            .filter(
1064                project_collaborator::Column::ProjectId
1065                    .eq(project_id)
1066                    .and(project_collaborator::Column::IsHost.eq(false)),
1067            )
1068            .stream(tx)
1069            .await?;
1070
1071        let mut guest_connection_ids = Vec::new();
1072        while let Some(collaborator) = collaborators.next().await {
1073            let collaborator = collaborator?;
1074            guest_connection_ids.push(collaborator.connection());
1075        }
1076        Ok(guest_connection_ids)
1077    }
1078
1079    /// Returns the [`RoomId`] for the given project.
1080    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
1081        self.transaction(|tx| async move {
1082            let project = project::Entity::find_by_id(project_id)
1083                .one(&*tx)
1084                .await?
1085                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
1086            Ok(project
1087                .room_id
1088                .ok_or_else(|| anyhow!("project not in room"))?)
1089        })
1090        .await
1091    }
1092
1093    pub async fn check_room_participants(
1094        &self,
1095        room_id: RoomId,
1096        leader_id: ConnectionId,
1097        follower_id: ConnectionId,
1098    ) -> Result<()> {
1099        self.transaction(|tx| async move {
1100            use room_participant::Column;
1101
1102            let count = room_participant::Entity::find()
1103                .filter(
1104                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1105                        Condition::any()
1106                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1107                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1108                            ))
1109                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1110                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1111                            )),
1112                    ),
1113                )
1114                .count(&*tx)
1115                .await?;
1116
1117            if count < 2 {
1118                Err(anyhow!("not room participants"))?;
1119            }
1120
1121            Ok(())
1122        })
1123        .await
1124    }
1125
1126    /// Adds the given follower connection as a follower of the given leader connection.
1127    pub async fn follow(
1128        &self,
1129        room_id: RoomId,
1130        project_id: ProjectId,
1131        leader_connection: ConnectionId,
1132        follower_connection: ConnectionId,
1133    ) -> Result<RoomGuard<proto::Room>> {
1134        self.room_transaction(room_id, |tx| async move {
1135            follower::ActiveModel {
1136                room_id: ActiveValue::set(room_id),
1137                project_id: ActiveValue::set(project_id),
1138                leader_connection_server_id: ActiveValue::set(ServerId(
1139                    leader_connection.owner_id as i32,
1140                )),
1141                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1142                follower_connection_server_id: ActiveValue::set(ServerId(
1143                    follower_connection.owner_id as i32,
1144                )),
1145                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1146                ..Default::default()
1147            }
1148            .insert(&*tx)
1149            .await?;
1150
1151            let room = self.get_room(room_id, &tx).await?;
1152            Ok(room)
1153        })
1154        .await
1155    }
1156
1157    /// Removes the given follower connection as a follower of the given leader connection.
1158    pub async fn unfollow(
1159        &self,
1160        room_id: RoomId,
1161        project_id: ProjectId,
1162        leader_connection: ConnectionId,
1163        follower_connection: ConnectionId,
1164    ) -> Result<RoomGuard<proto::Room>> {
1165        self.room_transaction(room_id, |tx| async move {
1166            follower::Entity::delete_many()
1167                .filter(
1168                    Condition::all()
1169                        .add(follower::Column::RoomId.eq(room_id))
1170                        .add(follower::Column::ProjectId.eq(project_id))
1171                        .add(
1172                            follower::Column::LeaderConnectionServerId
1173                                .eq(leader_connection.owner_id),
1174                        )
1175                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1176                        .add(
1177                            follower::Column::FollowerConnectionServerId
1178                                .eq(follower_connection.owner_id),
1179                        )
1180                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1181                )
1182                .exec(&*tx)
1183                .await?;
1184
1185            let room = self.get_room(room_id, &tx).await?;
1186            Ok(room)
1187        })
1188        .await
1189    }
1190}