projects.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
   5        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
   6        enum QueryAs {
   7            Count,
   8        }
   9
  10        self.transaction(|tx| async move {
  11            Ok(project::Entity::find()
  12                .select_only()
  13                .column_as(project::Column::Id.count(), QueryAs::Count)
  14                .inner_join(user::Entity)
  15                .filter(user::Column::Admin.eq(false))
  16                .into_values::<_, QueryAs>()
  17                .one(&*tx)
  18                .await?
  19                .unwrap_or(0i64) as usize)
  20        })
  21        .await
  22    }
  23
  24    pub async fn share_project(
  25        &self,
  26        room_id: RoomId,
  27        connection: ConnectionId,
  28        worktrees: &[proto::WorktreeMetadata],
  29    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
  30        self.room_transaction(room_id, |tx| async move {
  31            let participant = room_participant::Entity::find()
  32                .filter(
  33                    Condition::all()
  34                        .add(
  35                            room_participant::Column::AnsweringConnectionId
  36                                .eq(connection.id as i32),
  37                        )
  38                        .add(
  39                            room_participant::Column::AnsweringConnectionServerId
  40                                .eq(connection.owner_id as i32),
  41                        ),
  42                )
  43                .one(&*tx)
  44                .await?
  45                .ok_or_else(|| anyhow!("could not find participant"))?;
  46            if participant.room_id != room_id {
  47                return Err(anyhow!("shared project on unexpected room"))?;
  48            }
  49
  50            let project = project::ActiveModel {
  51                room_id: ActiveValue::set(participant.room_id),
  52                host_user_id: ActiveValue::set(participant.user_id),
  53                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  54                host_connection_server_id: ActiveValue::set(Some(ServerId(
  55                    connection.owner_id as i32,
  56                ))),
  57                ..Default::default()
  58            }
  59            .insert(&*tx)
  60            .await?;
  61
  62            if !worktrees.is_empty() {
  63                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  64                    worktree::ActiveModel {
  65                        id: ActiveValue::set(worktree.id as i64),
  66                        project_id: ActiveValue::set(project.id),
  67                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  68                        root_name: ActiveValue::set(worktree.root_name.clone()),
  69                        visible: ActiveValue::set(worktree.visible),
  70                        scan_id: ActiveValue::set(0),
  71                        completed_scan_id: ActiveValue::set(0),
  72                    }
  73                }))
  74                .exec(&*tx)
  75                .await?;
  76            }
  77
  78            project_collaborator::ActiveModel {
  79                project_id: ActiveValue::set(project.id),
  80                connection_id: ActiveValue::set(connection.id as i32),
  81                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  82                user_id: ActiveValue::set(participant.user_id),
  83                replica_id: ActiveValue::set(ReplicaId(0)),
  84                is_host: ActiveValue::set(true),
  85                ..Default::default()
  86            }
  87            .insert(&*tx)
  88            .await?;
  89
  90            let room = self.get_room(room_id, &tx).await?;
  91            Ok((project.id, room))
  92        })
  93        .await
  94    }
  95
  96    pub async fn unshare_project(
  97        &self,
  98        project_id: ProjectId,
  99        connection: ConnectionId,
 100    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
 101        let room_id = self.room_id_for_project(project_id).await?;
 102        self.room_transaction(room_id, |tx| async move {
 103            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 104
 105            let project = project::Entity::find_by_id(project_id)
 106                .one(&*tx)
 107                .await?
 108                .ok_or_else(|| anyhow!("project not found"))?;
 109            if project.host_connection()? == connection {
 110                project::Entity::delete(project.into_active_model())
 111                    .exec(&*tx)
 112                    .await?;
 113                let room = self.get_room(room_id, &tx).await?;
 114                Ok((room, guest_connection_ids))
 115            } else {
 116                Err(anyhow!("cannot unshare a project hosted by another user"))?
 117            }
 118        })
 119        .await
 120    }
 121
 122    pub async fn update_project(
 123        &self,
 124        project_id: ProjectId,
 125        connection: ConnectionId,
 126        worktrees: &[proto::WorktreeMetadata],
 127    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
 128        let room_id = self.room_id_for_project(project_id).await?;
 129        self.room_transaction(room_id, |tx| async move {
 130            let project = project::Entity::find_by_id(project_id)
 131                .filter(
 132                    Condition::all()
 133                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 134                        .add(
 135                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 136                        ),
 137                )
 138                .one(&*tx)
 139                .await?
 140                .ok_or_else(|| anyhow!("no such project"))?;
 141
 142            self.update_project_worktrees(project.id, worktrees, &tx)
 143                .await?;
 144
 145            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 146            let room = self.get_room(project.room_id, &tx).await?;
 147            Ok((room, guest_connection_ids))
 148        })
 149        .await
 150    }
 151
 152    pub(in crate::db) async fn update_project_worktrees(
 153        &self,
 154        project_id: ProjectId,
 155        worktrees: &[proto::WorktreeMetadata],
 156        tx: &DatabaseTransaction,
 157    ) -> Result<()> {
 158        if !worktrees.is_empty() {
 159            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 160                id: ActiveValue::set(worktree.id as i64),
 161                project_id: ActiveValue::set(project_id),
 162                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 163                root_name: ActiveValue::set(worktree.root_name.clone()),
 164                visible: ActiveValue::set(worktree.visible),
 165                scan_id: ActiveValue::set(0),
 166                completed_scan_id: ActiveValue::set(0),
 167            }))
 168            .on_conflict(
 169                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 170                    .update_column(worktree::Column::RootName)
 171                    .to_owned(),
 172            )
 173            .exec(&*tx)
 174            .await?;
 175        }
 176
 177        worktree::Entity::delete_many()
 178            .filter(worktree::Column::ProjectId.eq(project_id).and(
 179                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 180            ))
 181            .exec(&*tx)
 182            .await?;
 183
 184        Ok(())
 185    }
 186
 187    pub async fn update_worktree(
 188        &self,
 189        update: &proto::UpdateWorktree,
 190        connection: ConnectionId,
 191    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 192        let project_id = ProjectId::from_proto(update.project_id);
 193        let worktree_id = update.worktree_id as i64;
 194        let room_id = self.room_id_for_project(project_id).await?;
 195        self.room_transaction(room_id, |tx| async move {
 196            // Ensure the update comes from the host.
 197            let _project = project::Entity::find_by_id(project_id)
 198                .filter(
 199                    Condition::all()
 200                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 201                        .add(
 202                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 203                        ),
 204                )
 205                .one(&*tx)
 206                .await?
 207                .ok_or_else(|| anyhow!("no such project"))?;
 208
 209            // Update metadata.
 210            worktree::Entity::update(worktree::ActiveModel {
 211                id: ActiveValue::set(worktree_id),
 212                project_id: ActiveValue::set(project_id),
 213                root_name: ActiveValue::set(update.root_name.clone()),
 214                scan_id: ActiveValue::set(update.scan_id as i64),
 215                completed_scan_id: if update.is_last_update {
 216                    ActiveValue::set(update.scan_id as i64)
 217                } else {
 218                    ActiveValue::default()
 219                },
 220                abs_path: ActiveValue::set(update.abs_path.clone()),
 221                ..Default::default()
 222            })
 223            .exec(&*tx)
 224            .await?;
 225
 226            if !update.updated_entries.is_empty() {
 227                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 228                    let mtime = entry.mtime.clone().unwrap_or_default();
 229                    worktree_entry::ActiveModel {
 230                        project_id: ActiveValue::set(project_id),
 231                        worktree_id: ActiveValue::set(worktree_id),
 232                        id: ActiveValue::set(entry.id as i64),
 233                        is_dir: ActiveValue::set(entry.is_dir),
 234                        path: ActiveValue::set(entry.path.clone()),
 235                        inode: ActiveValue::set(entry.inode as i64),
 236                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 237                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 238                        is_symlink: ActiveValue::set(entry.is_symlink),
 239                        is_ignored: ActiveValue::set(entry.is_ignored),
 240                        is_external: ActiveValue::set(entry.is_external),
 241                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
 242                        is_deleted: ActiveValue::set(false),
 243                        scan_id: ActiveValue::set(update.scan_id as i64),
 244                    }
 245                }))
 246                .on_conflict(
 247                    OnConflict::columns([
 248                        worktree_entry::Column::ProjectId,
 249                        worktree_entry::Column::WorktreeId,
 250                        worktree_entry::Column::Id,
 251                    ])
 252                    .update_columns([
 253                        worktree_entry::Column::IsDir,
 254                        worktree_entry::Column::Path,
 255                        worktree_entry::Column::Inode,
 256                        worktree_entry::Column::MtimeSeconds,
 257                        worktree_entry::Column::MtimeNanos,
 258                        worktree_entry::Column::IsSymlink,
 259                        worktree_entry::Column::IsIgnored,
 260                        worktree_entry::Column::GitStatus,
 261                        worktree_entry::Column::ScanId,
 262                    ])
 263                    .to_owned(),
 264                )
 265                .exec(&*tx)
 266                .await?;
 267            }
 268
 269            if !update.removed_entries.is_empty() {
 270                worktree_entry::Entity::update_many()
 271                    .filter(
 272                        worktree_entry::Column::ProjectId
 273                            .eq(project_id)
 274                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 275                            .and(
 276                                worktree_entry::Column::Id
 277                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 278                            ),
 279                    )
 280                    .set(worktree_entry::ActiveModel {
 281                        is_deleted: ActiveValue::Set(true),
 282                        scan_id: ActiveValue::Set(update.scan_id as i64),
 283                        ..Default::default()
 284                    })
 285                    .exec(&*tx)
 286                    .await?;
 287            }
 288
 289            if !update.updated_repositories.is_empty() {
 290                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
 291                    |repository| worktree_repository::ActiveModel {
 292                        project_id: ActiveValue::set(project_id),
 293                        worktree_id: ActiveValue::set(worktree_id),
 294                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
 295                        scan_id: ActiveValue::set(update.scan_id as i64),
 296                        branch: ActiveValue::set(repository.branch.clone()),
 297                        is_deleted: ActiveValue::set(false),
 298                    },
 299                ))
 300                .on_conflict(
 301                    OnConflict::columns([
 302                        worktree_repository::Column::ProjectId,
 303                        worktree_repository::Column::WorktreeId,
 304                        worktree_repository::Column::WorkDirectoryId,
 305                    ])
 306                    .update_columns([
 307                        worktree_repository::Column::ScanId,
 308                        worktree_repository::Column::Branch,
 309                    ])
 310                    .to_owned(),
 311                )
 312                .exec(&*tx)
 313                .await?;
 314            }
 315
 316            if !update.removed_repositories.is_empty() {
 317                worktree_repository::Entity::update_many()
 318                    .filter(
 319                        worktree_repository::Column::ProjectId
 320                            .eq(project_id)
 321                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
 322                            .and(
 323                                worktree_repository::Column::WorkDirectoryId
 324                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
 325                            ),
 326                    )
 327                    .set(worktree_repository::ActiveModel {
 328                        is_deleted: ActiveValue::Set(true),
 329                        scan_id: ActiveValue::Set(update.scan_id as i64),
 330                        ..Default::default()
 331                    })
 332                    .exec(&*tx)
 333                    .await?;
 334            }
 335
 336            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 337            Ok(connection_ids)
 338        })
 339        .await
 340    }
 341
 342    pub async fn update_diagnostic_summary(
 343        &self,
 344        update: &proto::UpdateDiagnosticSummary,
 345        connection: ConnectionId,
 346    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 347        let project_id = ProjectId::from_proto(update.project_id);
 348        let worktree_id = update.worktree_id as i64;
 349        let room_id = self.room_id_for_project(project_id).await?;
 350        self.room_transaction(room_id, |tx| async move {
 351            let summary = update
 352                .summary
 353                .as_ref()
 354                .ok_or_else(|| anyhow!("invalid summary"))?;
 355
 356            // Ensure the update comes from the host.
 357            let project = project::Entity::find_by_id(project_id)
 358                .one(&*tx)
 359                .await?
 360                .ok_or_else(|| anyhow!("no such project"))?;
 361            if project.host_connection()? != connection {
 362                return Err(anyhow!("can't update a project hosted by someone else"))?;
 363            }
 364
 365            // Update summary.
 366            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 367                project_id: ActiveValue::set(project_id),
 368                worktree_id: ActiveValue::set(worktree_id),
 369                path: ActiveValue::set(summary.path.clone()),
 370                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 371                error_count: ActiveValue::set(summary.error_count as i32),
 372                warning_count: ActiveValue::set(summary.warning_count as i32),
 373                ..Default::default()
 374            })
 375            .on_conflict(
 376                OnConflict::columns([
 377                    worktree_diagnostic_summary::Column::ProjectId,
 378                    worktree_diagnostic_summary::Column::WorktreeId,
 379                    worktree_diagnostic_summary::Column::Path,
 380                ])
 381                .update_columns([
 382                    worktree_diagnostic_summary::Column::LanguageServerId,
 383                    worktree_diagnostic_summary::Column::ErrorCount,
 384                    worktree_diagnostic_summary::Column::WarningCount,
 385                ])
 386                .to_owned(),
 387            )
 388            .exec(&*tx)
 389            .await?;
 390
 391            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 392            Ok(connection_ids)
 393        })
 394        .await
 395    }
 396
 397    pub async fn start_language_server(
 398        &self,
 399        update: &proto::StartLanguageServer,
 400        connection: ConnectionId,
 401    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 402        let project_id = ProjectId::from_proto(update.project_id);
 403        let room_id = self.room_id_for_project(project_id).await?;
 404        self.room_transaction(room_id, |tx| async move {
 405            let server = update
 406                .server
 407                .as_ref()
 408                .ok_or_else(|| anyhow!("invalid language server"))?;
 409
 410            // Ensure the update comes from the host.
 411            let project = project::Entity::find_by_id(project_id)
 412                .one(&*tx)
 413                .await?
 414                .ok_or_else(|| anyhow!("no such project"))?;
 415            if project.host_connection()? != connection {
 416                return Err(anyhow!("can't update a project hosted by someone else"))?;
 417            }
 418
 419            // Add the newly-started language server.
 420            language_server::Entity::insert(language_server::ActiveModel {
 421                project_id: ActiveValue::set(project_id),
 422                id: ActiveValue::set(server.id as i64),
 423                name: ActiveValue::set(server.name.clone()),
 424                ..Default::default()
 425            })
 426            .on_conflict(
 427                OnConflict::columns([
 428                    language_server::Column::ProjectId,
 429                    language_server::Column::Id,
 430                ])
 431                .update_column(language_server::Column::Name)
 432                .to_owned(),
 433            )
 434            .exec(&*tx)
 435            .await?;
 436
 437            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 438            Ok(connection_ids)
 439        })
 440        .await
 441    }
 442
 443    pub async fn update_worktree_settings(
 444        &self,
 445        update: &proto::UpdateWorktreeSettings,
 446        connection: ConnectionId,
 447    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
 448        let project_id = ProjectId::from_proto(update.project_id);
 449        let room_id = self.room_id_for_project(project_id).await?;
 450        self.room_transaction(room_id, |tx| async move {
 451            // Ensure the update comes from the host.
 452            let project = project::Entity::find_by_id(project_id)
 453                .one(&*tx)
 454                .await?
 455                .ok_or_else(|| anyhow!("no such project"))?;
 456            if project.host_connection()? != connection {
 457                return Err(anyhow!("can't update a project hosted by someone else"))?;
 458            }
 459
 460            if let Some(content) = &update.content {
 461                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 462                    project_id: ActiveValue::Set(project_id),
 463                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 464                    path: ActiveValue::Set(update.path.clone()),
 465                    content: ActiveValue::Set(content.clone()),
 466                })
 467                .on_conflict(
 468                    OnConflict::columns([
 469                        worktree_settings_file::Column::ProjectId,
 470                        worktree_settings_file::Column::WorktreeId,
 471                        worktree_settings_file::Column::Path,
 472                    ])
 473                    .update_column(worktree_settings_file::Column::Content)
 474                    .to_owned(),
 475                )
 476                .exec(&*tx)
 477                .await?;
 478            } else {
 479                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 480                    project_id: ActiveValue::Set(project_id),
 481                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 482                    path: ActiveValue::Set(update.path.clone()),
 483                    ..Default::default()
 484                })
 485                .exec(&*tx)
 486                .await?;
 487            }
 488
 489            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 490            Ok(connection_ids)
 491        })
 492        .await
 493    }
 494
 495    pub async fn join_project(
 496        &self,
 497        project_id: ProjectId,
 498        connection: ConnectionId,
 499    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
 500        let room_id = self.room_id_for_project(project_id).await?;
 501        self.room_transaction(room_id, |tx| async move {
 502            let participant = room_participant::Entity::find()
 503                .filter(
 504                    Condition::all()
 505                        .add(
 506                            room_participant::Column::AnsweringConnectionId
 507                                .eq(connection.id as i32),
 508                        )
 509                        .add(
 510                            room_participant::Column::AnsweringConnectionServerId
 511                                .eq(connection.owner_id as i32),
 512                        ),
 513                )
 514                .one(&*tx)
 515                .await?
 516                .ok_or_else(|| anyhow!("must join a room first"))?;
 517
 518            let project = project::Entity::find_by_id(project_id)
 519                .one(&*tx)
 520                .await?
 521                .ok_or_else(|| anyhow!("no such project"))?;
 522            if project.room_id != participant.room_id {
 523                return Err(anyhow!("no such project"))?;
 524            }
 525
 526            let mut collaborators = project
 527                .find_related(project_collaborator::Entity)
 528                .all(&*tx)
 529                .await?;
 530            let replica_ids = collaborators
 531                .iter()
 532                .map(|c| c.replica_id)
 533                .collect::<HashSet<_>>();
 534            let mut replica_id = ReplicaId(1);
 535            while replica_ids.contains(&replica_id) {
 536                replica_id.0 += 1;
 537            }
 538            let new_collaborator = project_collaborator::ActiveModel {
 539                project_id: ActiveValue::set(project_id),
 540                connection_id: ActiveValue::set(connection.id as i32),
 541                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 542                user_id: ActiveValue::set(participant.user_id),
 543                replica_id: ActiveValue::set(replica_id),
 544                is_host: ActiveValue::set(false),
 545                ..Default::default()
 546            }
 547            .insert(&*tx)
 548            .await?;
 549            collaborators.push(new_collaborator);
 550
 551            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 552            let mut worktrees = db_worktrees
 553                .into_iter()
 554                .map(|db_worktree| {
 555                    (
 556                        db_worktree.id as u64,
 557                        Worktree {
 558                            id: db_worktree.id as u64,
 559                            abs_path: db_worktree.abs_path,
 560                            root_name: db_worktree.root_name,
 561                            visible: db_worktree.visible,
 562                            entries: Default::default(),
 563                            repository_entries: Default::default(),
 564                            diagnostic_summaries: Default::default(),
 565                            settings_files: Default::default(),
 566                            scan_id: db_worktree.scan_id as u64,
 567                            completed_scan_id: db_worktree.completed_scan_id as u64,
 568                        },
 569                    )
 570                })
 571                .collect::<BTreeMap<_, _>>();
 572
 573            // Populate worktree entries.
 574            {
 575                let mut db_entries = worktree_entry::Entity::find()
 576                    .filter(
 577                        Condition::all()
 578                            .add(worktree_entry::Column::ProjectId.eq(project_id))
 579                            .add(worktree_entry::Column::IsDeleted.eq(false)),
 580                    )
 581                    .stream(&*tx)
 582                    .await?;
 583                while let Some(db_entry) = db_entries.next().await {
 584                    let db_entry = db_entry?;
 585                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 586                        worktree.entries.push(proto::Entry {
 587                            id: db_entry.id as u64,
 588                            is_dir: db_entry.is_dir,
 589                            path: db_entry.path,
 590                            inode: db_entry.inode as u64,
 591                            mtime: Some(proto::Timestamp {
 592                                seconds: db_entry.mtime_seconds as u64,
 593                                nanos: db_entry.mtime_nanos as u32,
 594                            }),
 595                            is_symlink: db_entry.is_symlink,
 596                            is_ignored: db_entry.is_ignored,
 597                            is_external: db_entry.is_external,
 598                            git_status: db_entry.git_status.map(|status| status as i32),
 599                        });
 600                    }
 601                }
 602            }
 603
 604            // Populate repository entries.
 605            {
 606                let mut db_repository_entries = worktree_repository::Entity::find()
 607                    .filter(
 608                        Condition::all()
 609                            .add(worktree_repository::Column::ProjectId.eq(project_id))
 610                            .add(worktree_repository::Column::IsDeleted.eq(false)),
 611                    )
 612                    .stream(&*tx)
 613                    .await?;
 614                while let Some(db_repository_entry) = db_repository_entries.next().await {
 615                    let db_repository_entry = db_repository_entry?;
 616                    if let Some(worktree) =
 617                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
 618                    {
 619                        worktree.repository_entries.insert(
 620                            db_repository_entry.work_directory_id as u64,
 621                            proto::RepositoryEntry {
 622                                work_directory_id: db_repository_entry.work_directory_id as u64,
 623                                branch: db_repository_entry.branch,
 624                            },
 625                        );
 626                    }
 627                }
 628            }
 629
 630            // Populate worktree diagnostic summaries.
 631            {
 632                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 633                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
 634                    .stream(&*tx)
 635                    .await?;
 636                while let Some(db_summary) = db_summaries.next().await {
 637                    let db_summary = db_summary?;
 638                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 639                        worktree
 640                            .diagnostic_summaries
 641                            .push(proto::DiagnosticSummary {
 642                                path: db_summary.path,
 643                                language_server_id: db_summary.language_server_id as u64,
 644                                error_count: db_summary.error_count as u32,
 645                                warning_count: db_summary.warning_count as u32,
 646                            });
 647                    }
 648                }
 649            }
 650
 651            // Populate worktree settings files
 652            {
 653                let mut db_settings_files = worktree_settings_file::Entity::find()
 654                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 655                    .stream(&*tx)
 656                    .await?;
 657                while let Some(db_settings_file) = db_settings_files.next().await {
 658                    let db_settings_file = db_settings_file?;
 659                    if let Some(worktree) =
 660                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
 661                    {
 662                        worktree.settings_files.push(WorktreeSettingsFile {
 663                            path: db_settings_file.path,
 664                            content: db_settings_file.content,
 665                        });
 666                    }
 667                }
 668            }
 669
 670            // Populate language servers.
 671            let language_servers = project
 672                .find_related(language_server::Entity)
 673                .all(&*tx)
 674                .await?;
 675
 676            let project = Project {
 677                collaborators: collaborators
 678                    .into_iter()
 679                    .map(|collaborator| ProjectCollaborator {
 680                        connection_id: collaborator.connection(),
 681                        user_id: collaborator.user_id,
 682                        replica_id: collaborator.replica_id,
 683                        is_host: collaborator.is_host,
 684                    })
 685                    .collect(),
 686                worktrees,
 687                language_servers: language_servers
 688                    .into_iter()
 689                    .map(|language_server| proto::LanguageServer {
 690                        id: language_server.id as u64,
 691                        name: language_server.name,
 692                    })
 693                    .collect(),
 694            };
 695            Ok((project, replica_id as ReplicaId))
 696        })
 697        .await
 698    }
 699
 700    pub async fn leave_project(
 701        &self,
 702        project_id: ProjectId,
 703        connection: ConnectionId,
 704    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
 705        let room_id = self.room_id_for_project(project_id).await?;
 706        self.room_transaction(room_id, |tx| async move {
 707            let result = project_collaborator::Entity::delete_many()
 708                .filter(
 709                    Condition::all()
 710                        .add(project_collaborator::Column::ProjectId.eq(project_id))
 711                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
 712                        .add(
 713                            project_collaborator::Column::ConnectionServerId
 714                                .eq(connection.owner_id as i32),
 715                        ),
 716                )
 717                .exec(&*tx)
 718                .await?;
 719            if result.rows_affected == 0 {
 720                Err(anyhow!("not a collaborator on this project"))?;
 721            }
 722
 723            let project = project::Entity::find_by_id(project_id)
 724                .one(&*tx)
 725                .await?
 726                .ok_or_else(|| anyhow!("no such project"))?;
 727            let collaborators = project
 728                .find_related(project_collaborator::Entity)
 729                .all(&*tx)
 730                .await?;
 731            let connection_ids = collaborators
 732                .into_iter()
 733                .map(|collaborator| collaborator.connection())
 734                .collect();
 735
 736            follower::Entity::delete_many()
 737                .filter(
 738                    Condition::any()
 739                        .add(
 740                            Condition::all()
 741                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 742                                .add(
 743                                    follower::Column::LeaderConnectionServerId
 744                                        .eq(connection.owner_id),
 745                                )
 746                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
 747                        )
 748                        .add(
 749                            Condition::all()
 750                                .add(follower::Column::ProjectId.eq(Some(project_id)))
 751                                .add(
 752                                    follower::Column::FollowerConnectionServerId
 753                                        .eq(connection.owner_id),
 754                                )
 755                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
 756                        ),
 757                )
 758                .exec(&*tx)
 759                .await?;
 760
 761            let room = self.get_room(project.room_id, &tx).await?;
 762            let left_project = LeftProject {
 763                id: project_id,
 764                host_user_id: project.host_user_id,
 765                host_connection_id: project.host_connection()?,
 766                connection_ids,
 767            };
 768            Ok((room, left_project))
 769        })
 770        .await
 771    }
 772
 773    pub async fn project_collaborators(
 774        &self,
 775        project_id: ProjectId,
 776        connection_id: ConnectionId,
 777    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
 778        let room_id = self.room_id_for_project(project_id).await?;
 779        self.room_transaction(room_id, |tx| async move {
 780            let collaborators = project_collaborator::Entity::find()
 781                .filter(project_collaborator::Column::ProjectId.eq(project_id))
 782                .all(&*tx)
 783                .await?
 784                .into_iter()
 785                .map(|collaborator| ProjectCollaborator {
 786                    connection_id: collaborator.connection(),
 787                    user_id: collaborator.user_id,
 788                    replica_id: collaborator.replica_id,
 789                    is_host: collaborator.is_host,
 790                })
 791                .collect::<Vec<_>>();
 792
 793            if collaborators
 794                .iter()
 795                .any(|collaborator| collaborator.connection_id == connection_id)
 796            {
 797                Ok(collaborators)
 798            } else {
 799                Err(anyhow!("no such project"))?
 800            }
 801        })
 802        .await
 803    }
 804
 805    pub async fn project_connection_ids(
 806        &self,
 807        project_id: ProjectId,
 808        connection_id: ConnectionId,
 809    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
 810        let room_id = self.room_id_for_project(project_id).await?;
 811        self.room_transaction(room_id, |tx| async move {
 812            let mut collaborators = project_collaborator::Entity::find()
 813                .filter(project_collaborator::Column::ProjectId.eq(project_id))
 814                .stream(&*tx)
 815                .await?;
 816
 817            let mut connection_ids = HashSet::default();
 818            while let Some(collaborator) = collaborators.next().await {
 819                let collaborator = collaborator?;
 820                connection_ids.insert(collaborator.connection());
 821            }
 822
 823            if connection_ids.contains(&connection_id) {
 824                Ok(connection_ids)
 825            } else {
 826                Err(anyhow!("no such project"))?
 827            }
 828        })
 829        .await
 830    }
 831
 832    async fn project_guest_connection_ids(
 833        &self,
 834        project_id: ProjectId,
 835        tx: &DatabaseTransaction,
 836    ) -> Result<Vec<ConnectionId>> {
 837        let mut collaborators = project_collaborator::Entity::find()
 838            .filter(
 839                project_collaborator::Column::ProjectId
 840                    .eq(project_id)
 841                    .and(project_collaborator::Column::IsHost.eq(false)),
 842            )
 843            .stream(tx)
 844            .await?;
 845
 846        let mut guest_connection_ids = Vec::new();
 847        while let Some(collaborator) = collaborators.next().await {
 848            let collaborator = collaborator?;
 849            guest_connection_ids.push(collaborator.connection());
 850        }
 851        Ok(guest_connection_ids)
 852    }
 853
 854    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
 855        self.transaction(|tx| async move {
 856            let project = project::Entity::find_by_id(project_id)
 857                .one(&*tx)
 858                .await?
 859                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
 860            Ok(project.room_id)
 861        })
 862        .await
 863    }
 864
 865    pub async fn check_can_follow(
 866        &self,
 867        room_id: RoomId,
 868        project_id: Option<ProjectId>,
 869        leader_id: ConnectionId,
 870        follower_id: ConnectionId,
 871    ) -> Result<()> {
 872        let mut found_leader = false;
 873        let mut found_follower = false;
 874        self.transaction(|tx| async move {
 875            if let Some(project_id) = project_id {
 876                let mut rows = project_collaborator::Entity::find()
 877                    .filter(project_collaborator::Column::ProjectId.eq(project_id))
 878                    .stream(&*tx)
 879                    .await?;
 880                while let Some(row) = rows.next().await {
 881                    let row = row?;
 882                    let connection = row.connection();
 883                    if connection == leader_id {
 884                        found_leader = true;
 885                    } else if connection == follower_id {
 886                        found_follower = true;
 887                    }
 888                }
 889            } else {
 890                let mut rows = room_participant::Entity::find()
 891                    .filter(room_participant::Column::RoomId.eq(room_id))
 892                    .stream(&*tx)
 893                    .await?;
 894                while let Some(row) = rows.next().await {
 895                    let row = row?;
 896                    if let Some(connection) = row.answering_connection() {
 897                        if connection == leader_id {
 898                            found_leader = true;
 899                        } else if connection == follower_id {
 900                            found_follower = true;
 901                        }
 902                    }
 903                }
 904            }
 905
 906            if !found_leader || !found_follower {
 907                Err(anyhow!("not a room participant"))?;
 908            }
 909
 910            Ok(())
 911        })
 912        .await
 913    }
 914
 915    pub async fn check_can_unfollow(
 916        &self,
 917        room_id: RoomId,
 918        project_id: Option<ProjectId>,
 919        leader_id: ConnectionId,
 920        follower_id: ConnectionId,
 921    ) -> Result<()> {
 922        self.transaction(|tx| async move {
 923            follower::Entity::find()
 924                .filter(
 925                    Condition::all()
 926                        .add(follower::Column::RoomId.eq(room_id))
 927                        .add(follower::Column::ProjectId.eq(project_id))
 928                        .add(follower::Column::LeaderConnectionId.eq(leader_id.id as i32))
 929                        .add(follower::Column::FollowerConnectionId.eq(follower_id.id as i32))
 930                        .add(
 931                            follower::Column::LeaderConnectionServerId
 932                                .eq(leader_id.owner_id as i32),
 933                        )
 934                        .add(
 935                            follower::Column::FollowerConnectionServerId
 936                                .eq(follower_id.owner_id as i32),
 937                        ),
 938                )
 939                .one(&*tx)
 940                .await?
 941                .ok_or_else(|| anyhow!("not a follower"))?;
 942            Ok(())
 943        })
 944        .await
 945    }
 946
 947    pub async fn follow(
 948        &self,
 949        room_id: RoomId,
 950        project_id: Option<ProjectId>,
 951        leader_connection: ConnectionId,
 952        follower_connection: ConnectionId,
 953    ) -> Result<RoomGuard<proto::Room>> {
 954        self.room_transaction(room_id, |tx| async move {
 955            follower::ActiveModel {
 956                room_id: ActiveValue::set(room_id),
 957                project_id: ActiveValue::set(project_id),
 958                leader_connection_server_id: ActiveValue::set(ServerId(
 959                    leader_connection.owner_id as i32,
 960                )),
 961                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
 962                follower_connection_server_id: ActiveValue::set(ServerId(
 963                    follower_connection.owner_id as i32,
 964                )),
 965                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
 966                ..Default::default()
 967            }
 968            .insert(&*tx)
 969            .await?;
 970
 971            let room = self.get_room(room_id, &*tx).await?;
 972            Ok(room)
 973        })
 974        .await
 975    }
 976
 977    pub async fn unfollow(
 978        &self,
 979        room_id: RoomId,
 980        project_id: Option<ProjectId>,
 981        leader_connection: ConnectionId,
 982        follower_connection: ConnectionId,
 983    ) -> Result<RoomGuard<proto::Room>> {
 984        self.room_transaction(room_id, |tx| async move {
 985            follower::Entity::delete_many()
 986                .filter(
 987                    Condition::all()
 988                        .add(follower::Column::RoomId.eq(room_id))
 989                        .add(follower::Column::ProjectId.eq(project_id))
 990                        .add(
 991                            follower::Column::LeaderConnectionServerId
 992                                .eq(leader_connection.owner_id),
 993                        )
 994                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
 995                        .add(
 996                            follower::Column::FollowerConnectionServerId
 997                                .eq(follower_connection.owner_id),
 998                        )
 999                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1000                )
1001                .exec(&*tx)
1002                .await?;
1003
1004            let room = self.get_room(room_id, &*tx).await?;
1005            Ok(room)
1006        })
1007        .await
1008    }
1009}