projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36        windows_paths: bool,
  37    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  38        self.room_transaction(room_id, |tx| async move {
  39            let participant = room_participant::Entity::find()
  40                .filter(
  41                    Condition::all()
  42                        .add(
  43                            room_participant::Column::AnsweringConnectionId
  44                                .eq(connection.id as i32),
  45                        )
  46                        .add(
  47                            room_participant::Column::AnsweringConnectionServerId
  48                                .eq(connection.owner_id as i32),
  49                        ),
  50                )
  51                .one(&*tx)
  52                .await?
  53                .context("could not find participant")?;
  54            if participant.room_id != room_id {
  55                return Err(anyhow!("shared project on unexpected room"))?;
  56            }
  57            if !participant
  58                .role
  59                .unwrap_or(ChannelRole::Member)
  60                .can_edit_projects()
  61            {
  62                return Err(anyhow!("guests cannot share projects"))?;
  63            }
  64
  65            let project = project::ActiveModel {
  66                room_id: ActiveValue::set(Some(participant.room_id)),
  67                host_user_id: ActiveValue::set(Some(participant.user_id)),
  68                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  69                host_connection_server_id: ActiveValue::set(Some(ServerId(
  70                    connection.owner_id as i32,
  71                ))),
  72                id: ActiveValue::NotSet,
  73                windows_paths: ActiveValue::set(windows_paths),
  74            }
  75            .insert(&*tx)
  76            .await?;
  77
  78            if !worktrees.is_empty() {
  79                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  80                    worktree::ActiveModel {
  81                        id: ActiveValue::set(worktree.id as i64),
  82                        project_id: ActiveValue::set(project.id),
  83                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  84                        root_name: ActiveValue::set(worktree.root_name.clone()),
  85                        visible: ActiveValue::set(worktree.visible),
  86                        scan_id: ActiveValue::set(0),
  87                        completed_scan_id: ActiveValue::set(0),
  88                    }
  89                }))
  90                .exec(&*tx)
  91                .await?;
  92            }
  93
  94            let replica_id = if is_ssh_project {
  95                clock::ReplicaId::REMOTE_SERVER
  96            } else {
  97                clock::ReplicaId::LOCAL
  98            };
  99
 100            project_collaborator::ActiveModel {
 101                project_id: ActiveValue::set(project.id),
 102                connection_id: ActiveValue::set(connection.id as i32),
 103                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 104                user_id: ActiveValue::set(participant.user_id),
 105                replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
 106                is_host: ActiveValue::set(true),
 107                id: ActiveValue::NotSet,
 108                committer_name: ActiveValue::Set(None),
 109                committer_email: ActiveValue::Set(None),
 110            }
 111            .insert(&*tx)
 112            .await?;
 113
 114            let room = self.get_room(room_id, &tx).await?;
 115            Ok((project.id, room))
 116        })
 117        .await
 118    }
 119
 120    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 121        self.transaction(|tx| async move {
 122            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 123            Ok(())
 124        })
 125        .await
 126    }
 127
 128    /// Unshares the given project.
 129    pub async fn unshare_project(
 130        &self,
 131        project_id: ProjectId,
 132        connection: ConnectionId,
 133    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 134        self.project_transaction(project_id, |tx| async move {
 135            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 136            let project = project::Entity::find_by_id(project_id)
 137                .one(&*tx)
 138                .await?
 139                .context("project not found")?;
 140            let room = if let Some(room_id) = project.room_id {
 141                Some(self.get_room(room_id, &tx).await?)
 142            } else {
 143                None
 144            };
 145            if project.host_connection()? == connection {
 146                return Ok((true, room, guest_connection_ids));
 147            }
 148            Err(anyhow!("cannot unshare a project hosted by another user"))?
 149        })
 150        .await
 151    }
 152
 153    /// Updates the worktrees associated with the given project.
 154    pub async fn update_project(
 155        &self,
 156        project_id: ProjectId,
 157        connection: ConnectionId,
 158        worktrees: &[proto::WorktreeMetadata],
 159    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 160        self.project_transaction(project_id, |tx| async move {
 161            let project = project::Entity::find_by_id(project_id)
 162                .filter(
 163                    Condition::all()
 164                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 165                        .add(
 166                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 167                        ),
 168                )
 169                .one(&*tx)
 170                .await?
 171                .context("no such project")?;
 172
 173            self.update_project_worktrees(project.id, worktrees, &tx)
 174                .await?;
 175
 176            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 177
 178            let room = if let Some(room_id) = project.room_id {
 179                Some(self.get_room(room_id, &tx).await?)
 180            } else {
 181                None
 182            };
 183
 184            Ok((room, guest_connection_ids))
 185        })
 186        .await
 187    }
 188
 189    pub(in crate::db) async fn update_project_worktrees(
 190        &self,
 191        project_id: ProjectId,
 192        worktrees: &[proto::WorktreeMetadata],
 193        tx: &DatabaseTransaction,
 194    ) -> Result<()> {
 195        if !worktrees.is_empty() {
 196            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 197                id: ActiveValue::set(worktree.id as i64),
 198                project_id: ActiveValue::set(project_id),
 199                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 200                root_name: ActiveValue::set(worktree.root_name.clone()),
 201                visible: ActiveValue::set(worktree.visible),
 202                scan_id: ActiveValue::set(0),
 203                completed_scan_id: ActiveValue::set(0),
 204            }))
 205            .on_conflict(
 206                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 207                    .update_column(worktree::Column::RootName)
 208                    .to_owned(),
 209            )
 210            .exec(tx)
 211            .await?;
 212        }
 213
 214        worktree::Entity::delete_many()
 215            .filter(worktree::Column::ProjectId.eq(project_id).and(
 216                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 217            ))
 218            .exec(tx)
 219            .await?;
 220
 221        Ok(())
 222    }
 223
 224    pub async fn update_worktree(
 225        &self,
 226        update: &proto::UpdateWorktree,
 227        connection: ConnectionId,
 228    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 229        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 230            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 231        {
 232            return Err(anyhow!(
 233                "invalid worktree update. removed entries: {}, updated entries: {}",
 234                update.removed_entries.len(),
 235                update.updated_entries.len()
 236            ))?;
 237        }
 238
 239        let project_id = ProjectId::from_proto(update.project_id);
 240        let worktree_id = update.worktree_id as i64;
 241        self.project_transaction(project_id, |tx| async move {
 242            // Ensure the update comes from the host.
 243            let _project = project::Entity::find_by_id(project_id)
 244                .filter(
 245                    Condition::all()
 246                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 247                        .add(
 248                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 249                        ),
 250                )
 251                .one(&*tx)
 252                .await?
 253                .with_context(|| format!("no such project: {project_id}"))?;
 254
 255            // Update metadata.
 256            worktree::Entity::update(worktree::ActiveModel {
 257                id: ActiveValue::set(worktree_id),
 258                project_id: ActiveValue::set(project_id),
 259                root_name: ActiveValue::set(update.root_name.clone()),
 260                scan_id: ActiveValue::set(update.scan_id as i64),
 261                completed_scan_id: if update.is_last_update {
 262                    ActiveValue::set(update.scan_id as i64)
 263                } else {
 264                    ActiveValue::default()
 265                },
 266                abs_path: ActiveValue::set(update.abs_path.clone()),
 267                ..Default::default()
 268            })
 269            .exec(&*tx)
 270            .await?;
 271
 272            if !update.updated_entries.is_empty() {
 273                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 274                    let mtime = entry.mtime.clone().unwrap_or_default();
 275                    worktree_entry::ActiveModel {
 276                        project_id: ActiveValue::set(project_id),
 277                        worktree_id: ActiveValue::set(worktree_id),
 278                        id: ActiveValue::set(entry.id as i64),
 279                        is_dir: ActiveValue::set(entry.is_dir),
 280                        path: ActiveValue::set(entry.path.clone()),
 281                        inode: ActiveValue::set(entry.inode as i64),
 282                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 283                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 284                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 285                        is_ignored: ActiveValue::set(entry.is_ignored),
 286                        git_status: ActiveValue::set(None),
 287                        is_external: ActiveValue::set(entry.is_external),
 288                        is_deleted: ActiveValue::set(false),
 289                        is_hidden: ActiveValue::set(entry.is_hidden),
 290                        scan_id: ActiveValue::set(update.scan_id as i64),
 291                        is_fifo: ActiveValue::set(entry.is_fifo),
 292                    }
 293                }))
 294                .on_conflict(
 295                    OnConflict::columns([
 296                        worktree_entry::Column::ProjectId,
 297                        worktree_entry::Column::WorktreeId,
 298                        worktree_entry::Column::Id,
 299                    ])
 300                    .update_columns([
 301                        worktree_entry::Column::IsDir,
 302                        worktree_entry::Column::Path,
 303                        worktree_entry::Column::Inode,
 304                        worktree_entry::Column::MtimeSeconds,
 305                        worktree_entry::Column::MtimeNanos,
 306                        worktree_entry::Column::CanonicalPath,
 307                        worktree_entry::Column::IsIgnored,
 308                        worktree_entry::Column::IsHidden,
 309                        worktree_entry::Column::ScanId,
 310                    ])
 311                    .to_owned(),
 312                )
 313                .exec(&*tx)
 314                .await?;
 315            }
 316
 317            if !update.removed_entries.is_empty() {
 318                worktree_entry::Entity::update_many()
 319                    .filter(
 320                        worktree_entry::Column::ProjectId
 321                            .eq(project_id)
 322                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 323                            .and(
 324                                worktree_entry::Column::Id
 325                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 326                            ),
 327                    )
 328                    .set(worktree_entry::ActiveModel {
 329                        is_deleted: ActiveValue::Set(true),
 330                        scan_id: ActiveValue::Set(update.scan_id as i64),
 331                        ..Default::default()
 332                    })
 333                    .exec(&*tx)
 334                    .await?;
 335            }
 336
 337            // Backward-compatibility for old Zed clients.
 338            //
 339            // Remove this block when Zed 1.80 stable has been out for a week.
 340            {
 341                if !update.updated_repositories.is_empty() {
 342                    project_repository::Entity::insert_many(
 343                        update.updated_repositories.iter().map(|repository| {
 344                            project_repository::ActiveModel {
 345                                project_id: ActiveValue::set(project_id),
 346                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 347                                id: ActiveValue::set(repository.repository_id as i64),
 348                                scan_id: ActiveValue::set(update.scan_id as i64),
 349                                is_deleted: ActiveValue::set(false),
 350                                branch_summary: ActiveValue::Set(
 351                                    repository
 352                                        .branch_summary
 353                                        .as_ref()
 354                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 355                                ),
 356                                current_merge_conflicts: ActiveValue::Set(Some(
 357                                    serde_json::to_string(&repository.current_merge_conflicts)
 358                                        .unwrap(),
 359                                )),
 360                                // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
 361                                abs_path: ActiveValue::set(String::new()),
 362                                entry_ids: ActiveValue::set("[]".into()),
 363                                head_commit_details: ActiveValue::set(None),
 364                                merge_message: ActiveValue::set(None),
 365                                remote_upstream_url: ActiveValue::set(None),
 366                                remote_origin_url: ActiveValue::set(None),
 367                            }
 368                        }),
 369                    )
 370                    .on_conflict(
 371                        OnConflict::columns([
 372                            project_repository::Column::ProjectId,
 373                            project_repository::Column::Id,
 374                        ])
 375                        .update_columns([
 376                            project_repository::Column::ScanId,
 377                            project_repository::Column::BranchSummary,
 378                            project_repository::Column::CurrentMergeConflicts,
 379                        ])
 380                        .to_owned(),
 381                    )
 382                    .exec(&*tx)
 383                    .await?;
 384
 385                    let has_any_statuses = update
 386                        .updated_repositories
 387                        .iter()
 388                        .any(|repository| !repository.updated_statuses.is_empty());
 389
 390                    if has_any_statuses {
 391                        project_repository_statuses::Entity::insert_many(
 392                            update.updated_repositories.iter().flat_map(
 393                                |repository: &proto::RepositoryEntry| {
 394                                    repository.updated_statuses.iter().map(|status_entry| {
 395                                        let (repo_path, status_kind, first_status, second_status) =
 396                                            proto_status_to_db(status_entry.clone());
 397                                        project_repository_statuses::ActiveModel {
 398                                            project_id: ActiveValue::set(project_id),
 399                                            repository_id: ActiveValue::set(
 400                                                repository.repository_id as i64,
 401                                            ),
 402                                            scan_id: ActiveValue::set(update.scan_id as i64),
 403                                            is_deleted: ActiveValue::set(false),
 404                                            repo_path: ActiveValue::set(repo_path),
 405                                            status: ActiveValue::set(0),
 406                                            status_kind: ActiveValue::set(status_kind),
 407                                            first_status: ActiveValue::set(first_status),
 408                                            second_status: ActiveValue::set(second_status),
 409                                        }
 410                                    })
 411                                },
 412                            ),
 413                        )
 414                        .on_conflict(
 415                            OnConflict::columns([
 416                                project_repository_statuses::Column::ProjectId,
 417                                project_repository_statuses::Column::RepositoryId,
 418                                project_repository_statuses::Column::RepoPath,
 419                            ])
 420                            .update_columns([
 421                                project_repository_statuses::Column::ScanId,
 422                                project_repository_statuses::Column::StatusKind,
 423                                project_repository_statuses::Column::FirstStatus,
 424                                project_repository_statuses::Column::SecondStatus,
 425                            ])
 426                            .to_owned(),
 427                        )
 428                        .exec(&*tx)
 429                        .await?;
 430                    }
 431
 432                    for repo in &update.updated_repositories {
 433                        if !repo.removed_statuses.is_empty() {
 434                            project_repository_statuses::Entity::update_many()
 435                                .filter(
 436                                    project_repository_statuses::Column::ProjectId
 437                                        .eq(project_id)
 438                                        .and(
 439                                            project_repository_statuses::Column::RepositoryId
 440                                                .eq(repo.repository_id),
 441                                        )
 442                                        .and(
 443                                            project_repository_statuses::Column::RepoPath
 444                                                .is_in(repo.removed_statuses.iter()),
 445                                        ),
 446                                )
 447                                .set(project_repository_statuses::ActiveModel {
 448                                    is_deleted: ActiveValue::Set(true),
 449                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 450                                    ..Default::default()
 451                                })
 452                                .exec(&*tx)
 453                                .await?;
 454                        }
 455                    }
 456                }
 457
 458                if !update.removed_repositories.is_empty() {
 459                    project_repository::Entity::update_many()
 460                        .filter(
 461                            project_repository::Column::ProjectId
 462                                .eq(project_id)
 463                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 464                                .and(project_repository::Column::Id.is_in(
 465                                    update.removed_repositories.iter().map(|id| *id as i64),
 466                                )),
 467                        )
 468                        .set(project_repository::ActiveModel {
 469                            is_deleted: ActiveValue::Set(true),
 470                            scan_id: ActiveValue::Set(update.scan_id as i64),
 471                            ..Default::default()
 472                        })
 473                        .exec(&*tx)
 474                        .await?;
 475                }
 476            }
 477
 478            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 479            Ok(connection_ids)
 480        })
 481        .await
 482    }
 483
 484    pub async fn update_repository(
 485        &self,
 486        update: &proto::UpdateRepository,
 487        _connection: ConnectionId,
 488    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 489        let project_id = ProjectId::from_proto(update.project_id);
 490        let repository_id = update.id as i64;
 491        self.project_transaction(project_id, |tx| async move {
 492            project_repository::Entity::insert(project_repository::ActiveModel {
 493                project_id: ActiveValue::set(project_id),
 494                id: ActiveValue::set(repository_id),
 495                legacy_worktree_id: ActiveValue::set(None),
 496                abs_path: ActiveValue::set(update.abs_path.clone()),
 497                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 498                scan_id: ActiveValue::set(update.scan_id as i64),
 499                is_deleted: ActiveValue::set(false),
 500                branch_summary: ActiveValue::Set(
 501                    update
 502                        .branch_summary
 503                        .as_ref()
 504                        .map(|summary| serde_json::to_string(summary).unwrap()),
 505                ),
 506                head_commit_details: ActiveValue::Set(
 507                    update
 508                        .head_commit_details
 509                        .as_ref()
 510                        .map(|details| serde_json::to_string(details).unwrap()),
 511                ),
 512                current_merge_conflicts: ActiveValue::Set(Some(
 513                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 514                )),
 515                merge_message: ActiveValue::set(update.merge_message.clone()),
 516                remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
 517                remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
 518            })
 519            .on_conflict(
 520                OnConflict::columns([
 521                    project_repository::Column::ProjectId,
 522                    project_repository::Column::Id,
 523                ])
 524                .update_columns([
 525                    project_repository::Column::ScanId,
 526                    project_repository::Column::BranchSummary,
 527                    project_repository::Column::EntryIds,
 528                    project_repository::Column::AbsPath,
 529                    project_repository::Column::CurrentMergeConflicts,
 530                    project_repository::Column::HeadCommitDetails,
 531                    project_repository::Column::MergeMessage,
 532                ])
 533                .to_owned(),
 534            )
 535            .exec(&*tx)
 536            .await?;
 537
 538            let has_any_statuses = !update.updated_statuses.is_empty();
 539
 540            if has_any_statuses {
 541                project_repository_statuses::Entity::insert_many(
 542                    update.updated_statuses.iter().map(|status_entry| {
 543                        let (repo_path, status_kind, first_status, second_status) =
 544                            proto_status_to_db(status_entry.clone());
 545                        project_repository_statuses::ActiveModel {
 546                            project_id: ActiveValue::set(project_id),
 547                            repository_id: ActiveValue::set(repository_id),
 548                            scan_id: ActiveValue::set(update.scan_id as i64),
 549                            is_deleted: ActiveValue::set(false),
 550                            repo_path: ActiveValue::set(repo_path),
 551                            status: ActiveValue::set(0),
 552                            status_kind: ActiveValue::set(status_kind),
 553                            first_status: ActiveValue::set(first_status),
 554                            second_status: ActiveValue::set(second_status),
 555                        }
 556                    }),
 557                )
 558                .on_conflict(
 559                    OnConflict::columns([
 560                        project_repository_statuses::Column::ProjectId,
 561                        project_repository_statuses::Column::RepositoryId,
 562                        project_repository_statuses::Column::RepoPath,
 563                    ])
 564                    .update_columns([
 565                        project_repository_statuses::Column::ScanId,
 566                        project_repository_statuses::Column::StatusKind,
 567                        project_repository_statuses::Column::FirstStatus,
 568                        project_repository_statuses::Column::SecondStatus,
 569                    ])
 570                    .to_owned(),
 571                )
 572                .exec(&*tx)
 573                .await?;
 574            }
 575
 576            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 577
 578            if has_any_removed_statuses {
 579                project_repository_statuses::Entity::update_many()
 580                    .filter(
 581                        project_repository_statuses::Column::ProjectId
 582                            .eq(project_id)
 583                            .and(
 584                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 585                            )
 586                            .and(
 587                                project_repository_statuses::Column::RepoPath
 588                                    .is_in(update.removed_statuses.iter()),
 589                            ),
 590                    )
 591                    .set(project_repository_statuses::ActiveModel {
 592                        is_deleted: ActiveValue::Set(true),
 593                        scan_id: ActiveValue::Set(update.scan_id as i64),
 594                        ..Default::default()
 595                    })
 596                    .exec(&*tx)
 597                    .await?;
 598            }
 599
 600            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 601            Ok(connection_ids)
 602        })
 603        .await
 604    }
 605
 606    pub async fn remove_repository(
 607        &self,
 608        remove: &proto::RemoveRepository,
 609        _connection: ConnectionId,
 610    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 611        let project_id = ProjectId::from_proto(remove.project_id);
 612        let repository_id = remove.id as i64;
 613        self.project_transaction(project_id, |tx| async move {
 614            project_repository::Entity::update_many()
 615                .filter(
 616                    project_repository::Column::ProjectId
 617                        .eq(project_id)
 618                        .and(project_repository::Column::Id.eq(repository_id)),
 619                )
 620                .set(project_repository::ActiveModel {
 621                    is_deleted: ActiveValue::Set(true),
 622                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 623                    ..Default::default()
 624                })
 625                .exec(&*tx)
 626                .await?;
 627
 628            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 629            Ok(connection_ids)
 630        })
 631        .await
 632    }
 633
 634    /// Updates the diagnostic summary for the given connection.
 635    pub async fn update_diagnostic_summary(
 636        &self,
 637        update: &proto::UpdateDiagnosticSummary,
 638        connection: ConnectionId,
 639    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 640        let project_id = ProjectId::from_proto(update.project_id);
 641        let worktree_id = update.worktree_id as i64;
 642        self.project_transaction(project_id, |tx| async move {
 643            let summary = update.summary.as_ref().context("invalid summary")?;
 644
 645            // Ensure the update comes from the host.
 646            let project = project::Entity::find_by_id(project_id)
 647                .one(&*tx)
 648                .await?
 649                .context("no such project")?;
 650            if project.host_connection()? != connection {
 651                return Err(anyhow!("can't update a project hosted by someone else"))?;
 652            }
 653
 654            // Update summary.
 655            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 656                project_id: ActiveValue::set(project_id),
 657                worktree_id: ActiveValue::set(worktree_id),
 658                path: ActiveValue::set(summary.path.clone()),
 659                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 660                error_count: ActiveValue::set(summary.error_count as i32),
 661                warning_count: ActiveValue::set(summary.warning_count as i32),
 662            })
 663            .on_conflict(
 664                OnConflict::columns([
 665                    worktree_diagnostic_summary::Column::ProjectId,
 666                    worktree_diagnostic_summary::Column::WorktreeId,
 667                    worktree_diagnostic_summary::Column::Path,
 668                ])
 669                .update_columns([
 670                    worktree_diagnostic_summary::Column::LanguageServerId,
 671                    worktree_diagnostic_summary::Column::ErrorCount,
 672                    worktree_diagnostic_summary::Column::WarningCount,
 673                ])
 674                .to_owned(),
 675            )
 676            .exec(&*tx)
 677            .await?;
 678
 679            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 680            Ok(connection_ids)
 681        })
 682        .await
 683    }
 684
 685    /// Starts the language server for the given connection.
 686    pub async fn start_language_server(
 687        &self,
 688        update: &proto::StartLanguageServer,
 689        connection: ConnectionId,
 690    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 691        let project_id = ProjectId::from_proto(update.project_id);
 692        self.project_transaction(project_id, |tx| async move {
 693            let server = update.server.as_ref().context("invalid language server")?;
 694
 695            // Ensure the update comes from the host.
 696            let project = project::Entity::find_by_id(project_id)
 697                .one(&*tx)
 698                .await?
 699                .context("no such project")?;
 700            if project.host_connection()? != connection {
 701                return Err(anyhow!("can't update a project hosted by someone else"))?;
 702            }
 703
 704            // Add the newly-started language server.
 705            language_server::Entity::insert(language_server::ActiveModel {
 706                project_id: ActiveValue::set(project_id),
 707                id: ActiveValue::set(server.id as i64),
 708                name: ActiveValue::set(server.name.clone()),
 709                worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
 710                capabilities: ActiveValue::set(update.capabilities.clone()),
 711            })
 712            .on_conflict(
 713                OnConflict::columns([
 714                    language_server::Column::ProjectId,
 715                    language_server::Column::Id,
 716                ])
 717                .update_columns([
 718                    language_server::Column::Name,
 719                    language_server::Column::Capabilities,
 720                    language_server::Column::WorktreeId,
 721                ])
 722                .to_owned(),
 723            )
 724            .exec(&*tx)
 725            .await?;
 726
 727            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 728            Ok(connection_ids)
 729        })
 730        .await
 731    }
 732
 733    /// Updates the worktree settings for the given connection.
 734    pub async fn update_worktree_settings(
 735        &self,
 736        update: &proto::UpdateWorktreeSettings,
 737        connection: ConnectionId,
 738    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 739        let project_id = ProjectId::from_proto(update.project_id);
 740        let kind = match update.kind {
 741            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 742                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 743            None => proto::LocalSettingsKind::Settings,
 744        };
 745        let kind = LocalSettingsKind::from_proto(kind);
 746        self.project_transaction(project_id, |tx| async move {
 747            // Ensure the update comes from the host.
 748            let project = project::Entity::find_by_id(project_id)
 749                .one(&*tx)
 750                .await?
 751                .context("no such project")?;
 752            if project.host_connection()? != connection {
 753                return Err(anyhow!("can't update a project hosted by someone else"))?;
 754            }
 755
 756            if let Some(content) = &update.content {
 757                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 758                    project_id: ActiveValue::Set(project_id),
 759                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 760                    path: ActiveValue::Set(update.path.clone()),
 761                    content: ActiveValue::Set(content.clone()),
 762                    kind: ActiveValue::Set(kind),
 763                    outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
 764                })
 765                .on_conflict(
 766                    OnConflict::columns([
 767                        worktree_settings_file::Column::ProjectId,
 768                        worktree_settings_file::Column::WorktreeId,
 769                        worktree_settings_file::Column::Path,
 770                    ])
 771                    .update_columns([
 772                        worktree_settings_file::Column::Content,
 773                        worktree_settings_file::Column::OutsideWorktree,
 774                    ])
 775                    .to_owned(),
 776                )
 777                .exec(&*tx)
 778                .await?;
 779            } else {
 780                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 781                    project_id: ActiveValue::Set(project_id),
 782                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 783                    path: ActiveValue::Set(update.path.clone()),
 784                    ..Default::default()
 785                })
 786                .exec(&*tx)
 787                .await?;
 788            }
 789
 790            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 791            Ok(connection_ids)
 792        })
 793        .await
 794    }
 795
 796    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 797        self.transaction(|tx| async move {
 798            Ok(project::Entity::find_by_id(id)
 799                .one(&*tx)
 800                .await?
 801                .context("no such project")?)
 802        })
 803        .await
 804    }
 805
 806    /// Adds the given connection to the specified project
 807    /// in the current room.
 808    pub async fn join_project(
 809        &self,
 810        project_id: ProjectId,
 811        connection: ConnectionId,
 812        user_id: UserId,
 813        committer_name: Option<String>,
 814        committer_email: Option<String>,
 815    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 816        self.project_transaction(project_id, move |tx| {
 817            let committer_name = committer_name.clone();
 818            let committer_email = committer_email.clone();
 819            async move {
 820                let (project, role) = self
 821                    .access_project(project_id, connection, Capability::ReadOnly, &tx)
 822                    .await?;
 823                self.join_project_internal(
 824                    project,
 825                    user_id,
 826                    committer_name,
 827                    committer_email,
 828                    connection,
 829                    role,
 830                    &tx,
 831                )
 832                .await
 833            }
 834        })
 835        .await
 836    }
 837
    /// Registers `connection` as a non-host collaborator on `project` and
    /// assembles the project state returned to the joining peer.
    ///
    /// The new collaborator is given the lowest replica id, counting up from
    /// `clock::ReplicaId::FIRST_COLLAB_ID`, that no existing collaborator uses.
    ///
    /// The returned [`Project`] contains every collaborator (including the new
    /// one), the project's worktrees with their non-deleted entries, diagnostic
    /// summaries and settings files, its non-deleted repositories, and its
    /// language servers. `role` is copied into the result unchanged.
    ///
    /// All queries run on the caller-supplied `tx`. This function performs no
    /// authorization checks of its own.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        committer_name: Option<String>,
        committer_email: Option<String>,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        // Pick the smallest replica id that is not already taken by a collaborator.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            id: ActiveValue::NotSet,
            committer_name: ActiveValue::set(committer_name),
            committer_email: ActiveValue::set(committer_email),
        }
        .insert(tx)
        .await?;
        // Include the joining collaborator in the list that is returned.
        collaborators.push(new_collaborator);

        // Start with empty worktrees keyed by id; the blocks below fill them in.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                        legacy_repository_entries: Default::default(),
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                // Entries whose worktree is not in the map are skipped.
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        canonical_path: db_entry.canonical_path,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        is_hidden: db_entry.is_hidden,
                        // This is only used in the summarization backlog, so if it's None,
                        // that just means we won't be able to detect when to resummarize
                        // based on total number of backlogged bytes - instead, we'd go
                        // on number of files only. That shouldn't be a huge deal in practice.
                        size: None,
                        is_fifo: db_entry.is_fifo,
                    });
                }
            }
        }

        // Populate repository entries.
        let mut repositories = Vec::new();
        {
            let db_repository_entries = project_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(project_repository::Column::ProjectId.eq(project.id))
                        .add(project_repository::Column::IsDeleted.eq(false)),
                )
                .all(tx)
                .await?;
            for db_repository_entry in db_repository_entries {
                // Load this repository's non-deleted statuses (one query per repository).
                let mut repository_statuses = project_repository_statuses::Entity::find()
                    .filter(
                        Condition::all()
                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
                            .add(
                                project_repository_statuses::Column::RepositoryId
                                    .eq(db_repository_entry.id),
                            )
                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
                    )
                    .stream(tx)
                    .await?;
                let mut updated_statuses = Vec::new();
                while let Some(status_entry) = repository_statuses.next().await {
                    let status_entry = status_entry?;
                    updated_statuses.push(db_status_to_proto(status_entry)?);
                }

                // The following optional columns hold JSON; a missing value
                // deserializes to the field's default.
                let current_merge_conflicts = db_repository_entry
                    .current_merge_conflicts
                    .as_ref()
                    .map(|conflicts| serde_json::from_str(conflicts))
                    .transpose()?
                    .unwrap_or_default();

                let branch_summary = db_repository_entry
                    .branch_summary
                    .as_ref()
                    .map(|branch_summary| serde_json::from_str(branch_summary))
                    .transpose()?
                    .unwrap_or_default();

                let head_commit_details = db_repository_entry
                    .head_commit_details
                    .as_ref()
                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
                    .transpose()?
                    .unwrap_or_default();

                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
                    .context("failed to deserialize repository's entry ids")?;

                // Repositories that carry a legacy worktree id are attached to that
                // worktree (and dropped if the worktree is not in the map); all other
                // repositories are reported at the project level.
                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
                        worktree.legacy_repository_entries.insert(
                            db_repository_entry.id as u64,
                            proto::RepositoryEntry {
                                repository_id: db_repository_entry.id as u64,
                                updated_statuses,
                                removed_statuses: Vec::new(),
                                current_merge_conflicts,
                                branch_summary,
                            },
                        );
                    }
                } else {
                    repositories.push(proto::UpdateRepository {
                        project_id: db_repository_entry.project_id.0 as u64,
                        id: db_repository_entry.id as u64,
                        abs_path: db_repository_entry.abs_path.clone(),
                        entry_ids,
                        updated_statuses,
                        removed_statuses: Vec::new(),
                        current_merge_conflicts,
                        branch_summary,
                        head_commit_details,
                        scan_id: db_repository_entry.scan_id as u64,
                        is_last_update: true,
                        merge_message: db_repository_entry.merge_message,
                        // Stash entries are not loaded from the database here.
                        stash_entries: Vec::new(),
                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
                        original_repo_abs_path: Some(db_repository_entry.abs_path),
                    });
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                        kind: db_settings_file.kind,
                        outside_worktree: db_settings_file.outside_worktree,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        // The project row records whether its paths use Windows conventions.
        let path_style = if project.windows_paths {
            PathStyle::Windows
        } else {
            PathStyle::Posix
        };

        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                    committer_name: collaborator.committer_name,
                    committer_email: collaborator.committer_email,
                })
                .collect(),
            worktrees,
            repositories,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| LanguageServer {
                    server: proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                        worktree_id: language_server.worktree_id.map(|id| id as u64),
                    },
                    capabilities: language_server.capabilities,
                })
                .collect(),
            path_style,
        };
        Ok((project, replica_id as ReplicaId))
    }
1107
1108    /// Removes the given connection from the specified project.
1109    pub async fn leave_project(
1110        &self,
1111        project_id: ProjectId,
1112        connection: ConnectionId,
1113    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1114        self.project_transaction(project_id, |tx| async move {
1115            let result = project_collaborator::Entity::delete_many()
1116                .filter(
1117                    Condition::all()
1118                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1119                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1120                        .add(
1121                            project_collaborator::Column::ConnectionServerId
1122                                .eq(connection.owner_id as i32),
1123                        ),
1124                )
1125                .exec(&*tx)
1126                .await?;
1127            if result.rows_affected == 0 {
1128                Err(anyhow!("not a collaborator on this project"))?;
1129            }
1130
1131            let project = project::Entity::find_by_id(project_id)
1132                .one(&*tx)
1133                .await?
1134                .context("no such project")?;
1135            let collaborators = project
1136                .find_related(project_collaborator::Entity)
1137                .all(&*tx)
1138                .await?;
1139            let connection_ids: Vec<ConnectionId> = collaborators
1140                .into_iter()
1141                .map(|collaborator| collaborator.connection())
1142                .collect();
1143
1144            follower::Entity::delete_many()
1145                .filter(
1146                    Condition::any()
1147                        .add(
1148                            Condition::all()
1149                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1150                                .add(
1151                                    follower::Column::LeaderConnectionServerId
1152                                        .eq(connection.owner_id),
1153                                )
1154                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1155                        )
1156                        .add(
1157                            Condition::all()
1158                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1159                                .add(
1160                                    follower::Column::FollowerConnectionServerId
1161                                        .eq(connection.owner_id),
1162                                )
1163                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1164                        ),
1165                )
1166                .exec(&*tx)
1167                .await?;
1168
1169            let room = if let Some(room_id) = project.room_id {
1170                Some(self.get_room(room_id, &tx).await?)
1171            } else {
1172                None
1173            };
1174
1175            let left_project = LeftProject {
1176                id: project_id,
1177                should_unshare: connection == project.host_connection()?,
1178                connection_ids,
1179            };
1180            Ok((room, left_project))
1181        })
1182        .await
1183    }
1184
1185    pub async fn check_user_is_project_host(
1186        &self,
1187        project_id: ProjectId,
1188        connection_id: ConnectionId,
1189    ) -> Result<()> {
1190        self.project_transaction(project_id, |tx| async move {
1191            project::Entity::find()
1192                .filter(
1193                    Condition::all()
1194                        .add(project::Column::Id.eq(project_id))
1195                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1196                        .add(
1197                            project::Column::HostConnectionServerId
1198                                .eq(Some(connection_id.owner_id as i32)),
1199                        ),
1200                )
1201                .one(&*tx)
1202                .await?
1203                .context("failed to read project host")?;
1204
1205            Ok(())
1206        })
1207        .await
1208        .map(|guard| guard.into_inner())
1209    }
1210
1211    /// Returns the current project if the given user is authorized to access it with the specified capability.
1212    pub async fn access_project(
1213        &self,
1214        project_id: ProjectId,
1215        connection_id: ConnectionId,
1216        capability: Capability,
1217        tx: &DatabaseTransaction,
1218    ) -> Result<(project::Model, ChannelRole)> {
1219        let project = project::Entity::find_by_id(project_id)
1220            .one(tx)
1221            .await?
1222            .context("no such project")?;
1223
1224        let role_from_room = if let Some(room_id) = project.room_id {
1225            room_participant::Entity::find()
1226                .filter(room_participant::Column::RoomId.eq(room_id))
1227                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1228                .one(tx)
1229                .await?
1230                .and_then(|participant| participant.role)
1231        } else {
1232            None
1233        };
1234
1235        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1236
1237        match capability {
1238            Capability::ReadWrite => {
1239                if !role.can_edit_projects() {
1240                    return Err(anyhow!("not authorized to edit projects"))?;
1241                }
1242            }
1243            Capability::ReadOnly => {
1244                if !role.can_read_projects() {
1245                    return Err(anyhow!("not authorized to read projects"))?;
1246                }
1247            }
1248        }
1249
1250        Ok((project, role))
1251    }
1252
1253    /// Returns the host connection for a read-only request to join a shared project.
1254    pub async fn host_for_read_only_project_request(
1255        &self,
1256        project_id: ProjectId,
1257        connection_id: ConnectionId,
1258    ) -> Result<ConnectionId> {
1259        self.project_transaction(project_id, |tx| async move {
1260            let (project, _) = self
1261                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1262                .await?;
1263            project.host_connection()
1264        })
1265        .await
1266        .map(|guard| guard.into_inner())
1267    }
1268
1269    /// Returns the host connection for a request to join a shared project.
1270    pub async fn host_for_mutating_project_request(
1271        &self,
1272        project_id: ProjectId,
1273        connection_id: ConnectionId,
1274    ) -> Result<ConnectionId> {
1275        self.project_transaction(project_id, |tx| async move {
1276            let (project, _) = self
1277                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1278                .await?;
1279            project.host_connection()
1280        })
1281        .await
1282        .map(|guard| guard.into_inner())
1283    }
1284
1285    pub async fn connections_for_buffer_update(
1286        &self,
1287        project_id: ProjectId,
1288        connection_id: ConnectionId,
1289        capability: Capability,
1290    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1291        self.project_transaction(project_id, |tx| async move {
1292            // Authorize
1293            let (project, _) = self
1294                .access_project(project_id, connection_id, capability, &tx)
1295                .await?;
1296
1297            let host_connection_id = project.host_connection()?;
1298
1299            let collaborators = project_collaborator::Entity::find()
1300                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1301                .all(&*tx)
1302                .await?;
1303
1304            let guest_connection_ids = collaborators
1305                .into_iter()
1306                .filter_map(|collaborator| {
1307                    if collaborator.is_host {
1308                        None
1309                    } else {
1310                        Some(collaborator.connection())
1311                    }
1312                })
1313                .collect();
1314
1315            Ok((host_connection_id, guest_connection_ids))
1316        })
1317        .await
1318    }
1319
1320    /// Returns the connection IDs in the given project.
1321    ///
1322    /// The provided `connection_id` must also be a collaborator in the project,
1323    /// otherwise an error will be returned.
1324    pub async fn project_connection_ids(
1325        &self,
1326        project_id: ProjectId,
1327        connection_id: ConnectionId,
1328        exclude_dev_server: bool,
1329    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1330        self.project_transaction(project_id, |tx| async move {
1331            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1332                .await
1333        })
1334        .await
1335    }
1336
1337    async fn internal_project_connection_ids(
1338        &self,
1339        project_id: ProjectId,
1340        connection_id: ConnectionId,
1341        exclude_dev_server: bool,
1342        tx: &DatabaseTransaction,
1343    ) -> Result<HashSet<ConnectionId>> {
1344        let project = project::Entity::find_by_id(project_id)
1345            .one(tx)
1346            .await?
1347            .context("no such project")?;
1348
1349        let mut collaborators = project_collaborator::Entity::find()
1350            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1351            .stream(tx)
1352            .await?;
1353
1354        let mut connection_ids = HashSet::default();
1355        if let Some(host_connection) = project.host_connection().log_err()
1356            && !exclude_dev_server
1357        {
1358            connection_ids.insert(host_connection);
1359        }
1360
1361        while let Some(collaborator) = collaborators.next().await {
1362            let collaborator = collaborator?;
1363            connection_ids.insert(collaborator.connection());
1364        }
1365
1366        if connection_ids.contains(&connection_id)
1367            || Some(connection_id) == project.host_connection().ok()
1368        {
1369            Ok(connection_ids)
1370        } else {
1371            Err(anyhow!(
1372                "can only send project updates to a project you're in"
1373            ))?
1374        }
1375    }
1376
1377    async fn project_guest_connection_ids(
1378        &self,
1379        project_id: ProjectId,
1380        tx: &DatabaseTransaction,
1381    ) -> Result<Vec<ConnectionId>> {
1382        let mut collaborators = project_collaborator::Entity::find()
1383            .filter(
1384                project_collaborator::Column::ProjectId
1385                    .eq(project_id)
1386                    .and(project_collaborator::Column::IsHost.eq(false)),
1387            )
1388            .stream(tx)
1389            .await?;
1390
1391        let mut guest_connection_ids = Vec::new();
1392        while let Some(collaborator) = collaborators.next().await {
1393            let collaborator = collaborator?;
1394            guest_connection_ids.push(collaborator.connection());
1395        }
1396        Ok(guest_connection_ids)
1397    }
1398
1399    /// Returns the [`RoomId`] for the given project.
1400    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1401        self.transaction(|tx| async move {
1402            Ok(project::Entity::find_by_id(project_id)
1403                .one(&*tx)
1404                .await?
1405                .and_then(|project| project.room_id))
1406        })
1407        .await
1408    }
1409
1410    pub async fn check_room_participants(
1411        &self,
1412        room_id: RoomId,
1413        leader_id: ConnectionId,
1414        follower_id: ConnectionId,
1415    ) -> Result<()> {
1416        self.transaction(|tx| async move {
1417            use room_participant::Column;
1418
1419            let count = room_participant::Entity::find()
1420                .filter(
1421                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1422                        Condition::any()
1423                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1424                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1425                            ))
1426                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1427                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1428                            )),
1429                    ),
1430                )
1431                .count(&*tx)
1432                .await?;
1433
1434            if count < 2 {
1435                Err(anyhow!("not room participants"))?;
1436            }
1437
1438            Ok(())
1439        })
1440        .await
1441    }
1442
1443    /// Adds the given follower connection as a follower of the given leader connection.
1444    pub async fn follow(
1445        &self,
1446        room_id: RoomId,
1447        project_id: ProjectId,
1448        leader_connection: ConnectionId,
1449        follower_connection: ConnectionId,
1450    ) -> Result<TransactionGuard<proto::Room>> {
1451        self.room_transaction(room_id, |tx| async move {
1452            follower::ActiveModel {
1453                room_id: ActiveValue::set(room_id),
1454                project_id: ActiveValue::set(project_id),
1455                leader_connection_server_id: ActiveValue::set(ServerId(
1456                    leader_connection.owner_id as i32,
1457                )),
1458                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1459                follower_connection_server_id: ActiveValue::set(ServerId(
1460                    follower_connection.owner_id as i32,
1461                )),
1462                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1463                ..Default::default()
1464            }
1465            .insert(&*tx)
1466            .await?;
1467
1468            let room = self.get_room(room_id, &tx).await?;
1469            Ok(room)
1470        })
1471        .await
1472    }
1473
1474    /// Removes the given follower connection as a follower of the given leader connection.
1475    pub async fn unfollow(
1476        &self,
1477        room_id: RoomId,
1478        project_id: ProjectId,
1479        leader_connection: ConnectionId,
1480        follower_connection: ConnectionId,
1481    ) -> Result<TransactionGuard<proto::Room>> {
1482        self.room_transaction(room_id, |tx| async move {
1483            follower::Entity::delete_many()
1484                .filter(
1485                    Condition::all()
1486                        .add(follower::Column::RoomId.eq(room_id))
1487                        .add(follower::Column::ProjectId.eq(project_id))
1488                        .add(
1489                            follower::Column::LeaderConnectionServerId
1490                                .eq(leader_connection.owner_id),
1491                        )
1492                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1493                        .add(
1494                            follower::Column::FollowerConnectionServerId
1495                                .eq(follower_connection.owner_id),
1496                        )
1497                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1498                )
1499                .exec(&*tx)
1500                .await?;
1501
1502            let room = self.get_room(room_id, &tx).await?;
1503            Ok(room)
1504        })
1505        .await
1506    }
1507}