projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .context("could not find participant")?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            let project = project::ActiveModel {
  65                room_id: ActiveValue::set(Some(participant.room_id)),
  66                host_user_id: ActiveValue::set(Some(participant.user_id)),
  67                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  68                host_connection_server_id: ActiveValue::set(Some(ServerId(
  69                    connection.owner_id as i32,
  70                ))),
  71                id: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if !worktrees.is_empty() {
  77                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  78                    worktree::ActiveModel {
  79                        id: ActiveValue::set(worktree.id as i64),
  80                        project_id: ActiveValue::set(project.id),
  81                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  82                        root_name: ActiveValue::set(worktree.root_name.clone()),
  83                        visible: ActiveValue::set(worktree.visible),
  84                        scan_id: ActiveValue::set(0),
  85                        completed_scan_id: ActiveValue::set(0),
  86                    }
  87                }))
  88                .exec(&*tx)
  89                .await?;
  90            }
  91
  92            let replica_id = if is_ssh_project { 1 } else { 0 };
  93
  94            project_collaborator::ActiveModel {
  95                project_id: ActiveValue::set(project.id),
  96                connection_id: ActiveValue::set(connection.id as i32),
  97                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  98                user_id: ActiveValue::set(participant.user_id),
  99                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 100                is_host: ActiveValue::set(true),
 101                id: ActiveValue::NotSet,
 102                committer_name: ActiveValue::Set(None),
 103                committer_email: ActiveValue::Set(None),
 104            }
 105            .insert(&*tx)
 106            .await?;
 107
 108            let room = self.get_room(room_id, &tx).await?;
 109            Ok((project.id, room))
 110        })
 111        .await
 112    }
 113
 114    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 115        self.transaction(|tx| async move {
 116            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 117            Ok(())
 118        })
 119        .await
 120    }
 121
 122    /// Unshares the given project.
 123    pub async fn unshare_project(
 124        &self,
 125        project_id: ProjectId,
 126        connection: ConnectionId,
 127    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 128        self.project_transaction(project_id, |tx| async move {
 129            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 130            let project = project::Entity::find_by_id(project_id)
 131                .one(&*tx)
 132                .await?
 133                .context("project not found")?;
 134            let room = if let Some(room_id) = project.room_id {
 135                Some(self.get_room(room_id, &tx).await?)
 136            } else {
 137                None
 138            };
 139            if project.host_connection()? == connection {
 140                return Ok((true, room, guest_connection_ids));
 141            }
 142            Err(anyhow!("cannot unshare a project hosted by another user"))?
 143        })
 144        .await
 145    }
 146
 147    /// Updates the worktrees associated with the given project.
 148    pub async fn update_project(
 149        &self,
 150        project_id: ProjectId,
 151        connection: ConnectionId,
 152        worktrees: &[proto::WorktreeMetadata],
 153    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 154        self.project_transaction(project_id, |tx| async move {
 155            let project = project::Entity::find_by_id(project_id)
 156                .filter(
 157                    Condition::all()
 158                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 159                        .add(
 160                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 161                        ),
 162                )
 163                .one(&*tx)
 164                .await?
 165                .context("no such project")?;
 166
 167            self.update_project_worktrees(project.id, worktrees, &tx)
 168                .await?;
 169
 170            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 171
 172            let room = if let Some(room_id) = project.room_id {
 173                Some(self.get_room(room_id, &tx).await?)
 174            } else {
 175                None
 176            };
 177
 178            Ok((room, guest_connection_ids))
 179        })
 180        .await
 181    }
 182
 183    pub(in crate::db) async fn update_project_worktrees(
 184        &self,
 185        project_id: ProjectId,
 186        worktrees: &[proto::WorktreeMetadata],
 187        tx: &DatabaseTransaction,
 188    ) -> Result<()> {
 189        if !worktrees.is_empty() {
 190            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 191                id: ActiveValue::set(worktree.id as i64),
 192                project_id: ActiveValue::set(project_id),
 193                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 194                root_name: ActiveValue::set(worktree.root_name.clone()),
 195                visible: ActiveValue::set(worktree.visible),
 196                scan_id: ActiveValue::set(0),
 197                completed_scan_id: ActiveValue::set(0),
 198            }))
 199            .on_conflict(
 200                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 201                    .update_column(worktree::Column::RootName)
 202                    .to_owned(),
 203            )
 204            .exec(tx)
 205            .await?;
 206        }
 207
 208        worktree::Entity::delete_many()
 209            .filter(worktree::Column::ProjectId.eq(project_id).and(
 210                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 211            ))
 212            .exec(tx)
 213            .await?;
 214
 215        Ok(())
 216    }
 217
 218    pub async fn update_worktree(
 219        &self,
 220        update: &proto::UpdateWorktree,
 221        connection: ConnectionId,
 222    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 223        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 224            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 225        {
 226            return Err(anyhow!(
 227                "invalid worktree update. removed entries: {}, updated entries: {}",
 228                update.removed_entries.len(),
 229                update.updated_entries.len()
 230            ))?;
 231        }
 232
 233        let project_id = ProjectId::from_proto(update.project_id);
 234        let worktree_id = update.worktree_id as i64;
 235        self.project_transaction(project_id, |tx| async move {
 236            // Ensure the update comes from the host.
 237            let _project = project::Entity::find_by_id(project_id)
 238                .filter(
 239                    Condition::all()
 240                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 241                        .add(
 242                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 243                        ),
 244                )
 245                .one(&*tx)
 246                .await?
 247                .with_context(|| format!("no such project: {project_id}"))?;
 248
 249            // Update metadata.
 250            worktree::Entity::update(worktree::ActiveModel {
 251                id: ActiveValue::set(worktree_id),
 252                project_id: ActiveValue::set(project_id),
 253                root_name: ActiveValue::set(update.root_name.clone()),
 254                scan_id: ActiveValue::set(update.scan_id as i64),
 255                completed_scan_id: if update.is_last_update {
 256                    ActiveValue::set(update.scan_id as i64)
 257                } else {
 258                    ActiveValue::default()
 259                },
 260                abs_path: ActiveValue::set(update.abs_path.clone()),
 261                ..Default::default()
 262            })
 263            .exec(&*tx)
 264            .await?;
 265
 266            if !update.updated_entries.is_empty() {
 267                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 268                    let mtime = entry.mtime.clone().unwrap_or_default();
 269                    worktree_entry::ActiveModel {
 270                        project_id: ActiveValue::set(project_id),
 271                        worktree_id: ActiveValue::set(worktree_id),
 272                        id: ActiveValue::set(entry.id as i64),
 273                        is_dir: ActiveValue::set(entry.is_dir),
 274                        path: ActiveValue::set(entry.path.clone()),
 275                        inode: ActiveValue::set(entry.inode as i64),
 276                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 277                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 278                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 279                        is_ignored: ActiveValue::set(entry.is_ignored),
 280                        git_status: ActiveValue::set(None),
 281                        is_external: ActiveValue::set(entry.is_external),
 282                        is_deleted: ActiveValue::set(false),
 283                        scan_id: ActiveValue::set(update.scan_id as i64),
 284                        is_fifo: ActiveValue::set(entry.is_fifo),
 285                    }
 286                }))
 287                .on_conflict(
 288                    OnConflict::columns([
 289                        worktree_entry::Column::ProjectId,
 290                        worktree_entry::Column::WorktreeId,
 291                        worktree_entry::Column::Id,
 292                    ])
 293                    .update_columns([
 294                        worktree_entry::Column::IsDir,
 295                        worktree_entry::Column::Path,
 296                        worktree_entry::Column::Inode,
 297                        worktree_entry::Column::MtimeSeconds,
 298                        worktree_entry::Column::MtimeNanos,
 299                        worktree_entry::Column::CanonicalPath,
 300                        worktree_entry::Column::IsIgnored,
 301                        worktree_entry::Column::ScanId,
 302                    ])
 303                    .to_owned(),
 304                )
 305                .exec(&*tx)
 306                .await?;
 307            }
 308
 309            if !update.removed_entries.is_empty() {
 310                worktree_entry::Entity::update_many()
 311                    .filter(
 312                        worktree_entry::Column::ProjectId
 313                            .eq(project_id)
 314                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 315                            .and(
 316                                worktree_entry::Column::Id
 317                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 318                            ),
 319                    )
 320                    .set(worktree_entry::ActiveModel {
 321                        is_deleted: ActiveValue::Set(true),
 322                        scan_id: ActiveValue::Set(update.scan_id as i64),
 323                        ..Default::default()
 324                    })
 325                    .exec(&*tx)
 326                    .await?;
 327            }
 328
 329            // Backward-compatibility for old Zed clients.
 330            //
 331            // Remove this block when Zed 1.80 stable has been out for a week.
 332            {
 333                if !update.updated_repositories.is_empty() {
 334                    project_repository::Entity::insert_many(
 335                        update.updated_repositories.iter().map(|repository| {
 336                            project_repository::ActiveModel {
 337                                project_id: ActiveValue::set(project_id),
 338                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 339                                id: ActiveValue::set(repository.repository_id as i64),
 340                                scan_id: ActiveValue::set(update.scan_id as i64),
 341                                is_deleted: ActiveValue::set(false),
 342                                branch_summary: ActiveValue::Set(
 343                                    repository
 344                                        .branch_summary
 345                                        .as_ref()
 346                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 347                                ),
 348                                current_merge_conflicts: ActiveValue::Set(Some(
 349                                    serde_json::to_string(&repository.current_merge_conflicts)
 350                                        .unwrap(),
 351                                )),
 352                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 353                                abs_path: ActiveValue::set(String::new()),
 354                                entry_ids: ActiveValue::set("[]".into()),
 355                                head_commit_details: ActiveValue::set(None),
 356                                merge_message: ActiveValue::set(None),
 357                            }
 358                        }),
 359                    )
 360                    .on_conflict(
 361                        OnConflict::columns([
 362                            project_repository::Column::ProjectId,
 363                            project_repository::Column::Id,
 364                        ])
 365                        .update_columns([
 366                            project_repository::Column::ScanId,
 367                            project_repository::Column::BranchSummary,
 368                            project_repository::Column::CurrentMergeConflicts,
 369                        ])
 370                        .to_owned(),
 371                    )
 372                    .exec(&*tx)
 373                    .await?;
 374
 375                    let has_any_statuses = update
 376                        .updated_repositories
 377                        .iter()
 378                        .any(|repository| !repository.updated_statuses.is_empty());
 379
 380                    if has_any_statuses {
 381                        project_repository_statuses::Entity::insert_many(
 382                            update.updated_repositories.iter().flat_map(
 383                                |repository: &proto::RepositoryEntry| {
 384                                    repository.updated_statuses.iter().map(|status_entry| {
 385                                        let (repo_path, status_kind, first_status, second_status) =
 386                                            proto_status_to_db(status_entry.clone());
 387                                        project_repository_statuses::ActiveModel {
 388                                            project_id: ActiveValue::set(project_id),
 389                                            repository_id: ActiveValue::set(
 390                                                repository.repository_id as i64,
 391                                            ),
 392                                            scan_id: ActiveValue::set(update.scan_id as i64),
 393                                            is_deleted: ActiveValue::set(false),
 394                                            repo_path: ActiveValue::set(repo_path),
 395                                            status: ActiveValue::set(0),
 396                                            status_kind: ActiveValue::set(status_kind),
 397                                            first_status: ActiveValue::set(first_status),
 398                                            second_status: ActiveValue::set(second_status),
 399                                        }
 400                                    })
 401                                },
 402                            ),
 403                        )
 404                        .on_conflict(
 405                            OnConflict::columns([
 406                                project_repository_statuses::Column::ProjectId,
 407                                project_repository_statuses::Column::RepositoryId,
 408                                project_repository_statuses::Column::RepoPath,
 409                            ])
 410                            .update_columns([
 411                                project_repository_statuses::Column::ScanId,
 412                                project_repository_statuses::Column::StatusKind,
 413                                project_repository_statuses::Column::FirstStatus,
 414                                project_repository_statuses::Column::SecondStatus,
 415                            ])
 416                            .to_owned(),
 417                        )
 418                        .exec(&*tx)
 419                        .await?;
 420                    }
 421
 422                    for repo in &update.updated_repositories {
 423                        if !repo.removed_statuses.is_empty() {
 424                            project_repository_statuses::Entity::update_many()
 425                                .filter(
 426                                    project_repository_statuses::Column::ProjectId
 427                                        .eq(project_id)
 428                                        .and(
 429                                            project_repository_statuses::Column::RepositoryId
 430                                                .eq(repo.repository_id),
 431                                        )
 432                                        .and(
 433                                            project_repository_statuses::Column::RepoPath
 434                                                .is_in(repo.removed_statuses.iter()),
 435                                        ),
 436                                )
 437                                .set(project_repository_statuses::ActiveModel {
 438                                    is_deleted: ActiveValue::Set(true),
 439                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 440                                    ..Default::default()
 441                                })
 442                                .exec(&*tx)
 443                                .await?;
 444                        }
 445                    }
 446                }
 447
 448                if !update.removed_repositories.is_empty() {
 449                    project_repository::Entity::update_many()
 450                        .filter(
 451                            project_repository::Column::ProjectId
 452                                .eq(project_id)
 453                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 454                                .and(project_repository::Column::Id.is_in(
 455                                    update.removed_repositories.iter().map(|id| *id as i64),
 456                                )),
 457                        )
 458                        .set(project_repository::ActiveModel {
 459                            is_deleted: ActiveValue::Set(true),
 460                            scan_id: ActiveValue::Set(update.scan_id as i64),
 461                            ..Default::default()
 462                        })
 463                        .exec(&*tx)
 464                        .await?;
 465                }
 466            }
 467
 468            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 469            Ok(connection_ids)
 470        })
 471        .await
 472    }
 473
 474    pub async fn update_repository(
 475        &self,
 476        update: &proto::UpdateRepository,
 477        _connection: ConnectionId,
 478    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 479        let project_id = ProjectId::from_proto(update.project_id);
 480        let repository_id = update.id as i64;
 481        self.project_transaction(project_id, |tx| async move {
 482            project_repository::Entity::insert(project_repository::ActiveModel {
 483                project_id: ActiveValue::set(project_id),
 484                id: ActiveValue::set(repository_id),
 485                legacy_worktree_id: ActiveValue::set(None),
 486                abs_path: ActiveValue::set(update.abs_path.clone()),
 487                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 488                scan_id: ActiveValue::set(update.scan_id as i64),
 489                is_deleted: ActiveValue::set(false),
 490                branch_summary: ActiveValue::Set(
 491                    update
 492                        .branch_summary
 493                        .as_ref()
 494                        .map(|summary| serde_json::to_string(summary).unwrap()),
 495                ),
 496                head_commit_details: ActiveValue::Set(
 497                    update
 498                        .head_commit_details
 499                        .as_ref()
 500                        .map(|details| serde_json::to_string(details).unwrap()),
 501                ),
 502                current_merge_conflicts: ActiveValue::Set(Some(
 503                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 504                )),
 505                merge_message: ActiveValue::set(update.merge_message.clone()),
 506            })
 507            .on_conflict(
 508                OnConflict::columns([
 509                    project_repository::Column::ProjectId,
 510                    project_repository::Column::Id,
 511                ])
 512                .update_columns([
 513                    project_repository::Column::ScanId,
 514                    project_repository::Column::BranchSummary,
 515                    project_repository::Column::EntryIds,
 516                    project_repository::Column::AbsPath,
 517                    project_repository::Column::CurrentMergeConflicts,
 518                    project_repository::Column::HeadCommitDetails,
 519                    project_repository::Column::MergeMessage,
 520                ])
 521                .to_owned(),
 522            )
 523            .exec(&*tx)
 524            .await?;
 525
 526            let has_any_statuses = !update.updated_statuses.is_empty();
 527
 528            if has_any_statuses {
 529                project_repository_statuses::Entity::insert_many(
 530                    update.updated_statuses.iter().map(|status_entry| {
 531                        let (repo_path, status_kind, first_status, second_status) =
 532                            proto_status_to_db(status_entry.clone());
 533                        project_repository_statuses::ActiveModel {
 534                            project_id: ActiveValue::set(project_id),
 535                            repository_id: ActiveValue::set(repository_id),
 536                            scan_id: ActiveValue::set(update.scan_id as i64),
 537                            is_deleted: ActiveValue::set(false),
 538                            repo_path: ActiveValue::set(repo_path),
 539                            status: ActiveValue::set(0),
 540                            status_kind: ActiveValue::set(status_kind),
 541                            first_status: ActiveValue::set(first_status),
 542                            second_status: ActiveValue::set(second_status),
 543                        }
 544                    }),
 545                )
 546                .on_conflict(
 547                    OnConflict::columns([
 548                        project_repository_statuses::Column::ProjectId,
 549                        project_repository_statuses::Column::RepositoryId,
 550                        project_repository_statuses::Column::RepoPath,
 551                    ])
 552                    .update_columns([
 553                        project_repository_statuses::Column::ScanId,
 554                        project_repository_statuses::Column::StatusKind,
 555                        project_repository_statuses::Column::FirstStatus,
 556                        project_repository_statuses::Column::SecondStatus,
 557                    ])
 558                    .to_owned(),
 559                )
 560                .exec(&*tx)
 561                .await?;
 562            }
 563
 564            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 565
 566            if has_any_removed_statuses {
 567                project_repository_statuses::Entity::update_many()
 568                    .filter(
 569                        project_repository_statuses::Column::ProjectId
 570                            .eq(project_id)
 571                            .and(
 572                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 573                            )
 574                            .and(
 575                                project_repository_statuses::Column::RepoPath
 576                                    .is_in(update.removed_statuses.iter()),
 577                            ),
 578                    )
 579                    .set(project_repository_statuses::ActiveModel {
 580                        is_deleted: ActiveValue::Set(true),
 581                        scan_id: ActiveValue::Set(update.scan_id as i64),
 582                        ..Default::default()
 583                    })
 584                    .exec(&*tx)
 585                    .await?;
 586            }
 587
 588            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 589            Ok(connection_ids)
 590        })
 591        .await
 592    }
 593
 594    pub async fn remove_repository(
 595        &self,
 596        remove: &proto::RemoveRepository,
 597        _connection: ConnectionId,
 598    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 599        let project_id = ProjectId::from_proto(remove.project_id);
 600        let repository_id = remove.id as i64;
 601        self.project_transaction(project_id, |tx| async move {
 602            project_repository::Entity::update_many()
 603                .filter(
 604                    project_repository::Column::ProjectId
 605                        .eq(project_id)
 606                        .and(project_repository::Column::Id.eq(repository_id)),
 607                )
 608                .set(project_repository::ActiveModel {
 609                    is_deleted: ActiveValue::Set(true),
 610                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 611                    ..Default::default()
 612                })
 613                .exec(&*tx)
 614                .await?;
 615
 616            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 617            Ok(connection_ids)
 618        })
 619        .await
 620    }
 621
 622    /// Updates the diagnostic summary for the given connection.
 623    pub async fn update_diagnostic_summary(
 624        &self,
 625        update: &proto::UpdateDiagnosticSummary,
 626        connection: ConnectionId,
 627    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 628        let project_id = ProjectId::from_proto(update.project_id);
 629        let worktree_id = update.worktree_id as i64;
 630        self.project_transaction(project_id, |tx| async move {
 631            let summary = update.summary.as_ref().context("invalid summary")?;
 632
 633            // Ensure the update comes from the host.
 634            let project = project::Entity::find_by_id(project_id)
 635                .one(&*tx)
 636                .await?
 637                .context("no such project")?;
 638            if project.host_connection()? != connection {
 639                return Err(anyhow!("can't update a project hosted by someone else"))?;
 640            }
 641
 642            // Update summary.
 643            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 644                project_id: ActiveValue::set(project_id),
 645                worktree_id: ActiveValue::set(worktree_id),
 646                path: ActiveValue::set(summary.path.clone()),
 647                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 648                error_count: ActiveValue::set(summary.error_count as i32),
 649                warning_count: ActiveValue::set(summary.warning_count as i32),
 650            })
 651            .on_conflict(
 652                OnConflict::columns([
 653                    worktree_diagnostic_summary::Column::ProjectId,
 654                    worktree_diagnostic_summary::Column::WorktreeId,
 655                    worktree_diagnostic_summary::Column::Path,
 656                ])
 657                .update_columns([
 658                    worktree_diagnostic_summary::Column::LanguageServerId,
 659                    worktree_diagnostic_summary::Column::ErrorCount,
 660                    worktree_diagnostic_summary::Column::WarningCount,
 661                ])
 662                .to_owned(),
 663            )
 664            .exec(&*tx)
 665            .await?;
 666
 667            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 668            Ok(connection_ids)
 669        })
 670        .await
 671    }
 672
 673    /// Starts the language server for the given connection.
 674    pub async fn start_language_server(
 675        &self,
 676        update: &proto::StartLanguageServer,
 677        connection: ConnectionId,
 678    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 679        let project_id = ProjectId::from_proto(update.project_id);
 680        self.project_transaction(project_id, |tx| async move {
 681            let server = update.server.as_ref().context("invalid language server")?;
 682
 683            // Ensure the update comes from the host.
 684            let project = project::Entity::find_by_id(project_id)
 685                .one(&*tx)
 686                .await?
 687                .context("no such project")?;
 688            if project.host_connection()? != connection {
 689                return Err(anyhow!("can't update a project hosted by someone else"))?;
 690            }
 691
 692            // Add the newly-started language server.
 693            language_server::Entity::insert(language_server::ActiveModel {
 694                project_id: ActiveValue::set(project_id),
 695                id: ActiveValue::set(server.id as i64),
 696                name: ActiveValue::set(server.name.clone()),
 697                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 698                capabilities: ActiveValue::set(update.capabilities.clone()),
 699            })
 700            .on_conflict(
 701                OnConflict::columns([
 702                    language_server::Column::ProjectId,
 703                    language_server::Column::Id,
 704                ])
 705                .update_columns([
 706                    language_server::Column::Name,
 707                    language_server::Column::Capabilities,
 708                    language_server::Column::WorktreeId,
 709                ])
 710                .to_owned(),
 711            )
 712            .exec(&*tx)
 713            .await?;
 714
 715            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 716            Ok(connection_ids)
 717        })
 718        .await
 719    }
 720
 721    /// Updates the worktree settings for the given connection.
 722    pub async fn update_worktree_settings(
 723        &self,
 724        update: &proto::UpdateWorktreeSettings,
 725        connection: ConnectionId,
 726    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 727        let project_id = ProjectId::from_proto(update.project_id);
 728        let kind = match update.kind {
 729            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 730                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 731            None => proto::LocalSettingsKind::Settings,
 732        };
 733        let kind = LocalSettingsKind::from_proto(kind);
 734        self.project_transaction(project_id, |tx| async move {
 735            // Ensure the update comes from the host.
 736            let project = project::Entity::find_by_id(project_id)
 737                .one(&*tx)
 738                .await?
 739                .context("no such project")?;
 740            if project.host_connection()? != connection {
 741                return Err(anyhow!("can't update a project hosted by someone else"))?;
 742            }
 743
 744            if let Some(content) = &update.content {
 745                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 746                    project_id: ActiveValue::Set(project_id),
 747                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 748                    path: ActiveValue::Set(update.path.clone()),
 749                    content: ActiveValue::Set(content.clone()),
 750                    kind: ActiveValue::Set(kind),
 751                })
 752                .on_conflict(
 753                    OnConflict::columns([
 754                        worktree_settings_file::Column::ProjectId,
 755                        worktree_settings_file::Column::WorktreeId,
 756                        worktree_settings_file::Column::Path,
 757                    ])
 758                    .update_column(worktree_settings_file::Column::Content)
 759                    .to_owned(),
 760                )
 761                .exec(&*tx)
 762                .await?;
 763            } else {
 764                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 765                    project_id: ActiveValue::Set(project_id),
 766                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 767                    path: ActiveValue::Set(update.path.clone()),
 768                    ..Default::default()
 769                })
 770                .exec(&*tx)
 771                .await?;
 772            }
 773
 774            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 775            Ok(connection_ids)
 776        })
 777        .await
 778    }
 779
 780    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 781        self.transaction(|tx| async move {
 782            Ok(project::Entity::find_by_id(id)
 783                .one(&*tx)
 784                .await?
 785                .context("no such project")?)
 786        })
 787        .await
 788    }
 789
 790    /// Adds the given connection to the specified project
 791    /// in the current room.
 792    pub async fn join_project(
 793        &self,
 794        project_id: ProjectId,
 795        connection: ConnectionId,
 796        user_id: UserId,
 797        committer_name: Option<String>,
 798        committer_email: Option<String>,
 799    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 800        self.project_transaction(project_id, move |tx| {
 801            let committer_name = committer_name.clone();
 802            let committer_email = committer_email.clone();
 803            async move {
 804                let (project, role) = self
 805                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 806                    .await?;
 807                self.join_project_internal(
 808                    project,
 809                    user_id,
 810                    committer_name,
 811                    committer_email,
 812                    connection,
 813                    role,
 814                    &tx,
 815                )
 816                .await
 817            }
 818        })
 819        .await
 820    }
 821
    /// Registers `connection` as a new non-host collaborator on `project` and assembles the
    /// project state to return to it.
    ///
    /// Steps, all executed against `tx`:
    /// 1. Load the existing collaborators and pick the smallest `ReplicaId`, starting at 1,
    ///    that none of them uses.
    /// 2. Insert the new collaborator row (`is_host` is `false`).
    /// 3. Load the project's worktrees, then fill in their non-deleted entries, legacy
    ///    repository entries, diagnostic summaries and settings files. Rows that refer to a
    ///    worktree id that was not loaded are skipped.
    /// 4. Load the non-deleted repositories with their non-deleted statuses. A repository
    ///    with a `legacy_worktree_id` is attached to that worktree; any other is returned as
    ///    a `proto::UpdateRepository`.
    /// 5. Load the project's language servers.
    ///
    /// Returns the assembled [`Project`] (carrying `role` unchanged) and the replica id
    /// assigned to the new collaborator. Any database or JSON deserialization error aborts
    /// the call.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        committer_name: Option<String>,
        committer_email: Option<String>,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        // Find the lowest replica id (>= 1) not taken by an existing collaborator.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(1);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        // Persist the joining connection as a guest; the row id is left for the database
        // to assign.
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            id: ActiveValue::NotSet,
            committer_name: ActiveValue::set(committer_name),
            committer_email: ActiveValue::set(committer_email),
        }
        .insert(tx)
        .await?;
        // Include the new collaborator in the list returned to the caller.
        collaborators.push(new_collaborator);

        // Index the worktrees by id so the sections below can attach their rows to them.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                        legacy_repository_entries: Default::default(),
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        // NOTE(review): the enclosing block presumably exists so the stream is dropped
        // before the next query runs on `tx` — confirm before restructuring.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                // Entries whose worktree was not loaded above are skipped.
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        canonical_path: db_entry.canonical_path,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        // This is only used in the summarization backlog, so if it's None,
                        // that just means we won't be able to detect when to resummarize
                        // based on total number of backlogged bytes - instead, we'd go
                        // on number of files only. That shouldn't be a huge deal in practice.
                        size: None,
                        is_fifo: db_entry.is_fifo,
                    });
                }
            }
        }

        // Populate repository entries.
        let mut repositories = Vec::new();
        {
            let db_repository_entries = project_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(project_repository::Column::ProjectId.eq(project.id))
                        .add(project_repository::Column::IsDeleted.eq(false)),
                )
                .all(tx)
                .await?;
            for db_repository_entry in db_repository_entries {
                // One status query per repository: collect its non-deleted statuses.
                let mut repository_statuses = project_repository_statuses::Entity::find()
                    .filter(
                        Condition::all()
                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
                            .add(
                                project_repository_statuses::Column::RepositoryId
                                    .eq(db_repository_entry.id),
                            )
                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
                    )
                    .stream(tx)
                    .await?;
                let mut updated_statuses = Vec::new();
                while let Some(status_entry) = repository_statuses.next().await {
                    let status_entry = status_entry?;
                    updated_statuses.push(db_status_to_proto(status_entry)?);
                }

                // The next three columns hold optional JSON text; a missing value becomes
                // the target type's default, while malformed JSON is an error.
                let current_merge_conflicts = db_repository_entry
                    .current_merge_conflicts
                    .as_ref()
                    .map(|conflicts| serde_json::from_str(conflicts))
                    .transpose()?
                    .unwrap_or_default();

                let branch_summary = db_repository_entry
                    .branch_summary
                    .as_ref()
                    .map(|branch_summary| serde_json::from_str(branch_summary))
                    .transpose()?
                    .unwrap_or_default();

                let head_commit_details = db_repository_entry
                    .head_commit_details
                    .as_ref()
                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
                    .transpose()?
                    .unwrap_or_default();

                // Unlike the columns above, `entry_ids` is always present and must parse.
                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
                    .context("failed to deserialize repository's entry ids")?;

                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
                    // Legacy repositories hang off their worktree; if that worktree was not
                    // loaded, the repository is silently left out.
                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
                        worktree.legacy_repository_entries.insert(
                            db_repository_entry.id as u64,
                            proto::RepositoryEntry {
                                repository_id: db_repository_entry.id as u64,
                                updated_statuses,
                                removed_statuses: Vec::new(),
                                current_merge_conflicts,
                                branch_summary,
                            },
                        );
                    }
                } else {
                    // Repositories without a legacy worktree are reported at project level.
                    repositories.push(proto::UpdateRepository {
                        project_id: db_repository_entry.project_id.0 as u64,
                        id: db_repository_entry.id as u64,
                        abs_path: db_repository_entry.abs_path,
                        entry_ids,
                        updated_statuses,
                        removed_statuses: Vec::new(),
                        current_merge_conflicts,
                        branch_summary,
                        head_commit_details,
                        scan_id: db_repository_entry.scan_id as u64,
                        is_last_update: true,
                        merge_message: db_repository_entry.merge_message,
                        stash_entries: Vec::new(),
                    });
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files.
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                        kind: db_settings_file.kind,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        // Convert the database rows into the value returned to the caller.
        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                    committer_name: collaborator.committer_name,
                    committer_email: collaborator.committer_email,
                })
                .collect(),
            worktrees,
            repositories,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| LanguageServer {
                    server: proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                        worktree_id: language_server.worktree_id.map(|id| id as u64),
                    },
                    capabilities: language_server.capabilities,
                })
                .collect(),
        };
        Ok((project, replica_id as ReplicaId))
    }
1079
1080    /// Removes the given connection from the specified project.
1081    pub async fn leave_project(
1082        &self,
1083        project_id: ProjectId,
1084        connection: ConnectionId,
1085    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1086        self.project_transaction(project_id, |tx| async move {
1087            let result = project_collaborator::Entity::delete_many()
1088                .filter(
1089                    Condition::all()
1090                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1091                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1092                        .add(
1093                            project_collaborator::Column::ConnectionServerId
1094                                .eq(connection.owner_id as i32),
1095                        ),
1096                )
1097                .exec(&*tx)
1098                .await?;
1099            if result.rows_affected == 0 {
1100                Err(anyhow!("not a collaborator on this project"))?;
1101            }
1102
1103            let project = project::Entity::find_by_id(project_id)
1104                .one(&*tx)
1105                .await?
1106                .context("no such project")?;
1107            let collaborators = project
1108                .find_related(project_collaborator::Entity)
1109                .all(&*tx)
1110                .await?;
1111            let connection_ids: Vec<ConnectionId> = collaborators
1112                .into_iter()
1113                .map(|collaborator| collaborator.connection())
1114                .collect();
1115
1116            follower::Entity::delete_many()
1117                .filter(
1118                    Condition::any()
1119                        .add(
1120                            Condition::all()
1121                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1122                                .add(
1123                                    follower::Column::LeaderConnectionServerId
1124                                        .eq(connection.owner_id),
1125                                )
1126                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1127                        )
1128                        .add(
1129                            Condition::all()
1130                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1131                                .add(
1132                                    follower::Column::FollowerConnectionServerId
1133                                        .eq(connection.owner_id),
1134                                )
1135                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1136                        ),
1137                )
1138                .exec(&*tx)
1139                .await?;
1140
1141            let room = if let Some(room_id) = project.room_id {
1142                Some(self.get_room(room_id, &tx).await?)
1143            } else {
1144                None
1145            };
1146
1147            let left_project = LeftProject {
1148                id: project_id,
1149                should_unshare: connection == project.host_connection()?,
1150                connection_ids,
1151            };
1152            Ok((room, left_project))
1153        })
1154        .await
1155    }
1156
1157    pub async fn check_user_is_project_host(
1158        &self,
1159        project_id: ProjectId,
1160        connection_id: ConnectionId,
1161    ) -> Result<()> {
1162        self.project_transaction(project_id, |tx| async move {
1163            project::Entity::find()
1164                .filter(
1165                    Condition::all()
1166                        .add(project::Column::Id.eq(project_id))
1167                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1168                        .add(
1169                            project::Column::HostConnectionServerId
1170                                .eq(Some(connection_id.owner_id as i32)),
1171                        ),
1172                )
1173                .one(&*tx)
1174                .await?
1175                .context("failed to read project host")?;
1176
1177            Ok(())
1178        })
1179        .await
1180        .map(|guard| guard.into_inner())
1181    }
1182
1183    /// Returns the current project if the given user is authorized to access it with the specified capability.
1184    pub async fn access_project(
1185        &self,
1186        project_id: ProjectId,
1187        connection_id: ConnectionId,
1188        capability: Capability,
1189        tx: &DatabaseTransaction,
1190    ) -> Result<(project::Model, ChannelRole)> {
1191        let project = project::Entity::find_by_id(project_id)
1192            .one(tx)
1193            .await?
1194            .context("no such project")?;
1195
1196        let role_from_room = if let Some(room_id) = project.room_id {
1197            room_participant::Entity::find()
1198                .filter(room_participant::Column::RoomId.eq(room_id))
1199                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1200                .one(tx)
1201                .await?
1202                .and_then(|participant| participant.role)
1203        } else {
1204            None
1205        };
1206
1207        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1208
1209        match capability {
1210            Capability::ReadWrite => {
1211                if !role.can_edit_projects() {
1212                    return Err(anyhow!("not authorized to edit projects"))?;
1213                }
1214            }
1215            Capability::ReadOnly => {
1216                if !role.can_read_projects() {
1217                    return Err(anyhow!("not authorized to read projects"))?;
1218                }
1219            }
1220        }
1221
1222        Ok((project, role))
1223    }
1224
1225    /// Returns the host connection for a read-only request to join a shared project.
1226    pub async fn host_for_read_only_project_request(
1227        &self,
1228        project_id: ProjectId,
1229        connection_id: ConnectionId,
1230    ) -> Result<ConnectionId> {
1231        self.project_transaction(project_id, |tx| async move {
1232            let (project, _) = self
1233                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1234                .await?;
1235            project.host_connection()
1236        })
1237        .await
1238        .map(|guard| guard.into_inner())
1239    }
1240
1241    /// Returns the host connection for a request to join a shared project.
1242    pub async fn host_for_mutating_project_request(
1243        &self,
1244        project_id: ProjectId,
1245        connection_id: ConnectionId,
1246    ) -> Result<ConnectionId> {
1247        self.project_transaction(project_id, |tx| async move {
1248            let (project, _) = self
1249                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1250                .await?;
1251            project.host_connection()
1252        })
1253        .await
1254        .map(|guard| guard.into_inner())
1255    }
1256
1257    pub async fn connections_for_buffer_update(
1258        &self,
1259        project_id: ProjectId,
1260        connection_id: ConnectionId,
1261        capability: Capability,
1262    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1263        self.project_transaction(project_id, |tx| async move {
1264            // Authorize
1265            let (project, _) = self
1266                .access_project(project_id, connection_id, capability, &tx)
1267                .await?;
1268
1269            let host_connection_id = project.host_connection()?;
1270
1271            let collaborators = project_collaborator::Entity::find()
1272                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1273                .all(&*tx)
1274                .await?;
1275
1276            let guest_connection_ids = collaborators
1277                .into_iter()
1278                .filter_map(|collaborator| {
1279                    if collaborator.is_host {
1280                        None
1281                    } else {
1282                        Some(collaborator.connection())
1283                    }
1284                })
1285                .collect();
1286
1287            Ok((host_connection_id, guest_connection_ids))
1288        })
1289        .await
1290    }
1291
1292    /// Returns the connection IDs in the given project.
1293    ///
1294    /// The provided `connection_id` must also be a collaborator in the project,
1295    /// otherwise an error will be returned.
1296    pub async fn project_connection_ids(
1297        &self,
1298        project_id: ProjectId,
1299        connection_id: ConnectionId,
1300        exclude_dev_server: bool,
1301    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1302        self.project_transaction(project_id, |tx| async move {
1303            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1304                .await
1305        })
1306        .await
1307    }
1308
1309    async fn internal_project_connection_ids(
1310        &self,
1311        project_id: ProjectId,
1312        connection_id: ConnectionId,
1313        exclude_dev_server: bool,
1314        tx: &DatabaseTransaction,
1315    ) -> Result<HashSet<ConnectionId>> {
1316        let project = project::Entity::find_by_id(project_id)
1317            .one(tx)
1318            .await?
1319            .context("no such project")?;
1320
1321        let mut collaborators = project_collaborator::Entity::find()
1322            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1323            .stream(tx)
1324            .await?;
1325
1326        let mut connection_ids = HashSet::default();
1327        if let Some(host_connection) = project.host_connection().log_err()
1328            && !exclude_dev_server
1329        {
1330            connection_ids.insert(host_connection);
1331        }
1332
1333        while let Some(collaborator) = collaborators.next().await {
1334            let collaborator = collaborator?;
1335            connection_ids.insert(collaborator.connection());
1336        }
1337
1338        if connection_ids.contains(&connection_id)
1339            || Some(connection_id) == project.host_connection().ok()
1340        {
1341            Ok(connection_ids)
1342        } else {
1343            Err(anyhow!(
1344                "can only send project updates to a project you're in"
1345            ))?
1346        }
1347    }
1348
1349    async fn project_guest_connection_ids(
1350        &self,
1351        project_id: ProjectId,
1352        tx: &DatabaseTransaction,
1353    ) -> Result<Vec<ConnectionId>> {
1354        let mut collaborators = project_collaborator::Entity::find()
1355            .filter(
1356                project_collaborator::Column::ProjectId
1357                    .eq(project_id)
1358                    .and(project_collaborator::Column::IsHost.eq(false)),
1359            )
1360            .stream(tx)
1361            .await?;
1362
1363        let mut guest_connection_ids = Vec::new();
1364        while let Some(collaborator) = collaborators.next().await {
1365            let collaborator = collaborator?;
1366            guest_connection_ids.push(collaborator.connection());
1367        }
1368        Ok(guest_connection_ids)
1369    }
1370
1371    /// Returns the [`RoomId`] for the given project.
1372    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1373        self.transaction(|tx| async move {
1374            Ok(project::Entity::find_by_id(project_id)
1375                .one(&*tx)
1376                .await?
1377                .and_then(|project| project.room_id))
1378        })
1379        .await
1380    }
1381
1382    pub async fn check_room_participants(
1383        &self,
1384        room_id: RoomId,
1385        leader_id: ConnectionId,
1386        follower_id: ConnectionId,
1387    ) -> Result<()> {
1388        self.transaction(|tx| async move {
1389            use room_participant::Column;
1390
1391            let count = room_participant::Entity::find()
1392                .filter(
1393                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1394                        Condition::any()
1395                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1396                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1397                            ))
1398                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1399                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1400                            )),
1401                    ),
1402                )
1403                .count(&*tx)
1404                .await?;
1405
1406            if count < 2 {
1407                Err(anyhow!("not room participants"))?;
1408            }
1409
1410            Ok(())
1411        })
1412        .await
1413    }
1414
1415    /// Adds the given follower connection as a follower of the given leader connection.
1416    pub async fn follow(
1417        &self,
1418        room_id: RoomId,
1419        project_id: ProjectId,
1420        leader_connection: ConnectionId,
1421        follower_connection: ConnectionId,
1422    ) -> Result<TransactionGuard<proto::Room>> {
1423        self.room_transaction(room_id, |tx| async move {
1424            follower::ActiveModel {
1425                room_id: ActiveValue::set(room_id),
1426                project_id: ActiveValue::set(project_id),
1427                leader_connection_server_id: ActiveValue::set(ServerId(
1428                    leader_connection.owner_id as i32,
1429                )),
1430                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1431                follower_connection_server_id: ActiveValue::set(ServerId(
1432                    follower_connection.owner_id as i32,
1433                )),
1434                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1435                ..Default::default()
1436            }
1437            .insert(&*tx)
1438            .await?;
1439
1440            let room = self.get_room(room_id, &tx).await?;
1441            Ok(room)
1442        })
1443        .await
1444    }
1445
1446    /// Removes the given follower connection as a follower of the given leader connection.
1447    pub async fn unfollow(
1448        &self,
1449        room_id: RoomId,
1450        project_id: ProjectId,
1451        leader_connection: ConnectionId,
1452        follower_connection: ConnectionId,
1453    ) -> Result<TransactionGuard<proto::Room>> {
1454        self.room_transaction(room_id, |tx| async move {
1455            follower::Entity::delete_many()
1456                .filter(
1457                    Condition::all()
1458                        .add(follower::Column::RoomId.eq(room_id))
1459                        .add(follower::Column::ProjectId.eq(project_id))
1460                        .add(
1461                            follower::Column::LeaderConnectionServerId
1462                                .eq(leader_connection.owner_id),
1463                        )
1464                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1465                        .add(
1466                            follower::Column::FollowerConnectionServerId
1467                                .eq(follower_connection.owner_id),
1468                        )
1469                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1470                )
1471                .exec(&*tx)
1472                .await?;
1473
1474            let room = self.get_room(room_id, &tx).await?;
1475            Ok(room)
1476        })
1477        .await
1478    }
1479}