projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project {
  95                clock::ReplicaId::REMOTE_SERVER
  96            } else {
  97                clock::ReplicaId::LOCAL
  98            };
  99
 100            project_collaborator::ActiveModel {
 101                project_id: ActiveValue::set(project.id),
 102                connection_id: ActiveValue::set(connection.id as i32),
 103                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 104                user_id: ActiveValue::set(participant.user_id),
 105                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 106                is_host: ActiveValue::set(true),
 107                id: ActiveValue::NotSet,
 108                committer_name: ActiveValue::Set(None),
 109                committer_email: ActiveValue::Set(None),
 110            }
 111            .insert(&*tx)
 112            .await?;
 113
 114            let room = self.get_room(room_id, &tx).await?;
 115            Ok((project.id, room))
 116        })
 117        .await
 118    }
 119
 120    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 121        self.transaction(|tx| async move {
 122            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 123            Ok(())
 124        })
 125        .await
 126    }
 127
 128    /// Unshares the given project.
 129    pub async fn unshare_project(
 130        &self,
 131        project_id: ProjectId,
 132        connection: ConnectionId,
 133    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 134        self.project_transaction(project_id, |tx| async move {
 135            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 136            let project = project::Entity::find_by_id(project_id)
 137                .one(&*tx)
 138                .await?
 139                .context("project not found")?;
 140            let room = if let Some(room_id) = project.room_id {
 141                Some(self.get_room(room_id, &tx).await?)
 142            } else {
 143                None
 144            };
 145            if project.host_connection()? == connection {
 146                return Ok((true, room, guest_connection_ids));
 147            }
 148            Err(anyhow!("cannot unshare a project hosted by another user"))?
 149        })
 150        .await
 151    }
 152
 153    /// Updates the worktrees associated with the given project.
 154    pub async fn update_project(
 155        &self,
 156        project_id: ProjectId,
 157        connection: ConnectionId,
 158        worktrees: &[proto::WorktreeMetadata],
 159    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 160        self.project_transaction(project_id, |tx| async move {
 161            let project = project::Entity::find_by_id(project_id)
 162                .filter(
 163                    Condition::all()
 164                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 165                        .add(
 166                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 167                        ),
 168                )
 169                .one(&*tx)
 170                .await?
 171                .context("no such project")?;
 172
 173            self.update_project_worktrees(project.id, worktrees, &tx)
 174                .await?;
 175
 176            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 177
 178            let room = if let Some(room_id) = project.room_id {
 179                Some(self.get_room(room_id, &tx).await?)
 180            } else {
 181                None
 182            };
 183
 184            Ok((room, guest_connection_ids))
 185        })
 186        .await
 187    }
 188
 189    pub(in crate::db) async fn update_project_worktrees(
 190        &self,
 191        project_id: ProjectId,
 192        worktrees: &[proto::WorktreeMetadata],
 193        tx: &DatabaseTransaction,
 194    ) -> Result<()> {
 195        if !worktrees.is_empty() {
 196            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 197                id: ActiveValue::set(worktree.id as i64),
 198                project_id: ActiveValue::set(project_id),
 199                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 200                root_name: ActiveValue::set(worktree.root_name.clone()),
 201                visible: ActiveValue::set(worktree.visible),
 202                scan_id: ActiveValue::set(0),
 203                completed_scan_id: ActiveValue::set(0),
 204            }))
 205            .on_conflict(
 206                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 207                    .update_column(worktree::Column::RootName)
 208                    .to_owned(),
 209            )
 210            .exec(tx)
 211            .await?;
 212        }
 213
 214        worktree::Entity::delete_many()
 215            .filter(worktree::Column::ProjectId.eq(project_id).and(
 216                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 217            ))
 218            .exec(tx)
 219            .await?;
 220
 221        Ok(())
 222    }
 223
 224    pub async fn update_worktree(
 225        &self,
 226        update: &proto::UpdateWorktree,
 227        connection: ConnectionId,
 228    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 229        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 230            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 231        {
 232            return Err(anyhow!(
 233                "invalid worktree update. removed entries: {}, updated entries: {}",
 234                update.removed_entries.len(),
 235                update.updated_entries.len()
 236            ))?;
 237        }
 238
 239        let project_id = ProjectId::from_proto(update.project_id);
 240        let worktree_id = update.worktree_id as i64;
 241        self.project_transaction(project_id, |tx| async move {
 242            // Ensure the update comes from the host.
 243            let _project = project::Entity::find_by_id(project_id)
 244                .filter(
 245                    Condition::all()
 246                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 247                        .add(
 248                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 249                        ),
 250                )
 251                .one(&*tx)
 252                .await?
 253                .with_context(|| format!("no such project: {project_id}"))?;
 254
 255            // Update metadata.
 256            worktree::Entity::update(worktree::ActiveModel {
 257                id: ActiveValue::set(worktree_id),
 258                project_id: ActiveValue::set(project_id),
 259                root_name: ActiveValue::set(update.root_name.clone()),
 260                scan_id: ActiveValue::set(update.scan_id as i64),
 261                completed_scan_id: if update.is_last_update {
 262                    ActiveValue::set(update.scan_id as i64)
 263                } else {
 264                    ActiveValue::default()
 265                },
 266                abs_path: ActiveValue::set(update.abs_path.clone()),
 267                ..Default::default()
 268            })
 269            .exec(&*tx)
 270            .await?;
 271
 272            if !update.updated_entries.is_empty() {
 273                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 274                    let mtime = entry.mtime.clone().unwrap_or_default();
 275                    worktree_entry::ActiveModel {
 276                        project_id: ActiveValue::set(project_id),
 277                        worktree_id: ActiveValue::set(worktree_id),
 278                        id: ActiveValue::set(entry.id as i64),
 279                        is_dir: ActiveValue::set(entry.is_dir),
 280                        path: ActiveValue::set(entry.path.clone()),
 281                        inode: ActiveValue::set(entry.inode as i64),
 282                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 283                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 284                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 285                        is_ignored: ActiveValue::set(entry.is_ignored),
 286                        git_status: ActiveValue::set(None),
 287                        is_external: ActiveValue::set(entry.is_external),
 288                        is_deleted: ActiveValue::set(false),
 289                        is_hidden: ActiveValue::set(entry.is_hidden),
 290                        scan_id: ActiveValue::set(update.scan_id as i64),
 291                        is_fifo: ActiveValue::set(entry.is_fifo),
 292                    }
 293                }))
 294                .on_conflict(
 295                    OnConflict::columns([
 296                        worktree_entry::Column::ProjectId,
 297                        worktree_entry::Column::WorktreeId,
 298                        worktree_entry::Column::Id,
 299                    ])
 300                    .update_columns([
 301                        worktree_entry::Column::IsDir,
 302                        worktree_entry::Column::Path,
 303                        worktree_entry::Column::Inode,
 304                        worktree_entry::Column::MtimeSeconds,
 305                        worktree_entry::Column::MtimeNanos,
 306                        worktree_entry::Column::CanonicalPath,
 307                        worktree_entry::Column::IsIgnored,
 308                        worktree_entry::Column::IsHidden,
 309                        worktree_entry::Column::ScanId,
 310                    ])
 311                    .to_owned(),
 312                )
 313                .exec(&*tx)
 314                .await?;
 315            }
 316
 317            if !update.removed_entries.is_empty() {
 318                worktree_entry::Entity::update_many()
 319                    .filter(
 320                        worktree_entry::Column::ProjectId
 321                            .eq(project_id)
 322                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 323                            .and(
 324                                worktree_entry::Column::Id
 325                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 326                            ),
 327                    )
 328                    .set(worktree_entry::ActiveModel {
 329                        is_deleted: ActiveValue::Set(true),
 330                        scan_id: ActiveValue::Set(update.scan_id as i64),
 331                        ..Default::default()
 332                    })
 333                    .exec(&*tx)
 334                    .await?;
 335            }
 336
 337            // Backward-compatibility for old Zed clients.
 338            //
 339            // Remove this block when Zed 1.80 stable has been out for a week.
 340            {
 341                if !update.updated_repositories.is_empty() {
 342                    project_repository::Entity::insert_many(
 343                        update.updated_repositories.iter().map(|repository| {
 344                            project_repository::ActiveModel {
 345                                project_id: ActiveValue::set(project_id),
 346                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 347                                id: ActiveValue::set(repository.repository_id as i64),
 348                                scan_id: ActiveValue::set(update.scan_id as i64),
 349                                is_deleted: ActiveValue::set(false),
 350                                branch_summary: ActiveValue::Set(
 351                                    repository
 352                                        .branch_summary
 353                                        .as_ref()
 354                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 355                                ),
 356                                current_merge_conflicts: ActiveValue::Set(Some(
 357                                    serde_json::to_string(&repository.current_merge_conflicts)
 358                                        .unwrap(),
 359                                )),
 360                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 361                                abs_path: ActiveValue::set(String::new()),
 362                                entry_ids: ActiveValue::set("[]".into()),
 363                                head_commit_details: ActiveValue::set(None),
 364                                merge_message: ActiveValue::set(None),
 365                                remote_upstream_url: ActiveValue::set(None),
 366                                remote_origin_url: ActiveValue::set(None),
 367                            }
 368                        }),
 369                    )
 370                    .on_conflict(
 371                        OnConflict::columns([
 372                            project_repository::Column::ProjectId,
 373                            project_repository::Column::Id,
 374                        ])
 375                        .update_columns([
 376                            project_repository::Column::ScanId,
 377                            project_repository::Column::BranchSummary,
 378                            project_repository::Column::CurrentMergeConflicts,
 379                        ])
 380                        .to_owned(),
 381                    )
 382                    .exec(&*tx)
 383                    .await?;
 384
 385                    let has_any_statuses = update
 386                        .updated_repositories
 387                        .iter()
 388                        .any(|repository| !repository.updated_statuses.is_empty());
 389
 390                    if has_any_statuses {
 391                        project_repository_statuses::Entity::insert_many(
 392                            update.updated_repositories.iter().flat_map(
 393                                |repository: &proto::RepositoryEntry| {
 394                                    repository.updated_statuses.iter().map(|status_entry| {
 395                                        let (repo_path, status_kind, first_status, second_status) =
 396                                            proto_status_to_db(status_entry.clone());
 397                                        project_repository_statuses::ActiveModel {
 398                                            project_id: ActiveValue::set(project_id),
 399                                            repository_id: ActiveValue::set(
 400                                                repository.repository_id as i64,
 401                                            ),
 402                                            scan_id: ActiveValue::set(update.scan_id as i64),
 403                                            is_deleted: ActiveValue::set(false),
 404                                            repo_path: ActiveValue::set(repo_path),
 405                                            status: ActiveValue::set(0),
 406                                            status_kind: ActiveValue::set(status_kind),
 407                                            first_status: ActiveValue::set(first_status),
 408                                            second_status: ActiveValue::set(second_status),
 409                                        }
 410                                    })
 411                                },
 412                            ),
 413                        )
 414                        .on_conflict(
 415                            OnConflict::columns([
 416                                project_repository_statuses::Column::ProjectId,
 417                                project_repository_statuses::Column::RepositoryId,
 418                                project_repository_statuses::Column::RepoPath,
 419                            ])
 420                            .update_columns([
 421                                project_repository_statuses::Column::ScanId,
 422                                project_repository_statuses::Column::StatusKind,
 423                                project_repository_statuses::Column::FirstStatus,
 424                                project_repository_statuses::Column::SecondStatus,
 425                            ])
 426                            .to_owned(),
 427                        )
 428                        .exec(&*tx)
 429                        .await?;
 430                    }
 431
 432                    for repo in &update.updated_repositories {
 433                        if !repo.removed_statuses.is_empty() {
 434                            project_repository_statuses::Entity::update_many()
 435                                .filter(
 436                                    project_repository_statuses::Column::ProjectId
 437                                        .eq(project_id)
 438                                        .and(
 439                                            project_repository_statuses::Column::RepositoryId
 440                                                .eq(repo.repository_id),
 441                                        )
 442                                        .and(
 443                                            project_repository_statuses::Column::RepoPath
 444                                                .is_in(repo.removed_statuses.iter()),
 445                                        ),
 446                                )
 447                                .set(project_repository_statuses::ActiveModel {
 448                                    is_deleted: ActiveValue::Set(true),
 449                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 450                                    ..Default::default()
 451                                })
 452                                .exec(&*tx)
 453                                .await?;
 454                        }
 455                    }
 456                }
 457
 458                if !update.removed_repositories.is_empty() {
 459                    project_repository::Entity::update_many()
 460                        .filter(
 461                            project_repository::Column::ProjectId
 462                                .eq(project_id)
 463                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 464                                .and(project_repository::Column::Id.is_in(
 465                                    update.removed_repositories.iter().map(|id| *id as i64),
 466                                )),
 467                        )
 468                        .set(project_repository::ActiveModel {
 469                            is_deleted: ActiveValue::Set(true),
 470                            scan_id: ActiveValue::Set(update.scan_id as i64),
 471                            ..Default::default()
 472                        })
 473                        .exec(&*tx)
 474                        .await?;
 475                }
 476            }
 477
 478            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 479            Ok(connection_ids)
 480        })
 481        .await
 482    }
 483
 484    pub async fn update_repository(
 485        &self,
 486        update: &proto::UpdateRepository,
 487        _connection: ConnectionId,
 488    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 489        let project_id = ProjectId::from_proto(update.project_id);
 490        let repository_id = update.id as i64;
 491        self.project_transaction(project_id, |tx| async move {
 492            project_repository::Entity::insert(project_repository::ActiveModel {
 493                project_id: ActiveValue::set(project_id),
 494                id: ActiveValue::set(repository_id),
 495                legacy_worktree_id: ActiveValue::set(None),
 496                abs_path: ActiveValue::set(update.abs_path.clone()),
 497                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 498                scan_id: ActiveValue::set(update.scan_id as i64),
 499                is_deleted: ActiveValue::set(false),
 500                branch_summary: ActiveValue::Set(
 501                    update
 502                        .branch_summary
 503                        .as_ref()
 504                        .map(|summary| serde_json::to_string(summary).unwrap()),
 505                ),
 506                head_commit_details: ActiveValue::Set(
 507                    update
 508                        .head_commit_details
 509                        .as_ref()
 510                        .map(|details| serde_json::to_string(details).unwrap()),
 511                ),
 512                current_merge_conflicts: ActiveValue::Set(Some(
 513                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 514                )),
 515                merge_message: ActiveValue::set(update.merge_message.clone()),
 516                remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
 517                remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
 518            })
 519            .on_conflict(
 520                OnConflict::columns([
 521                    project_repository::Column::ProjectId,
 522                    project_repository::Column::Id,
 523                ])
 524                .update_columns([
 525                    project_repository::Column::ScanId,
 526                    project_repository::Column::BranchSummary,
 527                    project_repository::Column::EntryIds,
 528                    project_repository::Column::AbsPath,
 529                    project_repository::Column::CurrentMergeConflicts,
 530                    project_repository::Column::HeadCommitDetails,
 531                    project_repository::Column::MergeMessage,
 532                ])
 533                .to_owned(),
 534            )
 535            .exec(&*tx)
 536            .await?;
 537
 538            let has_any_statuses = !update.updated_statuses.is_empty();
 539
 540            if has_any_statuses {
 541                project_repository_statuses::Entity::insert_many(
 542                    update.updated_statuses.iter().map(|status_entry| {
 543                        let (repo_path, status_kind, first_status, second_status) =
 544                            proto_status_to_db(status_entry.clone());
 545                        project_repository_statuses::ActiveModel {
 546                            project_id: ActiveValue::set(project_id),
 547                            repository_id: ActiveValue::set(repository_id),
 548                            scan_id: ActiveValue::set(update.scan_id as i64),
 549                            is_deleted: ActiveValue::set(false),
 550                            repo_path: ActiveValue::set(repo_path),
 551                            status: ActiveValue::set(0),
 552                            status_kind: ActiveValue::set(status_kind),
 553                            first_status: ActiveValue::set(first_status),
 554                            second_status: ActiveValue::set(second_status),
 555                        }
 556                    }),
 557                )
 558                .on_conflict(
 559                    OnConflict::columns([
 560                        project_repository_statuses::Column::ProjectId,
 561                        project_repository_statuses::Column::RepositoryId,
 562                        project_repository_statuses::Column::RepoPath,
 563                    ])
 564                    .update_columns([
 565                        project_repository_statuses::Column::ScanId,
 566                        project_repository_statuses::Column::StatusKind,
 567                        project_repository_statuses::Column::FirstStatus,
 568                        project_repository_statuses::Column::SecondStatus,
 569                    ])
 570                    .to_owned(),
 571                )
 572                .exec(&*tx)
 573                .await?;
 574            }
 575
 576            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 577
 578            if has_any_removed_statuses {
 579                project_repository_statuses::Entity::update_many()
 580                    .filter(
 581                        project_repository_statuses::Column::ProjectId
 582                            .eq(project_id)
 583                            .and(
 584                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 585                            )
 586                            .and(
 587                                project_repository_statuses::Column::RepoPath
 588                                    .is_in(update.removed_statuses.iter()),
 589                            ),
 590                    )
 591                    .set(project_repository_statuses::ActiveModel {
 592                        is_deleted: ActiveValue::Set(true),
 593                        scan_id: ActiveValue::Set(update.scan_id as i64),
 594                        ..Default::default()
 595                    })
 596                    .exec(&*tx)
 597                    .await?;
 598            }
 599
 600            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 601            Ok(connection_ids)
 602        })
 603        .await
 604    }
 605
 606    pub async fn remove_repository(
 607        &self,
 608        remove: &proto::RemoveRepository,
 609        _connection: ConnectionId,
 610    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 611        let project_id = ProjectId::from_proto(remove.project_id);
 612        let repository_id = remove.id as i64;
 613        self.project_transaction(project_id, |tx| async move {
 614            project_repository::Entity::update_many()
 615                .filter(
 616                    project_repository::Column::ProjectId
 617                        .eq(project_id)
 618                        .and(project_repository::Column::Id.eq(repository_id)),
 619                )
 620                .set(project_repository::ActiveModel {
 621                    is_deleted: ActiveValue::Set(true),
 622                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 623                    ..Default::default()
 624                })
 625                .exec(&*tx)
 626                .await?;
 627
 628            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 629            Ok(connection_ids)
 630        })
 631        .await
 632    }
 633
 634    /// Updates the diagnostic summary for the given connection.
 635    pub async fn update_diagnostic_summary(
 636        &self,
 637        update: &proto::UpdateDiagnosticSummary,
 638        connection: ConnectionId,
 639    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 640        let project_id = ProjectId::from_proto(update.project_id);
 641        let worktree_id = update.worktree_id as i64;
 642        self.project_transaction(project_id, |tx| async move {
 643            let summary = update.summary.as_ref().context("invalid summary")?;
 644
 645            // Ensure the update comes from the host.
 646            let project = project::Entity::find_by_id(project_id)
 647                .one(&*tx)
 648                .await?
 649                .context("no such project")?;
 650            if project.host_connection()? != connection {
 651                return Err(anyhow!("can't update a project hosted by someone else"))?;
 652            }
 653
 654            // Update summary.
 655            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 656                project_id: ActiveValue::set(project_id),
 657                worktree_id: ActiveValue::set(worktree_id),
 658                path: ActiveValue::set(summary.path.clone()),
 659                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 660                error_count: ActiveValue::set(summary.error_count as i32),
 661                warning_count: ActiveValue::set(summary.warning_count as i32),
 662            })
 663            .on_conflict(
 664                OnConflict::columns([
 665                    worktree_diagnostic_summary::Column::ProjectId,
 666                    worktree_diagnostic_summary::Column::WorktreeId,
 667                    worktree_diagnostic_summary::Column::Path,
 668                ])
 669                .update_columns([
 670                    worktree_diagnostic_summary::Column::LanguageServerId,
 671                    worktree_diagnostic_summary::Column::ErrorCount,
 672                    worktree_diagnostic_summary::Column::WarningCount,
 673                ])
 674                .to_owned(),
 675            )
 676            .exec(&*tx)
 677            .await?;
 678
 679            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 680            Ok(connection_ids)
 681        })
 682        .await
 683    }
 684
 685    /// Starts the language server for the given connection.
 686    pub async fn start_language_server(
 687        &self,
 688        update: &proto::StartLanguageServer,
 689        connection: ConnectionId,
 690    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 691        let project_id = ProjectId::from_proto(update.project_id);
 692        self.project_transaction(project_id, |tx| async move {
 693            let server = update.server.as_ref().context("invalid language server")?;
 694
 695            // Ensure the update comes from the host.
 696            let project = project::Entity::find_by_id(project_id)
 697                .one(&*tx)
 698                .await?
 699                .context("no such project")?;
 700            if project.host_connection()? != connection {
 701                return Err(anyhow!("can't update a project hosted by someone else"))?;
 702            }
 703
 704            // Add the newly-started language server.
 705            language_server::Entity::insert(language_server::ActiveModel {
 706                project_id: ActiveValue::set(project_id),
 707                id: ActiveValue::set(server.id as i64),
 708                name: ActiveValue::set(server.name.clone()),
 709                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 710                capabilities: ActiveValue::set(update.capabilities.clone()),
 711            })
 712            .on_conflict(
 713                OnConflict::columns([
 714                    language_server::Column::ProjectId,
 715                    language_server::Column::Id,
 716                ])
 717                .update_columns([
 718                    language_server::Column::Name,
 719                    language_server::Column::Capabilities,
 720                    language_server::Column::WorktreeId,
 721                ])
 722                .to_owned(),
 723            )
 724            .exec(&*tx)
 725            .await?;
 726
 727            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 728            Ok(connection_ids)
 729        })
 730        .await
 731    }
 732
 733    /// Updates the worktree settings for the given connection.
 734    pub async fn update_worktree_settings(
 735        &self,
 736        update: &proto::UpdateWorktreeSettings,
 737        connection: ConnectionId,
 738    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 739        let project_id = ProjectId::from_proto(update.project_id);
 740        let kind = match update.kind {
 741            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 742                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 743            None => proto::LocalSettingsKind::Settings,
 744        };
 745        let kind = LocalSettingsKind::from_proto(kind);
 746        self.project_transaction(project_id, |tx| async move {
 747            // Ensure the update comes from the host.
 748            let project = project::Entity::find_by_id(project_id)
 749                .one(&*tx)
 750                .await?
 751                .context("no such project")?;
 752            if project.host_connection()? != connection {
 753                return Err(anyhow!("can't update a project hosted by someone else"))?;
 754            }
 755
 756            if let Some(content) = &update.content {
 757                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 758                    project_id: ActiveValue::Set(project_id),
 759                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 760                    path: ActiveValue::Set(update.path.clone()),
 761                    content: ActiveValue::Set(content.clone()),
 762                    kind: ActiveValue::Set(kind),
 763                })
 764                .on_conflict(
 765                    OnConflict::columns([
 766                        worktree_settings_file::Column::ProjectId,
 767                        worktree_settings_file::Column::WorktreeId,
 768                        worktree_settings_file::Column::Path,
 769                    ])
 770                    .update_column(worktree_settings_file::Column::Content)
 771                    .to_owned(),
 772                )
 773                .exec(&*tx)
 774                .await?;
 775            } else {
 776                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 777                    project_id: ActiveValue::Set(project_id),
 778                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 779                    path: ActiveValue::Set(update.path.clone()),
 780                    ..Default::default()
 781                })
 782                .exec(&*tx)
 783                .await?;
 784            }
 785
 786            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 787            Ok(connection_ids)
 788        })
 789        .await
 790    }
 791
 792    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 793        self.transaction(|tx| async move {
 794            Ok(project::Entity::find_by_id(id)
 795                .one(&*tx)
 796                .await?
 797                .context("no such project")?)
 798        })
 799        .await
 800    }
 801
 802    /// Adds the given connection to the specified project
 803    /// in the current room.
 804    pub async fn join_project(
 805        &self,
 806        project_id: ProjectId,
 807        connection: ConnectionId,
 808        user_id: UserId,
 809        committer_name: Option<String>,
 810        committer_email: Option<String>,
 811    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 812        self.project_transaction(project_id, move |tx| {
 813            let committer_name = committer_name.clone();
 814            let committer_email = committer_email.clone();
 815            async move {
 816                let (project, role) = self
 817                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 818                    .await?;
 819                self.join_project_internal(
 820                    project,
 821                    user_id,
 822                    committer_name,
 823                    committer_email,
 824                    connection,
 825                    role,
 826                    &tx,
 827                )
 828                .await
 829            }
 830        })
 831        .await
 832    }
 833
 834    async fn join_project_internal(
 835        &self,
 836        project: project::Model,
 837        user_id: UserId,
 838        committer_name: Option<String>,
 839        committer_email: Option<String>,
 840        connection: ConnectionId,
 841        role: ChannelRole,
 842        tx: &DatabaseTransaction,
 843    ) -> Result<(Project, ReplicaId)> {
 844        let mut collaborators = project
 845            .find_related(project_collaborator::Entity)
 846            .all(tx)
 847            .await?;
 848        let replica_ids = collaborators
 849            .iter()
 850            .map(|c| c.replica_id)
 851            .collect::<HashSet<_>>();
 852        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
 853        while replica_ids.contains(&replica_id) {
 854            replica_id.0 += 1;
 855        }
 856        let new_collaborator = project_collaborator::ActiveModel {
 857            project_id: ActiveValue::set(project.id),
 858            connection_id: ActiveValue::set(connection.id as i32),
 859            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 860            user_id: ActiveValue::set(user_id),
 861            replica_id: ActiveValue::set(replica_id),
 862            is_host: ActiveValue::set(false),
 863            id: ActiveValue::NotSet,
 864            committer_name: ActiveValue::set(committer_name),
 865            committer_email: ActiveValue::set(committer_email),
 866        }
 867        .insert(tx)
 868        .await?;
 869        collaborators.push(new_collaborator);
 870
 871        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 872        let mut worktrees = db_worktrees
 873            .into_iter()
 874            .map(|db_worktree| {
 875                (
 876                    db_worktree.id as u64,
 877                    Worktree {
 878                        id: db_worktree.id as u64,
 879                        abs_path: db_worktree.abs_path,
 880                        root_name: db_worktree.root_name,
 881                        visible: db_worktree.visible,
 882                        entries: Default::default(),
 883                        diagnostic_summaries: Default::default(),
 884                        settings_files: Default::default(),
 885                        scan_id: db_worktree.scan_id as u64,
 886                        completed_scan_id: db_worktree.completed_scan_id as u64,
 887                        legacy_repository_entries: Default::default(),
 888                    },
 889                )
 890            })
 891            .collect::<BTreeMap<_, _>>();
 892
 893        // Populate worktree entries.
 894        {
 895            let mut db_entries = worktree_entry::Entity::find()
 896                .filter(
 897                    Condition::all()
 898                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 899                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 900                )
 901                .stream(tx)
 902                .await?;
 903            while let Some(db_entry) = db_entries.next().await {
 904                let db_entry = db_entry?;
 905                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 906                    worktree.entries.push(proto::Entry {
 907                        id: db_entry.id as u64,
 908                        is_dir: db_entry.is_dir,
 909                        path: db_entry.path,
 910                        inode: db_entry.inode as u64,
 911                        mtime: Some(proto::Timestamp {
 912                            seconds: db_entry.mtime_seconds as u64,
 913                            nanos: db_entry.mtime_nanos as u32,
 914                        }),
 915                        canonical_path: db_entry.canonical_path,
 916                        is_ignored: db_entry.is_ignored,
 917                        is_external: db_entry.is_external,
 918                        is_hidden: db_entry.is_hidden,
 919                        // This is only used in the summarization backlog, so if it's None,
 920                        // that just means we won't be able to detect when to resummarize
 921                        // based on total number of backlogged bytes - instead, we'd go
 922                        // on number of files only. That shouldn't be a huge deal in practice.
 923                        size: None,
 924                        is_fifo: db_entry.is_fifo,
 925                    });
 926                }
 927            }
 928        }
 929
 930        // Populate repository entries.
 931        let mut repositories = Vec::new();
 932        {
 933            let db_repository_entries = project_repository::Entity::find()
 934                .filter(
 935                    Condition::all()
 936                        .add(project_repository::Column::ProjectId.eq(project.id))
 937                        .add(project_repository::Column::IsDeleted.eq(false)),
 938                )
 939                .all(tx)
 940                .await?;
 941            for db_repository_entry in db_repository_entries {
 942                let mut repository_statuses = project_repository_statuses::Entity::find()
 943                    .filter(
 944                        Condition::all()
 945                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 946                            .add(
 947                                project_repository_statuses::Column::RepositoryId
 948                                    .eq(db_repository_entry.id),
 949                            )
 950                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 951                    )
 952                    .stream(tx)
 953                    .await?;
 954                let mut updated_statuses = Vec::new();
 955                while let Some(status_entry) = repository_statuses.next().await {
 956                    let status_entry = status_entry?;
 957                    updated_statuses.push(db_status_to_proto(status_entry)?);
 958                }
 959
 960                let current_merge_conflicts = db_repository_entry
 961                    .current_merge_conflicts
 962                    .as_ref()
 963                    .map(|conflicts| serde_json::from_str(conflicts))
 964                    .transpose()?
 965                    .unwrap_or_default();
 966
 967                let branch_summary = db_repository_entry
 968                    .branch_summary
 969                    .as_ref()
 970                    .map(|branch_summary| serde_json::from_str(branch_summary))
 971                    .transpose()?
 972                    .unwrap_or_default();
 973
 974                let head_commit_details = db_repository_entry
 975                    .head_commit_details
 976                    .as_ref()
 977                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
 978                    .transpose()?
 979                    .unwrap_or_default();
 980
 981                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 982                    .context("failed to deserialize repository's entry ids")?;
 983
 984                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 985                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 986                        worktree.legacy_repository_entries.insert(
 987                            db_repository_entry.id as u64,
 988                            proto::RepositoryEntry {
 989                                repository_id: db_repository_entry.id as u64,
 990                                updated_statuses,
 991                                removed_statuses: Vec::new(),
 992                                current_merge_conflicts,
 993                                branch_summary,
 994                            },
 995                        );
 996                    }
 997                } else {
 998                    repositories.push(proto::UpdateRepository {
 999                        project_id: db_repository_entry.project_id.0 as u64,
1000                        id: db_repository_entry.id as u64,
1001                        abs_path: db_repository_entry.abs_path,
1002                        entry_ids,
1003                        updated_statuses,
1004                        removed_statuses: Vec::new(),
1005                        current_merge_conflicts,
1006                        branch_summary,
1007                        head_commit_details,
1008                        scan_id: db_repository_entry.scan_id as u64,
1009                        is_last_update: true,
1010                        merge_message: db_repository_entry.merge_message,
1011                        stash_entries: Vec::new(),
1012                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
1013                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
1014                    });
1015                }
1016            }
1017        }
1018
1019        // Populate worktree diagnostic summaries.
1020        {
1021            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1022                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1023                .stream(tx)
1024                .await?;
1025            while let Some(db_summary) = db_summaries.next().await {
1026                let db_summary = db_summary?;
1027                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1028                    worktree
1029                        .diagnostic_summaries
1030                        .push(proto::DiagnosticSummary {
1031                            path: db_summary.path,
1032                            language_server_id: db_summary.language_server_id as u64,
1033                            error_count: db_summary.error_count as u32,
1034                            warning_count: db_summary.warning_count as u32,
1035                        });
1036                }
1037            }
1038        }
1039
1040        // Populate worktree settings files
1041        {
1042            let mut db_settings_files = worktree_settings_file::Entity::find()
1043                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1044                .stream(tx)
1045                .await?;
1046            while let Some(db_settings_file) = db_settings_files.next().await {
1047                let db_settings_file = db_settings_file?;
1048                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1049                    worktree.settings_files.push(WorktreeSettingsFile {
1050                        path: db_settings_file.path,
1051                        content: db_settings_file.content,
1052                        kind: db_settings_file.kind,
1053                    });
1054                }
1055            }
1056        }
1057
1058        // Populate language servers.
1059        let language_servers = project
1060            .find_related(language_server::Entity)
1061            .all(tx)
1062            .await?;
1063
1064        let path_style = if project.windows_paths {
1065            PathStyle::Windows
1066        } else {
1067            PathStyle::Posix
1068        };
1069
1070        let project = Project {
1071            id: project.id,
1072            role,
1073            collaborators: collaborators
1074                .into_iter()
1075                .map(|collaborator| ProjectCollaborator {
1076                    connection_id: collaborator.connection(),
1077                    user_id: collaborator.user_id,
1078                    replica_id: collaborator.replica_id,
1079                    is_host: collaborator.is_host,
1080                    committer_name: collaborator.committer_name,
1081                    committer_email: collaborator.committer_email,
1082                })
1083                .collect(),
1084            worktrees,
1085            repositories,
1086            language_servers: language_servers
1087                .into_iter()
1088                .map(|language_server| LanguageServer {
1089                    server: proto::LanguageServer {
1090                        id: language_server.id as u64,
1091                        name: language_server.name,
1092                        worktree_id: language_server.worktree_id.map(|id| id as u64),
1093                    },
1094                    capabilities: language_server.capabilities,
1095                })
1096                .collect(),
1097            path_style,
1098        };
1099        Ok((project, replica_id as ReplicaId))
1100    }
1101
1102    /// Removes the given connection from the specified project.
1103    pub async fn leave_project(
1104        &self,
1105        project_id: ProjectId,
1106        connection: ConnectionId,
1107    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1108        self.project_transaction(project_id, |tx| async move {
1109            let result = project_collaborator::Entity::delete_many()
1110                .filter(
1111                    Condition::all()
1112                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1113                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1114                        .add(
1115                            project_collaborator::Column::ConnectionServerId
1116                                .eq(connection.owner_id as i32),
1117                        ),
1118                )
1119                .exec(&*tx)
1120                .await?;
1121            if result.rows_affected == 0 {
1122                Err(anyhow!("not a collaborator on this project"))?;
1123            }
1124
1125            let project = project::Entity::find_by_id(project_id)
1126                .one(&*tx)
1127                .await?
1128                .context("no such project")?;
1129            let collaborators = project
1130                .find_related(project_collaborator::Entity)
1131                .all(&*tx)
1132                .await?;
1133            let connection_ids: Vec<ConnectionId> = collaborators
1134                .into_iter()
1135                .map(|collaborator| collaborator.connection())
1136                .collect();
1137
1138            follower::Entity::delete_many()
1139                .filter(
1140                    Condition::any()
1141                        .add(
1142                            Condition::all()
1143                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1144                                .add(
1145                                    follower::Column::LeaderConnectionServerId
1146                                        .eq(connection.owner_id),
1147                                )
1148                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1149                        )
1150                        .add(
1151                            Condition::all()
1152                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1153                                .add(
1154                                    follower::Column::FollowerConnectionServerId
1155                                        .eq(connection.owner_id),
1156                                )
1157                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1158                        ),
1159                )
1160                .exec(&*tx)
1161                .await?;
1162
1163            let room = if let Some(room_id) = project.room_id {
1164                Some(self.get_room(room_id, &tx).await?)
1165            } else {
1166                None
1167            };
1168
1169            let left_project = LeftProject {
1170                id: project_id,
1171                should_unshare: connection == project.host_connection()?,
1172                connection_ids,
1173            };
1174            Ok((room, left_project))
1175        })
1176        .await
1177    }
1178
1179    pub async fn check_user_is_project_host(
1180        &self,
1181        project_id: ProjectId,
1182        connection_id: ConnectionId,
1183    ) -> Result<()> {
1184        self.project_transaction(project_id, |tx| async move {
1185            project::Entity::find()
1186                .filter(
1187                    Condition::all()
1188                        .add(project::Column::Id.eq(project_id))
1189                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1190                        .add(
1191                            project::Column::HostConnectionServerId
1192                                .eq(Some(connection_id.owner_id as i32)),
1193                        ),
1194                )
1195                .one(&*tx)
1196                .await?
1197                .context("failed to read project host")?;
1198
1199            Ok(())
1200        })
1201        .await
1202        .map(|guard| guard.into_inner())
1203    }
1204
1205    /// Returns the current project if the given user is authorized to access it with the specified capability.
1206    pub async fn access_project(
1207        &self,
1208        project_id: ProjectId,
1209        connection_id: ConnectionId,
1210        capability: Capability,
1211        tx: &DatabaseTransaction,
1212    ) -> Result<(project::Model, ChannelRole)> {
1213        let project = project::Entity::find_by_id(project_id)
1214            .one(tx)
1215            .await?
1216            .context("no such project")?;
1217
1218        let role_from_room = if let Some(room_id) = project.room_id {
1219            room_participant::Entity::find()
1220                .filter(room_participant::Column::RoomId.eq(room_id))
1221                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1222                .one(tx)
1223                .await?
1224                .and_then(|participant| participant.role)
1225        } else {
1226            None
1227        };
1228
1229        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1230
1231        match capability {
1232            Capability::ReadWrite => {
1233                if !role.can_edit_projects() {
1234                    return Err(anyhow!("not authorized to edit projects"))?;
1235                }
1236            }
1237            Capability::ReadOnly => {
1238                if !role.can_read_projects() {
1239                    return Err(anyhow!("not authorized to read projects"))?;
1240                }
1241            }
1242        }
1243
1244        Ok((project, role))
1245    }
1246
1247    /// Returns the host connection for a read-only request to join a shared project.
1248    pub async fn host_for_read_only_project_request(
1249        &self,
1250        project_id: ProjectId,
1251        connection_id: ConnectionId,
1252    ) -> Result<ConnectionId> {
1253        self.project_transaction(project_id, |tx| async move {
1254            let (project, _) = self
1255                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1256                .await?;
1257            project.host_connection()
1258        })
1259        .await
1260        .map(|guard| guard.into_inner())
1261    }
1262
1263    /// Returns the host connection for a request to join a shared project.
1264    pub async fn host_for_mutating_project_request(
1265        &self,
1266        project_id: ProjectId,
1267        connection_id: ConnectionId,
1268    ) -> Result<ConnectionId> {
1269        self.project_transaction(project_id, |tx| async move {
1270            let (project, _) = self
1271                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1272                .await?;
1273            project.host_connection()
1274        })
1275        .await
1276        .map(|guard| guard.into_inner())
1277    }
1278
1279    pub async fn connections_for_buffer_update(
1280        &self,
1281        project_id: ProjectId,
1282        connection_id: ConnectionId,
1283        capability: Capability,
1284    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1285        self.project_transaction(project_id, |tx| async move {
1286            // Authorize
1287            let (project, _) = self
1288                .access_project(project_id, connection_id, capability, &tx)
1289                .await?;
1290
1291            let host_connection_id = project.host_connection()?;
1292
1293            let collaborators = project_collaborator::Entity::find()
1294                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1295                .all(&*tx)
1296                .await?;
1297
1298            let guest_connection_ids = collaborators
1299                .into_iter()
1300                .filter_map(|collaborator| {
1301                    if collaborator.is_host {
1302                        None
1303                    } else {
1304                        Some(collaborator.connection())
1305                    }
1306                })
1307                .collect();
1308
1309            Ok((host_connection_id, guest_connection_ids))
1310        })
1311        .await
1312    }
1313
1314    /// Returns the connection IDs in the given project.
1315    ///
1316    /// The provided `connection_id` must also be a collaborator in the project,
1317    /// otherwise an error will be returned.
1318    pub async fn project_connection_ids(
1319        &self,
1320        project_id: ProjectId,
1321        connection_id: ConnectionId,
1322        exclude_dev_server: bool,
1323    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1324        self.project_transaction(project_id, |tx| async move {
1325            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1326                .await
1327        })
1328        .await
1329    }
1330
1331    async fn internal_project_connection_ids(
1332        &self,
1333        project_id: ProjectId,
1334        connection_id: ConnectionId,
1335        exclude_dev_server: bool,
1336        tx: &DatabaseTransaction,
1337    ) -> Result<HashSet<ConnectionId>> {
1338        let project = project::Entity::find_by_id(project_id)
1339            .one(tx)
1340            .await?
1341            .context("no such project")?;
1342
1343        let mut collaborators = project_collaborator::Entity::find()
1344            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1345            .stream(tx)
1346            .await?;
1347
1348        let mut connection_ids = HashSet::default();
1349        if let Some(host_connection) = project.host_connection().log_err()
1350            && !exclude_dev_server
1351        {
1352            connection_ids.insert(host_connection);
1353        }
1354
1355        while let Some(collaborator) = collaborators.next().await {
1356            let collaborator = collaborator?;
1357            connection_ids.insert(collaborator.connection());
1358        }
1359
1360        if connection_ids.contains(&connection_id)
1361            || Some(connection_id) == project.host_connection().ok()
1362        {
1363            Ok(connection_ids)
1364        } else {
1365            Err(anyhow!(
1366                "can only send project updates to a project you're in"
1367            ))?
1368        }
1369    }
1370
1371    async fn project_guest_connection_ids(
1372        &self,
1373        project_id: ProjectId,
1374        tx: &DatabaseTransaction,
1375    ) -> Result<Vec<ConnectionId>> {
1376        let mut collaborators = project_collaborator::Entity::find()
1377            .filter(
1378                project_collaborator::Column::ProjectId
1379                    .eq(project_id)
1380                    .and(project_collaborator::Column::IsHost.eq(false)),
1381            )
1382            .stream(tx)
1383            .await?;
1384
1385        let mut guest_connection_ids = Vec::new();
1386        while let Some(collaborator) = collaborators.next().await {
1387            let collaborator = collaborator?;
1388            guest_connection_ids.push(collaborator.connection());
1389        }
1390        Ok(guest_connection_ids)
1391    }
1392
1393    /// Returns the [`RoomId`] for the given project.
1394    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1395        self.transaction(|tx| async move {
1396            Ok(project::Entity::find_by_id(project_id)
1397                .one(&*tx)
1398                .await?
1399                .and_then(|project| project.room_id))
1400        })
1401        .await
1402    }
1403
1404    pub async fn check_room_participants(
1405        &self,
1406        room_id: RoomId,
1407        leader_id: ConnectionId,
1408        follower_id: ConnectionId,
1409    ) -> Result<()> {
1410        self.transaction(|tx| async move {
1411            use room_participant::Column;
1412
1413            let count = room_participant::Entity::find()
1414                .filter(
1415                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1416                        Condition::any()
1417                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1418                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1419                            ))
1420                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1421                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1422                            )),
1423                    ),
1424                )
1425                .count(&*tx)
1426                .await?;
1427
1428            if count < 2 {
1429                Err(anyhow!("not room participants"))?;
1430            }
1431
1432            Ok(())
1433        })
1434        .await
1435    }
1436
1437    /// Adds the given follower connection as a follower of the given leader connection.
1438    pub async fn follow(
1439        &self,
1440        room_id: RoomId,
1441        project_id: ProjectId,
1442        leader_connection: ConnectionId,
1443        follower_connection: ConnectionId,
1444    ) -> Result<TransactionGuard<proto::Room>> {
1445        self.room_transaction(room_id, |tx| async move {
1446            follower::ActiveModel {
1447                room_id: ActiveValue::set(room_id),
1448                project_id: ActiveValue::set(project_id),
1449                leader_connection_server_id: ActiveValue::set(ServerId(
1450                    leader_connection.owner_id as i32,
1451                )),
1452                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1453                follower_connection_server_id: ActiveValue::set(ServerId(
1454                    follower_connection.owner_id as i32,
1455                )),
1456                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1457                ..Default::default()
1458            }
1459            .insert(&*tx)
1460            .await?;
1461
1462            let room = self.get_room(room_id, &tx).await?;
1463            Ok(room)
1464        })
1465        .await
1466    }
1467
1468    /// Removes the given follower connection as a follower of the given leader connection.
1469    pub async fn unfollow(
1470        &self,
1471        room_id: RoomId,
1472        project_id: ProjectId,
1473        leader_connection: ConnectionId,
1474        follower_connection: ConnectionId,
1475    ) -> Result<TransactionGuard<proto::Room>> {
1476        self.room_transaction(room_id, |tx| async move {
1477            follower::Entity::delete_many()
1478                .filter(
1479                    Condition::all()
1480                        .add(follower::Column::RoomId.eq(room_id))
1481                        .add(follower::Column::ProjectId.eq(project_id))
1482                        .add(
1483                            follower::Column::LeaderConnectionServerId
1484                                .eq(leader_connection.owner_id),
1485                        )
1486                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1487                        .add(
1488                            follower::Column::FollowerConnectionServerId
1489                                .eq(follower_connection.owner_id),
1490                        )
1491                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1492                )
1493                .exec(&*tx)
1494                .await?;
1495
1496            let room = self.get_room(room_id, &tx).await?;
1497            Ok(room)
1498        })
1499        .await
1500    }
1501}