projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .ok_or_else(|| anyhow!("could not find participant"))?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            let project = project::ActiveModel {
  65                room_id: ActiveValue::set(Some(participant.room_id)),
  66                host_user_id: ActiveValue::set(Some(participant.user_id)),
  67                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  68                host_connection_server_id: ActiveValue::set(Some(ServerId(
  69                    connection.owner_id as i32,
  70                ))),
  71                id: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if !worktrees.is_empty() {
  77                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  78                    worktree::ActiveModel {
  79                        id: ActiveValue::set(worktree.id as i64),
  80                        project_id: ActiveValue::set(project.id),
  81                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  82                        root_name: ActiveValue::set(worktree.root_name.clone()),
  83                        visible: ActiveValue::set(worktree.visible),
  84                        scan_id: ActiveValue::set(0),
  85                        completed_scan_id: ActiveValue::set(0),
  86                    }
  87                }))
  88                .exec(&*tx)
  89                .await?;
  90            }
  91
  92            let replica_id = if is_ssh_project { 1 } else { 0 };
  93
  94            project_collaborator::ActiveModel {
  95                project_id: ActiveValue::set(project.id),
  96                connection_id: ActiveValue::set(connection.id as i32),
  97                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  98                user_id: ActiveValue::set(participant.user_id),
  99                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 100                is_host: ActiveValue::set(true),
 101                ..Default::default()
 102            }
 103            .insert(&*tx)
 104            .await?;
 105
 106            let room = self.get_room(room_id, &tx).await?;
 107            Ok((project.id, room))
 108        })
 109        .await
 110    }
 111
 112    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 113        self.weak_transaction(|tx| async move {
 114            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 115            Ok(())
 116        })
 117        .await
 118    }
 119
 120    /// Unshares the given project.
 121    pub async fn unshare_project(
 122        &self,
 123        project_id: ProjectId,
 124        connection: ConnectionId,
 125    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 126        self.project_transaction(project_id, |tx| async move {
 127            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 128            let project = project::Entity::find_by_id(project_id)
 129                .one(&*tx)
 130                .await?
 131                .ok_or_else(|| anyhow!("project not found"))?;
 132            let room = if let Some(room_id) = project.room_id {
 133                Some(self.get_room(room_id, &tx).await?)
 134            } else {
 135                None
 136            };
 137            if project.host_connection()? == connection {
 138                return Ok((true, room, guest_connection_ids));
 139            }
 140            Err(anyhow!("cannot unshare a project hosted by another user"))?
 141        })
 142        .await
 143    }
 144
 145    /// Updates the worktrees associated with the given project.
 146    pub async fn update_project(
 147        &self,
 148        project_id: ProjectId,
 149        connection: ConnectionId,
 150        worktrees: &[proto::WorktreeMetadata],
 151    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 152        self.project_transaction(project_id, |tx| async move {
 153            let project = project::Entity::find_by_id(project_id)
 154                .filter(
 155                    Condition::all()
 156                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 157                        .add(
 158                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 159                        ),
 160                )
 161                .one(&*tx)
 162                .await?
 163                .ok_or_else(|| anyhow!("no such project"))?;
 164
 165            self.update_project_worktrees(project.id, worktrees, &tx)
 166                .await?;
 167
 168            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 169
 170            let room = if let Some(room_id) = project.room_id {
 171                Some(self.get_room(room_id, &tx).await?)
 172            } else {
 173                None
 174            };
 175
 176            Ok((room, guest_connection_ids))
 177        })
 178        .await
 179    }
 180
 181    pub(in crate::db) async fn update_project_worktrees(
 182        &self,
 183        project_id: ProjectId,
 184        worktrees: &[proto::WorktreeMetadata],
 185        tx: &DatabaseTransaction,
 186    ) -> Result<()> {
 187        if !worktrees.is_empty() {
 188            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 189                id: ActiveValue::set(worktree.id as i64),
 190                project_id: ActiveValue::set(project_id),
 191                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 192                root_name: ActiveValue::set(worktree.root_name.clone()),
 193                visible: ActiveValue::set(worktree.visible),
 194                scan_id: ActiveValue::set(0),
 195                completed_scan_id: ActiveValue::set(0),
 196            }))
 197            .on_conflict(
 198                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 199                    .update_column(worktree::Column::RootName)
 200                    .to_owned(),
 201            )
 202            .exec(tx)
 203            .await?;
 204        }
 205
 206        worktree::Entity::delete_many()
 207            .filter(worktree::Column::ProjectId.eq(project_id).and(
 208                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 209            ))
 210            .exec(tx)
 211            .await?;
 212
 213        Ok(())
 214    }
 215
 216    pub async fn update_worktree(
 217        &self,
 218        update: &proto::UpdateWorktree,
 219        connection: ConnectionId,
 220    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 221        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 222            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 223        {
 224            return Err(anyhow!(
 225                "invalid worktree update. removed entries: {}, updated entries: {}",
 226                update.removed_entries.len(),
 227                update.updated_entries.len()
 228            ))?;
 229        }
 230
 231        let project_id = ProjectId::from_proto(update.project_id);
 232        let worktree_id = update.worktree_id as i64;
 233        self.project_transaction(project_id, |tx| async move {
 234            // Ensure the update comes from the host.
 235            let _project = project::Entity::find_by_id(project_id)
 236                .filter(
 237                    Condition::all()
 238                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 239                        .add(
 240                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 241                        ),
 242                )
 243                .one(&*tx)
 244                .await?
 245                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
 246
 247            // Update metadata.
 248            worktree::Entity::update(worktree::ActiveModel {
 249                id: ActiveValue::set(worktree_id),
 250                project_id: ActiveValue::set(project_id),
 251                root_name: ActiveValue::set(update.root_name.clone()),
 252                scan_id: ActiveValue::set(update.scan_id as i64),
 253                completed_scan_id: if update.is_last_update {
 254                    ActiveValue::set(update.scan_id as i64)
 255                } else {
 256                    ActiveValue::default()
 257                },
 258                abs_path: ActiveValue::set(update.abs_path.clone()),
 259                ..Default::default()
 260            })
 261            .exec(&*tx)
 262            .await?;
 263
 264            if !update.updated_entries.is_empty() {
 265                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 266                    let mtime = entry.mtime.clone().unwrap_or_default();
 267                    worktree_entry::ActiveModel {
 268                        project_id: ActiveValue::set(project_id),
 269                        worktree_id: ActiveValue::set(worktree_id),
 270                        id: ActiveValue::set(entry.id as i64),
 271                        is_dir: ActiveValue::set(entry.is_dir),
 272                        path: ActiveValue::set(entry.path.clone()),
 273                        inode: ActiveValue::set(entry.inode as i64),
 274                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 275                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 276                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 277                        is_ignored: ActiveValue::set(entry.is_ignored),
 278                        git_status: ActiveValue::set(None),
 279                        is_external: ActiveValue::set(entry.is_external),
 280                        is_deleted: ActiveValue::set(false),
 281                        scan_id: ActiveValue::set(update.scan_id as i64),
 282                        is_fifo: ActiveValue::set(entry.is_fifo),
 283                    }
 284                }))
 285                .on_conflict(
 286                    OnConflict::columns([
 287                        worktree_entry::Column::ProjectId,
 288                        worktree_entry::Column::WorktreeId,
 289                        worktree_entry::Column::Id,
 290                    ])
 291                    .update_columns([
 292                        worktree_entry::Column::IsDir,
 293                        worktree_entry::Column::Path,
 294                        worktree_entry::Column::Inode,
 295                        worktree_entry::Column::MtimeSeconds,
 296                        worktree_entry::Column::MtimeNanos,
 297                        worktree_entry::Column::CanonicalPath,
 298                        worktree_entry::Column::IsIgnored,
 299                        worktree_entry::Column::ScanId,
 300                    ])
 301                    .to_owned(),
 302                )
 303                .exec(&*tx)
 304                .await?;
 305            }
 306
 307            if !update.removed_entries.is_empty() {
 308                worktree_entry::Entity::update_many()
 309                    .filter(
 310                        worktree_entry::Column::ProjectId
 311                            .eq(project_id)
 312                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 313                            .and(
 314                                worktree_entry::Column::Id
 315                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 316                            ),
 317                    )
 318                    .set(worktree_entry::ActiveModel {
 319                        is_deleted: ActiveValue::Set(true),
 320                        scan_id: ActiveValue::Set(update.scan_id as i64),
 321                        ..Default::default()
 322                    })
 323                    .exec(&*tx)
 324                    .await?;
 325            }
 326
 327            // Backward-compatibility for old Zed clients.
 328            //
 329            // Remove this block when Zed 1.80 stable has been out for a week.
 330            {
 331                if !update.updated_repositories.is_empty() {
 332                    project_repository::Entity::insert_many(
 333                        update.updated_repositories.iter().map(|repository| {
 334                            project_repository::ActiveModel {
 335                                project_id: ActiveValue::set(project_id),
 336                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 337                                id: ActiveValue::set(repository.repository_id as i64),
 338                                scan_id: ActiveValue::set(update.scan_id as i64),
 339                                is_deleted: ActiveValue::set(false),
 340                                branch_summary: ActiveValue::Set(
 341                                    repository
 342                                        .branch_summary
 343                                        .as_ref()
 344                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 345                                ),
 346                                current_merge_conflicts: ActiveValue::Set(Some(
 347                                    serde_json::to_string(&repository.current_merge_conflicts)
 348                                        .unwrap(),
 349                                )),
 350
 351                                // Old clients do not use abs path, entry ids or head_commit_details.
 352                                abs_path: ActiveValue::set(String::new()),
 353                                entry_ids: ActiveValue::set("[]".into()),
 354                                head_commit_details: ActiveValue::set(None),
 355                            }
 356                        }),
 357                    )
 358                    .on_conflict(
 359                        OnConflict::columns([
 360                            project_repository::Column::ProjectId,
 361                            project_repository::Column::Id,
 362                        ])
 363                        .update_columns([
 364                            project_repository::Column::ScanId,
 365                            project_repository::Column::BranchSummary,
 366                            project_repository::Column::CurrentMergeConflicts,
 367                        ])
 368                        .to_owned(),
 369                    )
 370                    .exec(&*tx)
 371                    .await?;
 372
 373                    let has_any_statuses = update
 374                        .updated_repositories
 375                        .iter()
 376                        .any(|repository| !repository.updated_statuses.is_empty());
 377
 378                    if has_any_statuses {
 379                        project_repository_statuses::Entity::insert_many(
 380                            update.updated_repositories.iter().flat_map(
 381                                |repository: &proto::RepositoryEntry| {
 382                                    repository.updated_statuses.iter().map(|status_entry| {
 383                                        let (repo_path, status_kind, first_status, second_status) =
 384                                            proto_status_to_db(status_entry.clone());
 385                                        project_repository_statuses::ActiveModel {
 386                                            project_id: ActiveValue::set(project_id),
 387                                            repository_id: ActiveValue::set(
 388                                                repository.repository_id as i64,
 389                                            ),
 390                                            scan_id: ActiveValue::set(update.scan_id as i64),
 391                                            is_deleted: ActiveValue::set(false),
 392                                            repo_path: ActiveValue::set(repo_path),
 393                                            status: ActiveValue::set(0),
 394                                            status_kind: ActiveValue::set(status_kind),
 395                                            first_status: ActiveValue::set(first_status),
 396                                            second_status: ActiveValue::set(second_status),
 397                                        }
 398                                    })
 399                                },
 400                            ),
 401                        )
 402                        .on_conflict(
 403                            OnConflict::columns([
 404                                project_repository_statuses::Column::ProjectId,
 405                                project_repository_statuses::Column::RepositoryId,
 406                                project_repository_statuses::Column::RepoPath,
 407                            ])
 408                            .update_columns([
 409                                project_repository_statuses::Column::ScanId,
 410                                project_repository_statuses::Column::StatusKind,
 411                                project_repository_statuses::Column::FirstStatus,
 412                                project_repository_statuses::Column::SecondStatus,
 413                            ])
 414                            .to_owned(),
 415                        )
 416                        .exec(&*tx)
 417                        .await?;
 418                    }
 419
 420                    for repo in &update.updated_repositories {
 421                        if !repo.removed_statuses.is_empty() {
 422                            project_repository_statuses::Entity::update_many()
 423                                .filter(
 424                                    project_repository_statuses::Column::ProjectId
 425                                        .eq(project_id)
 426                                        .and(
 427                                            project_repository_statuses::Column::RepositoryId
 428                                                .eq(repo.repository_id),
 429                                        )
 430                                        .and(
 431                                            project_repository_statuses::Column::RepoPath
 432                                                .is_in(repo.removed_statuses.iter()),
 433                                        ),
 434                                )
 435                                .set(project_repository_statuses::ActiveModel {
 436                                    is_deleted: ActiveValue::Set(true),
 437                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 438                                    ..Default::default()
 439                                })
 440                                .exec(&*tx)
 441                                .await?;
 442                        }
 443                    }
 444                }
 445
 446                if !update.removed_repositories.is_empty() {
 447                    project_repository::Entity::update_many()
 448                        .filter(
 449                            project_repository::Column::ProjectId
 450                                .eq(project_id)
 451                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 452                                .and(project_repository::Column::Id.is_in(
 453                                    update.removed_repositories.iter().map(|id| *id as i64),
 454                                )),
 455                        )
 456                        .set(project_repository::ActiveModel {
 457                            is_deleted: ActiveValue::Set(true),
 458                            scan_id: ActiveValue::Set(update.scan_id as i64),
 459                            ..Default::default()
 460                        })
 461                        .exec(&*tx)
 462                        .await?;
 463                }
 464            }
 465
 466            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 467            Ok(connection_ids)
 468        })
 469        .await
 470    }
 471
 472    pub async fn update_repository(
 473        &self,
 474        update: &proto::UpdateRepository,
 475        _connection: ConnectionId,
 476    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 477        let project_id = ProjectId::from_proto(update.project_id);
 478        let repository_id = update.id as i64;
 479        self.project_transaction(project_id, |tx| async move {
 480            project_repository::Entity::insert(project_repository::ActiveModel {
 481                project_id: ActiveValue::set(project_id),
 482                id: ActiveValue::set(repository_id),
 483                legacy_worktree_id: ActiveValue::set(None),
 484                abs_path: ActiveValue::set(update.abs_path.clone()),
 485                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 486                scan_id: ActiveValue::set(update.scan_id as i64),
 487                is_deleted: ActiveValue::set(false),
 488                branch_summary: ActiveValue::Set(
 489                    update
 490                        .branch_summary
 491                        .as_ref()
 492                        .map(|summary| serde_json::to_string(summary).unwrap()),
 493                ),
 494                head_commit_details: ActiveValue::Set(
 495                    update
 496                        .head_commit_details
 497                        .as_ref()
 498                        .map(|details| serde_json::to_string(details).unwrap()),
 499                ),
 500                current_merge_conflicts: ActiveValue::Set(Some(
 501                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 502                )),
 503            })
 504            .on_conflict(
 505                OnConflict::columns([
 506                    project_repository::Column::ProjectId,
 507                    project_repository::Column::Id,
 508                ])
 509                .update_columns([
 510                    project_repository::Column::ScanId,
 511                    project_repository::Column::BranchSummary,
 512                    project_repository::Column::EntryIds,
 513                    project_repository::Column::AbsPath,
 514                    project_repository::Column::CurrentMergeConflicts,
 515                    project_repository::Column::HeadCommitDetails,
 516                ])
 517                .to_owned(),
 518            )
 519            .exec(&*tx)
 520            .await?;
 521
 522            let has_any_statuses = !update.updated_statuses.is_empty();
 523
 524            if has_any_statuses {
 525                project_repository_statuses::Entity::insert_many(
 526                    update.updated_statuses.iter().map(|status_entry| {
 527                        let (repo_path, status_kind, first_status, second_status) =
 528                            proto_status_to_db(status_entry.clone());
 529                        project_repository_statuses::ActiveModel {
 530                            project_id: ActiveValue::set(project_id),
 531                            repository_id: ActiveValue::set(repository_id),
 532                            scan_id: ActiveValue::set(update.scan_id as i64),
 533                            is_deleted: ActiveValue::set(false),
 534                            repo_path: ActiveValue::set(repo_path),
 535                            status: ActiveValue::set(0),
 536                            status_kind: ActiveValue::set(status_kind),
 537                            first_status: ActiveValue::set(first_status),
 538                            second_status: ActiveValue::set(second_status),
 539                        }
 540                    }),
 541                )
 542                .on_conflict(
 543                    OnConflict::columns([
 544                        project_repository_statuses::Column::ProjectId,
 545                        project_repository_statuses::Column::RepositoryId,
 546                        project_repository_statuses::Column::RepoPath,
 547                    ])
 548                    .update_columns([
 549                        project_repository_statuses::Column::ScanId,
 550                        project_repository_statuses::Column::StatusKind,
 551                        project_repository_statuses::Column::FirstStatus,
 552                        project_repository_statuses::Column::SecondStatus,
 553                    ])
 554                    .to_owned(),
 555                )
 556                .exec(&*tx)
 557                .await?;
 558            }
 559
 560            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 561
 562            if has_any_removed_statuses {
 563                project_repository_statuses::Entity::update_many()
 564                    .filter(
 565                        project_repository_statuses::Column::ProjectId
 566                            .eq(project_id)
 567                            .and(
 568                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 569                            )
 570                            .and(
 571                                project_repository_statuses::Column::RepoPath
 572                                    .is_in(update.removed_statuses.iter()),
 573                            ),
 574                    )
 575                    .set(project_repository_statuses::ActiveModel {
 576                        is_deleted: ActiveValue::Set(true),
 577                        scan_id: ActiveValue::Set(update.scan_id as i64),
 578                        ..Default::default()
 579                    })
 580                    .exec(&*tx)
 581                    .await?;
 582            }
 583
 584            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 585            Ok(connection_ids)
 586        })
 587        .await
 588    }
 589
 590    pub async fn remove_repository(
 591        &self,
 592        remove: &proto::RemoveRepository,
 593        _connection: ConnectionId,
 594    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 595        let project_id = ProjectId::from_proto(remove.project_id);
 596        let repository_id = remove.id as i64;
 597        self.project_transaction(project_id, |tx| async move {
 598            project_repository::Entity::update_many()
 599                .filter(
 600                    project_repository::Column::ProjectId
 601                        .eq(project_id)
 602                        .and(project_repository::Column::Id.eq(repository_id)),
 603                )
 604                .set(project_repository::ActiveModel {
 605                    is_deleted: ActiveValue::Set(true),
 606                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 607                    ..Default::default()
 608                })
 609                .exec(&*tx)
 610                .await?;
 611
 612            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 613            Ok(connection_ids)
 614        })
 615        .await
 616    }
 617
 618    /// Updates the diagnostic summary for the given connection.
 619    pub async fn update_diagnostic_summary(
 620        &self,
 621        update: &proto::UpdateDiagnosticSummary,
 622        connection: ConnectionId,
 623    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 624        let project_id = ProjectId::from_proto(update.project_id);
 625        let worktree_id = update.worktree_id as i64;
 626        self.project_transaction(project_id, |tx| async move {
 627            let summary = update
 628                .summary
 629                .as_ref()
 630                .ok_or_else(|| anyhow!("invalid summary"))?;
 631
 632            // Ensure the update comes from the host.
 633            let project = project::Entity::find_by_id(project_id)
 634                .one(&*tx)
 635                .await?
 636                .ok_or_else(|| anyhow!("no such project"))?;
 637            if project.host_connection()? != connection {
 638                return Err(anyhow!("can't update a project hosted by someone else"))?;
 639            }
 640
 641            // Update summary.
 642            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 643                project_id: ActiveValue::set(project_id),
 644                worktree_id: ActiveValue::set(worktree_id),
 645                path: ActiveValue::set(summary.path.clone()),
 646                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 647                error_count: ActiveValue::set(summary.error_count as i32),
 648                warning_count: ActiveValue::set(summary.warning_count as i32),
 649            })
 650            .on_conflict(
 651                OnConflict::columns([
 652                    worktree_diagnostic_summary::Column::ProjectId,
 653                    worktree_diagnostic_summary::Column::WorktreeId,
 654                    worktree_diagnostic_summary::Column::Path,
 655                ])
 656                .update_columns([
 657                    worktree_diagnostic_summary::Column::LanguageServerId,
 658                    worktree_diagnostic_summary::Column::ErrorCount,
 659                    worktree_diagnostic_summary::Column::WarningCount,
 660                ])
 661                .to_owned(),
 662            )
 663            .exec(&*tx)
 664            .await?;
 665
 666            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 667            Ok(connection_ids)
 668        })
 669        .await
 670    }
 671
 672    /// Starts the language server for the given connection.
 673    pub async fn start_language_server(
 674        &self,
 675        update: &proto::StartLanguageServer,
 676        connection: ConnectionId,
 677    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 678        let project_id = ProjectId::from_proto(update.project_id);
 679        self.project_transaction(project_id, |tx| async move {
 680            let server = update
 681                .server
 682                .as_ref()
 683                .ok_or_else(|| anyhow!("invalid language server"))?;
 684
 685            // Ensure the update comes from the host.
 686            let project = project::Entity::find_by_id(project_id)
 687                .one(&*tx)
 688                .await?
 689                .ok_or_else(|| anyhow!("no such project"))?;
 690            if project.host_connection()? != connection {
 691                return Err(anyhow!("can't update a project hosted by someone else"))?;
 692            }
 693
 694            // Add the newly-started language server.
 695            language_server::Entity::insert(language_server::ActiveModel {
 696                project_id: ActiveValue::set(project_id),
 697                id: ActiveValue::set(server.id as i64),
 698                name: ActiveValue::set(server.name.clone()),
 699            })
 700            .on_conflict(
 701                OnConflict::columns([
 702                    language_server::Column::ProjectId,
 703                    language_server::Column::Id,
 704                ])
 705                .update_column(language_server::Column::Name)
 706                .to_owned(),
 707            )
 708            .exec(&*tx)
 709            .await?;
 710
 711            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 712            Ok(connection_ids)
 713        })
 714        .await
 715    }
 716
 717    /// Updates the worktree settings for the given connection.
 718    pub async fn update_worktree_settings(
 719        &self,
 720        update: &proto::UpdateWorktreeSettings,
 721        connection: ConnectionId,
 722    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 723        let project_id = ProjectId::from_proto(update.project_id);
 724        let kind = match update.kind {
 725            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 726                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 727            None => proto::LocalSettingsKind::Settings,
 728        };
 729        let kind = LocalSettingsKind::from_proto(kind);
 730        self.project_transaction(project_id, |tx| async move {
 731            // Ensure the update comes from the host.
 732            let project = project::Entity::find_by_id(project_id)
 733                .one(&*tx)
 734                .await?
 735                .ok_or_else(|| anyhow!("no such project"))?;
 736            if project.host_connection()? != connection {
 737                return Err(anyhow!("can't update a project hosted by someone else"))?;
 738            }
 739
 740            if let Some(content) = &update.content {
 741                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 742                    project_id: ActiveValue::Set(project_id),
 743                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 744                    path: ActiveValue::Set(update.path.clone()),
 745                    content: ActiveValue::Set(content.clone()),
 746                    kind: ActiveValue::Set(kind),
 747                })
 748                .on_conflict(
 749                    OnConflict::columns([
 750                        worktree_settings_file::Column::ProjectId,
 751                        worktree_settings_file::Column::WorktreeId,
 752                        worktree_settings_file::Column::Path,
 753                    ])
 754                    .update_column(worktree_settings_file::Column::Content)
 755                    .to_owned(),
 756                )
 757                .exec(&*tx)
 758                .await?;
 759            } else {
 760                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 761                    project_id: ActiveValue::Set(project_id),
 762                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 763                    path: ActiveValue::Set(update.path.clone()),
 764                    ..Default::default()
 765                })
 766                .exec(&*tx)
 767                .await?;
 768            }
 769
 770            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 771            Ok(connection_ids)
 772        })
 773        .await
 774    }
 775
 776    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 777        self.transaction(|tx| async move {
 778            Ok(project::Entity::find_by_id(id)
 779                .one(&*tx)
 780                .await?
 781                .ok_or_else(|| anyhow!("no such project"))?)
 782        })
 783        .await
 784    }
 785
 786    /// Adds the given connection to the specified project
 787    /// in the current room.
 788    pub async fn join_project(
 789        &self,
 790        project_id: ProjectId,
 791        connection: ConnectionId,
 792        user_id: UserId,
 793    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 794        self.project_transaction(project_id, |tx| async move {
 795            let (project, role) = self
 796                .access_project(project_id, connection, Capability::ReadOnly, &tx)
 797                .await?;
 798            self.join_project_internal(project, user_id, connection, role, &tx)
 799                .await
 800        })
 801        .await
 802    }
 803
 804    async fn join_project_internal(
 805        &self,
 806        project: project::Model,
 807        user_id: UserId,
 808        connection: ConnectionId,
 809        role: ChannelRole,
 810        tx: &DatabaseTransaction,
 811    ) -> Result<(Project, ReplicaId)> {
 812        let mut collaborators = project
 813            .find_related(project_collaborator::Entity)
 814            .all(tx)
 815            .await?;
 816        let replica_ids = collaborators
 817            .iter()
 818            .map(|c| c.replica_id)
 819            .collect::<HashSet<_>>();
 820        let mut replica_id = ReplicaId(1);
 821        while replica_ids.contains(&replica_id) {
 822            replica_id.0 += 1;
 823        }
 824        let new_collaborator = project_collaborator::ActiveModel {
 825            project_id: ActiveValue::set(project.id),
 826            connection_id: ActiveValue::set(connection.id as i32),
 827            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 828            user_id: ActiveValue::set(user_id),
 829            replica_id: ActiveValue::set(replica_id),
 830            is_host: ActiveValue::set(false),
 831            ..Default::default()
 832        }
 833        .insert(tx)
 834        .await?;
 835        collaborators.push(new_collaborator);
 836
 837        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 838        let mut worktrees = db_worktrees
 839            .into_iter()
 840            .map(|db_worktree| {
 841                (
 842                    db_worktree.id as u64,
 843                    Worktree {
 844                        id: db_worktree.id as u64,
 845                        abs_path: db_worktree.abs_path,
 846                        root_name: db_worktree.root_name,
 847                        visible: db_worktree.visible,
 848                        entries: Default::default(),
 849                        diagnostic_summaries: Default::default(),
 850                        settings_files: Default::default(),
 851                        scan_id: db_worktree.scan_id as u64,
 852                        completed_scan_id: db_worktree.completed_scan_id as u64,
 853                        legacy_repository_entries: Default::default(),
 854                    },
 855                )
 856            })
 857            .collect::<BTreeMap<_, _>>();
 858
 859        // Populate worktree entries.
 860        {
 861            let mut db_entries = worktree_entry::Entity::find()
 862                .filter(
 863                    Condition::all()
 864                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 865                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 866                )
 867                .stream(tx)
 868                .await?;
 869            while let Some(db_entry) = db_entries.next().await {
 870                let db_entry = db_entry?;
 871                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 872                    worktree.entries.push(proto::Entry {
 873                        id: db_entry.id as u64,
 874                        is_dir: db_entry.is_dir,
 875                        path: db_entry.path,
 876                        inode: db_entry.inode as u64,
 877                        mtime: Some(proto::Timestamp {
 878                            seconds: db_entry.mtime_seconds as u64,
 879                            nanos: db_entry.mtime_nanos as u32,
 880                        }),
 881                        canonical_path: db_entry.canonical_path,
 882                        is_ignored: db_entry.is_ignored,
 883                        is_external: db_entry.is_external,
 884                        // This is only used in the summarization backlog, so if it's None,
 885                        // that just means we won't be able to detect when to resummarize
 886                        // based on total number of backlogged bytes - instead, we'd go
 887                        // on number of files only. That shouldn't be a huge deal in practice.
 888                        size: None,
 889                        is_fifo: db_entry.is_fifo,
 890                    });
 891                }
 892            }
 893        }
 894
 895        // Populate repository entries.
 896        let mut repositories = Vec::new();
 897        {
 898            let db_repository_entries = project_repository::Entity::find()
 899                .filter(
 900                    Condition::all()
 901                        .add(project_repository::Column::ProjectId.eq(project.id))
 902                        .add(project_repository::Column::IsDeleted.eq(false)),
 903                )
 904                .all(tx)
 905                .await?;
 906            for db_repository_entry in db_repository_entries {
 907                let mut repository_statuses = project_repository_statuses::Entity::find()
 908                    .filter(
 909                        Condition::all()
 910                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 911                            .add(
 912                                project_repository_statuses::Column::RepositoryId
 913                                    .eq(db_repository_entry.id),
 914                            )
 915                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 916                    )
 917                    .stream(tx)
 918                    .await?;
 919                let mut updated_statuses = Vec::new();
 920                while let Some(status_entry) = repository_statuses.next().await {
 921                    let status_entry = status_entry?;
 922                    updated_statuses.push(db_status_to_proto(status_entry)?);
 923                }
 924
 925                let current_merge_conflicts = db_repository_entry
 926                    .current_merge_conflicts
 927                    .as_ref()
 928                    .map(|conflicts| serde_json::from_str(&conflicts))
 929                    .transpose()?
 930                    .unwrap_or_default();
 931
 932                let branch_summary = db_repository_entry
 933                    .branch_summary
 934                    .as_ref()
 935                    .map(|branch_summary| serde_json::from_str(&branch_summary))
 936                    .transpose()?
 937                    .unwrap_or_default();
 938
 939                let head_commit_details = db_repository_entry
 940                    .head_commit_details
 941                    .as_ref()
 942                    .map(|head_commit_details| serde_json::from_str(&head_commit_details))
 943                    .transpose()?
 944                    .unwrap_or_default();
 945
 946                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 947                    .context("failed to deserialize repository's entry ids")?;
 948
 949                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 950                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 951                        worktree.legacy_repository_entries.insert(
 952                            db_repository_entry.id as u64,
 953                            proto::RepositoryEntry {
 954                                repository_id: db_repository_entry.id as u64,
 955                                updated_statuses,
 956                                removed_statuses: Vec::new(),
 957                                current_merge_conflicts,
 958                                branch_summary,
 959                            },
 960                        );
 961                    }
 962                } else {
 963                    repositories.push(proto::UpdateRepository {
 964                        project_id: db_repository_entry.project_id.0 as u64,
 965                        id: db_repository_entry.id as u64,
 966                        abs_path: db_repository_entry.abs_path,
 967                        entry_ids,
 968                        updated_statuses,
 969                        removed_statuses: Vec::new(),
 970                        current_merge_conflicts,
 971                        branch_summary,
 972                        head_commit_details,
 973                        scan_id: db_repository_entry.scan_id as u64,
 974                        is_last_update: true,
 975                    });
 976                }
 977            }
 978        }
 979
 980        // Populate worktree diagnostic summaries.
 981        {
 982            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 983                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 984                .stream(tx)
 985                .await?;
 986            while let Some(db_summary) = db_summaries.next().await {
 987                let db_summary = db_summary?;
 988                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 989                    worktree
 990                        .diagnostic_summaries
 991                        .push(proto::DiagnosticSummary {
 992                            path: db_summary.path,
 993                            language_server_id: db_summary.language_server_id as u64,
 994                            error_count: db_summary.error_count as u32,
 995                            warning_count: db_summary.warning_count as u32,
 996                        });
 997                }
 998            }
 999        }
1000
1001        // Populate worktree settings files
1002        {
1003            let mut db_settings_files = worktree_settings_file::Entity::find()
1004                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1005                .stream(tx)
1006                .await?;
1007            while let Some(db_settings_file) = db_settings_files.next().await {
1008                let db_settings_file = db_settings_file?;
1009                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1010                    worktree.settings_files.push(WorktreeSettingsFile {
1011                        path: db_settings_file.path,
1012                        content: db_settings_file.content,
1013                        kind: db_settings_file.kind,
1014                    });
1015                }
1016            }
1017        }
1018
1019        // Populate language servers.
1020        let language_servers = project
1021            .find_related(language_server::Entity)
1022            .all(tx)
1023            .await?;
1024
1025        let project = Project {
1026            id: project.id,
1027            role,
1028            collaborators: collaborators
1029                .into_iter()
1030                .map(|collaborator| ProjectCollaborator {
1031                    connection_id: collaborator.connection(),
1032                    user_id: collaborator.user_id,
1033                    replica_id: collaborator.replica_id,
1034                    is_host: collaborator.is_host,
1035                })
1036                .collect(),
1037            worktrees,
1038            repositories,
1039            language_servers: language_servers
1040                .into_iter()
1041                .map(|language_server| proto::LanguageServer {
1042                    id: language_server.id as u64,
1043                    name: language_server.name,
1044                    worktree_id: None,
1045                })
1046                .collect(),
1047        };
1048        Ok((project, replica_id as ReplicaId))
1049    }
1050
1051    /// Removes the given connection from the specified project.
1052    pub async fn leave_project(
1053        &self,
1054        project_id: ProjectId,
1055        connection: ConnectionId,
1056    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1057        self.project_transaction(project_id, |tx| async move {
1058            let result = project_collaborator::Entity::delete_many()
1059                .filter(
1060                    Condition::all()
1061                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1062                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1063                        .add(
1064                            project_collaborator::Column::ConnectionServerId
1065                                .eq(connection.owner_id as i32),
1066                        ),
1067                )
1068                .exec(&*tx)
1069                .await?;
1070            if result.rows_affected == 0 {
1071                Err(anyhow!("not a collaborator on this project"))?;
1072            }
1073
1074            let project = project::Entity::find_by_id(project_id)
1075                .one(&*tx)
1076                .await?
1077                .ok_or_else(|| anyhow!("no such project"))?;
1078            let collaborators = project
1079                .find_related(project_collaborator::Entity)
1080                .all(&*tx)
1081                .await?;
1082            let connection_ids: Vec<ConnectionId> = collaborators
1083                .into_iter()
1084                .map(|collaborator| collaborator.connection())
1085                .collect();
1086
1087            follower::Entity::delete_many()
1088                .filter(
1089                    Condition::any()
1090                        .add(
1091                            Condition::all()
1092                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1093                                .add(
1094                                    follower::Column::LeaderConnectionServerId
1095                                        .eq(connection.owner_id),
1096                                )
1097                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1098                        )
1099                        .add(
1100                            Condition::all()
1101                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1102                                .add(
1103                                    follower::Column::FollowerConnectionServerId
1104                                        .eq(connection.owner_id),
1105                                )
1106                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1107                        ),
1108                )
1109                .exec(&*tx)
1110                .await?;
1111
1112            let room = if let Some(room_id) = project.room_id {
1113                Some(self.get_room(room_id, &tx).await?)
1114            } else {
1115                None
1116            };
1117
1118            let left_project = LeftProject {
1119                id: project_id,
1120                should_unshare: connection == project.host_connection()?,
1121                connection_ids,
1122            };
1123            Ok((room, left_project))
1124        })
1125        .await
1126    }
1127
1128    pub async fn check_user_is_project_host(
1129        &self,
1130        project_id: ProjectId,
1131        connection_id: ConnectionId,
1132    ) -> Result<()> {
1133        self.project_transaction(project_id, |tx| async move {
1134            project::Entity::find()
1135                .filter(
1136                    Condition::all()
1137                        .add(project::Column::Id.eq(project_id))
1138                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1139                        .add(
1140                            project::Column::HostConnectionServerId
1141                                .eq(Some(connection_id.owner_id as i32)),
1142                        ),
1143                )
1144                .one(&*tx)
1145                .await?
1146                .ok_or_else(|| anyhow!("failed to read project host"))?;
1147
1148            Ok(())
1149        })
1150        .await
1151        .map(|guard| guard.into_inner())
1152    }
1153
1154    /// Returns the current project if the given user is authorized to access it with the specified capability.
1155    pub async fn access_project(
1156        &self,
1157        project_id: ProjectId,
1158        connection_id: ConnectionId,
1159        capability: Capability,
1160        tx: &DatabaseTransaction,
1161    ) -> Result<(project::Model, ChannelRole)> {
1162        let project = project::Entity::find_by_id(project_id)
1163            .one(tx)
1164            .await?
1165            .ok_or_else(|| anyhow!("no such project"))?;
1166
1167        let role_from_room = if let Some(room_id) = project.room_id {
1168            room_participant::Entity::find()
1169                .filter(room_participant::Column::RoomId.eq(room_id))
1170                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1171                .one(tx)
1172                .await?
1173                .and_then(|participant| participant.role)
1174        } else {
1175            None
1176        };
1177
1178        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1179
1180        match capability {
1181            Capability::ReadWrite => {
1182                if !role.can_edit_projects() {
1183                    return Err(anyhow!("not authorized to edit projects"))?;
1184                }
1185            }
1186            Capability::ReadOnly => {
1187                if !role.can_read_projects() {
1188                    return Err(anyhow!("not authorized to read projects"))?;
1189                }
1190            }
1191        }
1192
1193        Ok((project, role))
1194    }
1195
1196    /// Returns the host connection for a read-only request to join a shared project.
1197    pub async fn host_for_read_only_project_request(
1198        &self,
1199        project_id: ProjectId,
1200        connection_id: ConnectionId,
1201    ) -> Result<ConnectionId> {
1202        self.project_transaction(project_id, |tx| async move {
1203            let (project, _) = self
1204                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1205                .await?;
1206            project.host_connection()
1207        })
1208        .await
1209        .map(|guard| guard.into_inner())
1210    }
1211
1212    /// Returns the host connection for a request to join a shared project.
1213    pub async fn host_for_mutating_project_request(
1214        &self,
1215        project_id: ProjectId,
1216        connection_id: ConnectionId,
1217    ) -> Result<ConnectionId> {
1218        self.project_transaction(project_id, |tx| async move {
1219            let (project, _) = self
1220                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1221                .await?;
1222            project.host_connection()
1223        })
1224        .await
1225        .map(|guard| guard.into_inner())
1226    }
1227
1228    pub async fn connections_for_buffer_update(
1229        &self,
1230        project_id: ProjectId,
1231        connection_id: ConnectionId,
1232        capability: Capability,
1233    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1234        self.project_transaction(project_id, |tx| async move {
1235            // Authorize
1236            let (project, _) = self
1237                .access_project(project_id, connection_id, capability, &tx)
1238                .await?;
1239
1240            let host_connection_id = project.host_connection()?;
1241
1242            let collaborators = project_collaborator::Entity::find()
1243                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1244                .all(&*tx)
1245                .await?;
1246
1247            let guest_connection_ids = collaborators
1248                .into_iter()
1249                .filter_map(|collaborator| {
1250                    if collaborator.is_host {
1251                        None
1252                    } else {
1253                        Some(collaborator.connection())
1254                    }
1255                })
1256                .collect();
1257
1258            Ok((host_connection_id, guest_connection_ids))
1259        })
1260        .await
1261    }
1262
1263    /// Returns the connection IDs in the given project.
1264    ///
1265    /// The provided `connection_id` must also be a collaborator in the project,
1266    /// otherwise an error will be returned.
1267    pub async fn project_connection_ids(
1268        &self,
1269        project_id: ProjectId,
1270        connection_id: ConnectionId,
1271        exclude_dev_server: bool,
1272    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1273        self.project_transaction(project_id, |tx| async move {
1274            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1275                .await
1276        })
1277        .await
1278    }
1279
1280    async fn internal_project_connection_ids(
1281        &self,
1282        project_id: ProjectId,
1283        connection_id: ConnectionId,
1284        exclude_dev_server: bool,
1285        tx: &DatabaseTransaction,
1286    ) -> Result<HashSet<ConnectionId>> {
1287        let project = project::Entity::find_by_id(project_id)
1288            .one(tx)
1289            .await?
1290            .ok_or_else(|| anyhow!("no such project"))?;
1291
1292        let mut collaborators = project_collaborator::Entity::find()
1293            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1294            .stream(tx)
1295            .await?;
1296
1297        let mut connection_ids = HashSet::default();
1298        if let Some(host_connection) = project.host_connection().log_err() {
1299            if !exclude_dev_server {
1300                connection_ids.insert(host_connection);
1301            }
1302        }
1303
1304        while let Some(collaborator) = collaborators.next().await {
1305            let collaborator = collaborator?;
1306            connection_ids.insert(collaborator.connection());
1307        }
1308
1309        if connection_ids.contains(&connection_id)
1310            || Some(connection_id) == project.host_connection().ok()
1311        {
1312            Ok(connection_ids)
1313        } else {
1314            Err(anyhow!(
1315                "can only send project updates to a project you're in"
1316            ))?
1317        }
1318    }
1319
1320    async fn project_guest_connection_ids(
1321        &self,
1322        project_id: ProjectId,
1323        tx: &DatabaseTransaction,
1324    ) -> Result<Vec<ConnectionId>> {
1325        let mut collaborators = project_collaborator::Entity::find()
1326            .filter(
1327                project_collaborator::Column::ProjectId
1328                    .eq(project_id)
1329                    .and(project_collaborator::Column::IsHost.eq(false)),
1330            )
1331            .stream(tx)
1332            .await?;
1333
1334        let mut guest_connection_ids = Vec::new();
1335        while let Some(collaborator) = collaborators.next().await {
1336            let collaborator = collaborator?;
1337            guest_connection_ids.push(collaborator.connection());
1338        }
1339        Ok(guest_connection_ids)
1340    }
1341
1342    /// Returns the [`RoomId`] for the given project.
1343    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1344        self.transaction(|tx| async move {
1345            Ok(project::Entity::find_by_id(project_id)
1346                .one(&*tx)
1347                .await?
1348                .and_then(|project| project.room_id))
1349        })
1350        .await
1351    }
1352
1353    pub async fn check_room_participants(
1354        &self,
1355        room_id: RoomId,
1356        leader_id: ConnectionId,
1357        follower_id: ConnectionId,
1358    ) -> Result<()> {
1359        self.transaction(|tx| async move {
1360            use room_participant::Column;
1361
1362            let count = room_participant::Entity::find()
1363                .filter(
1364                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1365                        Condition::any()
1366                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1367                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1368                            ))
1369                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1370                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1371                            )),
1372                    ),
1373                )
1374                .count(&*tx)
1375                .await?;
1376
1377            if count < 2 {
1378                Err(anyhow!("not room participants"))?;
1379            }
1380
1381            Ok(())
1382        })
1383        .await
1384    }
1385
1386    /// Adds the given follower connection as a follower of the given leader connection.
1387    pub async fn follow(
1388        &self,
1389        room_id: RoomId,
1390        project_id: ProjectId,
1391        leader_connection: ConnectionId,
1392        follower_connection: ConnectionId,
1393    ) -> Result<TransactionGuard<proto::Room>> {
1394        self.room_transaction(room_id, |tx| async move {
1395            follower::ActiveModel {
1396                room_id: ActiveValue::set(room_id),
1397                project_id: ActiveValue::set(project_id),
1398                leader_connection_server_id: ActiveValue::set(ServerId(
1399                    leader_connection.owner_id as i32,
1400                )),
1401                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1402                follower_connection_server_id: ActiveValue::set(ServerId(
1403                    follower_connection.owner_id as i32,
1404                )),
1405                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1406                ..Default::default()
1407            }
1408            .insert(&*tx)
1409            .await?;
1410
1411            let room = self.get_room(room_id, &tx).await?;
1412            Ok(room)
1413        })
1414        .await
1415    }
1416
1417    /// Removes the given follower connection as a follower of the given leader connection.
1418    pub async fn unfollow(
1419        &self,
1420        room_id: RoomId,
1421        project_id: ProjectId,
1422        leader_connection: ConnectionId,
1423        follower_connection: ConnectionId,
1424    ) -> Result<TransactionGuard<proto::Room>> {
1425        self.room_transaction(room_id, |tx| async move {
1426            follower::Entity::delete_many()
1427                .filter(
1428                    Condition::all()
1429                        .add(follower::Column::RoomId.eq(room_id))
1430                        .add(follower::Column::ProjectId.eq(project_id))
1431                        .add(
1432                            follower::Column::LeaderConnectionServerId
1433                                .eq(leader_connection.owner_id),
1434                        )
1435                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1436                        .add(
1437                            follower::Column::FollowerConnectionServerId
1438                                .eq(follower_connection.owner_id),
1439                        )
1440                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1441                )
1442                .exec(&*tx)
1443                .await?;
1444
1445            let room = self.get_room(room_id, &tx).await?;
1446            Ok(room)
1447        })
1448        .await
1449    }
1450}