projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .context("could not find participant")?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            let project = project::ActiveModel {
  65                room_id: ActiveValue::set(Some(participant.room_id)),
  66                host_user_id: ActiveValue::set(Some(participant.user_id)),
  67                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  68                host_connection_server_id: ActiveValue::set(Some(ServerId(
  69                    connection.owner_id as i32,
  70                ))),
  71                id: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if !worktrees.is_empty() {
  77                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  78                    worktree::ActiveModel {
  79                        id: ActiveValue::set(worktree.id as i64),
  80                        project_id: ActiveValue::set(project.id),
  81                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  82                        root_name: ActiveValue::set(worktree.root_name.clone()),
  83                        visible: ActiveValue::set(worktree.visible),
  84                        scan_id: ActiveValue::set(0),
  85                        completed_scan_id: ActiveValue::set(0),
  86                    }
  87                }))
  88                .exec(&*tx)
  89                .await?;
  90            }
  91
  92            let replica_id = if is_ssh_project { 1 } else { 0 };
  93
  94            project_collaborator::ActiveModel {
  95                project_id: ActiveValue::set(project.id),
  96                connection_id: ActiveValue::set(connection.id as i32),
  97                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  98                user_id: ActiveValue::set(participant.user_id),
  99                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 100                is_host: ActiveValue::set(true),
 101                ..Default::default()
 102            }
 103            .insert(&*tx)
 104            .await?;
 105
 106            let room = self.get_room(room_id, &tx).await?;
 107            Ok((project.id, room))
 108        })
 109        .await
 110    }
 111
 112    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 113        self.weak_transaction(|tx| async move {
 114            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 115            Ok(())
 116        })
 117        .await
 118    }
 119
 120    /// Unshares the given project.
 121    pub async fn unshare_project(
 122        &self,
 123        project_id: ProjectId,
 124        connection: ConnectionId,
 125    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 126        self.project_transaction(project_id, |tx| async move {
 127            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 128            let project = project::Entity::find_by_id(project_id)
 129                .one(&*tx)
 130                .await?
 131                .context("project not found")?;
 132            let room = if let Some(room_id) = project.room_id {
 133                Some(self.get_room(room_id, &tx).await?)
 134            } else {
 135                None
 136            };
 137            if project.host_connection()? == connection {
 138                return Ok((true, room, guest_connection_ids));
 139            }
 140            Err(anyhow!("cannot unshare a project hosted by another user"))?
 141        })
 142        .await
 143    }
 144
 145    /// Updates the worktrees associated with the given project.
 146    pub async fn update_project(
 147        &self,
 148        project_id: ProjectId,
 149        connection: ConnectionId,
 150        worktrees: &[proto::WorktreeMetadata],
 151    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 152        self.project_transaction(project_id, |tx| async move {
 153            let project = project::Entity::find_by_id(project_id)
 154                .filter(
 155                    Condition::all()
 156                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 157                        .add(
 158                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 159                        ),
 160                )
 161                .one(&*tx)
 162                .await?
 163                .context("no such project")?;
 164
 165            self.update_project_worktrees(project.id, worktrees, &tx)
 166                .await?;
 167
 168            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 169
 170            let room = if let Some(room_id) = project.room_id {
 171                Some(self.get_room(room_id, &tx).await?)
 172            } else {
 173                None
 174            };
 175
 176            Ok((room, guest_connection_ids))
 177        })
 178        .await
 179    }
 180
 181    pub(in crate::db) async fn update_project_worktrees(
 182        &self,
 183        project_id: ProjectId,
 184        worktrees: &[proto::WorktreeMetadata],
 185        tx: &DatabaseTransaction,
 186    ) -> Result<()> {
 187        if !worktrees.is_empty() {
 188            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 189                id: ActiveValue::set(worktree.id as i64),
 190                project_id: ActiveValue::set(project_id),
 191                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 192                root_name: ActiveValue::set(worktree.root_name.clone()),
 193                visible: ActiveValue::set(worktree.visible),
 194                scan_id: ActiveValue::set(0),
 195                completed_scan_id: ActiveValue::set(0),
 196            }))
 197            .on_conflict(
 198                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 199                    .update_column(worktree::Column::RootName)
 200                    .to_owned(),
 201            )
 202            .exec(tx)
 203            .await?;
 204        }
 205
 206        worktree::Entity::delete_many()
 207            .filter(worktree::Column::ProjectId.eq(project_id).and(
 208                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 209            ))
 210            .exec(tx)
 211            .await?;
 212
 213        Ok(())
 214    }
 215
 216    pub async fn update_worktree(
 217        &self,
 218        update: &proto::UpdateWorktree,
 219        connection: ConnectionId,
 220    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 221        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 222            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 223        {
 224            return Err(anyhow!(
 225                "invalid worktree update. removed entries: {}, updated entries: {}",
 226                update.removed_entries.len(),
 227                update.updated_entries.len()
 228            ))?;
 229        }
 230
 231        let project_id = ProjectId::from_proto(update.project_id);
 232        let worktree_id = update.worktree_id as i64;
 233        self.project_transaction(project_id, |tx| async move {
 234            // Ensure the update comes from the host.
 235            let _project = project::Entity::find_by_id(project_id)
 236                .filter(
 237                    Condition::all()
 238                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 239                        .add(
 240                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 241                        ),
 242                )
 243                .one(&*tx)
 244                .await?
 245                .with_context(|| format!("no such project: {project_id}"))?;
 246
 247            // Update metadata.
 248            worktree::Entity::update(worktree::ActiveModel {
 249                id: ActiveValue::set(worktree_id),
 250                project_id: ActiveValue::set(project_id),
 251                root_name: ActiveValue::set(update.root_name.clone()),
 252                scan_id: ActiveValue::set(update.scan_id as i64),
 253                completed_scan_id: if update.is_last_update {
 254                    ActiveValue::set(update.scan_id as i64)
 255                } else {
 256                    ActiveValue::default()
 257                },
 258                abs_path: ActiveValue::set(update.abs_path.clone()),
 259                ..Default::default()
 260            })
 261            .exec(&*tx)
 262            .await?;
 263
 264            if !update.updated_entries.is_empty() {
 265                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 266                    let mtime = entry.mtime.clone().unwrap_or_default();
 267                    worktree_entry::ActiveModel {
 268                        project_id: ActiveValue::set(project_id),
 269                        worktree_id: ActiveValue::set(worktree_id),
 270                        id: ActiveValue::set(entry.id as i64),
 271                        is_dir: ActiveValue::set(entry.is_dir),
 272                        path: ActiveValue::set(entry.path.clone()),
 273                        inode: ActiveValue::set(entry.inode as i64),
 274                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 275                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 276                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 277                        is_ignored: ActiveValue::set(entry.is_ignored),
 278                        git_status: ActiveValue::set(None),
 279                        is_external: ActiveValue::set(entry.is_external),
 280                        is_deleted: ActiveValue::set(false),
 281                        scan_id: ActiveValue::set(update.scan_id as i64),
 282                        is_fifo: ActiveValue::set(entry.is_fifo),
 283                    }
 284                }))
 285                .on_conflict(
 286                    OnConflict::columns([
 287                        worktree_entry::Column::ProjectId,
 288                        worktree_entry::Column::WorktreeId,
 289                        worktree_entry::Column::Id,
 290                    ])
 291                    .update_columns([
 292                        worktree_entry::Column::IsDir,
 293                        worktree_entry::Column::Path,
 294                        worktree_entry::Column::Inode,
 295                        worktree_entry::Column::MtimeSeconds,
 296                        worktree_entry::Column::MtimeNanos,
 297                        worktree_entry::Column::CanonicalPath,
 298                        worktree_entry::Column::IsIgnored,
 299                        worktree_entry::Column::ScanId,
 300                    ])
 301                    .to_owned(),
 302                )
 303                .exec(&*tx)
 304                .await?;
 305            }
 306
 307            if !update.removed_entries.is_empty() {
 308                worktree_entry::Entity::update_many()
 309                    .filter(
 310                        worktree_entry::Column::ProjectId
 311                            .eq(project_id)
 312                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 313                            .and(
 314                                worktree_entry::Column::Id
 315                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 316                            ),
 317                    )
 318                    .set(worktree_entry::ActiveModel {
 319                        is_deleted: ActiveValue::Set(true),
 320                        scan_id: ActiveValue::Set(update.scan_id as i64),
 321                        ..Default::default()
 322                    })
 323                    .exec(&*tx)
 324                    .await?;
 325            }
 326
 327            // Backward-compatibility for old Zed clients.
 328            //
 329            // Remove this block when Zed 1.80 stable has been out for a week.
 330            {
 331                if !update.updated_repositories.is_empty() {
 332                    project_repository::Entity::insert_many(
 333                        update.updated_repositories.iter().map(|repository| {
 334                            project_repository::ActiveModel {
 335                                project_id: ActiveValue::set(project_id),
 336                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 337                                id: ActiveValue::set(repository.repository_id as i64),
 338                                scan_id: ActiveValue::set(update.scan_id as i64),
 339                                is_deleted: ActiveValue::set(false),
 340                                branch_summary: ActiveValue::Set(
 341                                    repository
 342                                        .branch_summary
 343                                        .as_ref()
 344                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 345                                ),
 346                                current_merge_conflicts: ActiveValue::Set(Some(
 347                                    serde_json::to_string(&repository.current_merge_conflicts)
 348                                        .unwrap(),
 349                                )),
 350
 351                                // Old clients do not use abs path, entry ids or head_commit_details.
 352                                abs_path: ActiveValue::set(String::new()),
 353                                entry_ids: ActiveValue::set("[]".into()),
 354                                head_commit_details: ActiveValue::set(None),
 355                            }
 356                        }),
 357                    )
 358                    .on_conflict(
 359                        OnConflict::columns([
 360                            project_repository::Column::ProjectId,
 361                            project_repository::Column::Id,
 362                        ])
 363                        .update_columns([
 364                            project_repository::Column::ScanId,
 365                            project_repository::Column::BranchSummary,
 366                            project_repository::Column::CurrentMergeConflicts,
 367                        ])
 368                        .to_owned(),
 369                    )
 370                    .exec(&*tx)
 371                    .await?;
 372
 373                    let has_any_statuses = update
 374                        .updated_repositories
 375                        .iter()
 376                        .any(|repository| !repository.updated_statuses.is_empty());
 377
 378                    if has_any_statuses {
 379                        project_repository_statuses::Entity::insert_many(
 380                            update.updated_repositories.iter().flat_map(
 381                                |repository: &proto::RepositoryEntry| {
 382                                    repository.updated_statuses.iter().map(|status_entry| {
 383                                        let (repo_path, status_kind, first_status, second_status) =
 384                                            proto_status_to_db(status_entry.clone());
 385                                        project_repository_statuses::ActiveModel {
 386                                            project_id: ActiveValue::set(project_id),
 387                                            repository_id: ActiveValue::set(
 388                                                repository.repository_id as i64,
 389                                            ),
 390                                            scan_id: ActiveValue::set(update.scan_id as i64),
 391                                            is_deleted: ActiveValue::set(false),
 392                                            repo_path: ActiveValue::set(repo_path),
 393                                            status: ActiveValue::set(0),
 394                                            status_kind: ActiveValue::set(status_kind),
 395                                            first_status: ActiveValue::set(first_status),
 396                                            second_status: ActiveValue::set(second_status),
 397                                        }
 398                                    })
 399                                },
 400                            ),
 401                        )
 402                        .on_conflict(
 403                            OnConflict::columns([
 404                                project_repository_statuses::Column::ProjectId,
 405                                project_repository_statuses::Column::RepositoryId,
 406                                project_repository_statuses::Column::RepoPath,
 407                            ])
 408                            .update_columns([
 409                                project_repository_statuses::Column::ScanId,
 410                                project_repository_statuses::Column::StatusKind,
 411                                project_repository_statuses::Column::FirstStatus,
 412                                project_repository_statuses::Column::SecondStatus,
 413                            ])
 414                            .to_owned(),
 415                        )
 416                        .exec(&*tx)
 417                        .await?;
 418                    }
 419
 420                    for repo in &update.updated_repositories {
 421                        if !repo.removed_statuses.is_empty() {
 422                            project_repository_statuses::Entity::update_many()
 423                                .filter(
 424                                    project_repository_statuses::Column::ProjectId
 425                                        .eq(project_id)
 426                                        .and(
 427                                            project_repository_statuses::Column::RepositoryId
 428                                                .eq(repo.repository_id),
 429                                        )
 430                                        .and(
 431                                            project_repository_statuses::Column::RepoPath
 432                                                .is_in(repo.removed_statuses.iter()),
 433                                        ),
 434                                )
 435                                .set(project_repository_statuses::ActiveModel {
 436                                    is_deleted: ActiveValue::Set(true),
 437                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 438                                    ..Default::default()
 439                                })
 440                                .exec(&*tx)
 441                                .await?;
 442                        }
 443                    }
 444                }
 445
 446                if !update.removed_repositories.is_empty() {
 447                    project_repository::Entity::update_many()
 448                        .filter(
 449                            project_repository::Column::ProjectId
 450                                .eq(project_id)
 451                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 452                                .and(project_repository::Column::Id.is_in(
 453                                    update.removed_repositories.iter().map(|id| *id as i64),
 454                                )),
 455                        )
 456                        .set(project_repository::ActiveModel {
 457                            is_deleted: ActiveValue::Set(true),
 458                            scan_id: ActiveValue::Set(update.scan_id as i64),
 459                            ..Default::default()
 460                        })
 461                        .exec(&*tx)
 462                        .await?;
 463                }
 464            }
 465
 466            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 467            Ok(connection_ids)
 468        })
 469        .await
 470    }
 471
 472    pub async fn update_repository(
 473        &self,
 474        update: &proto::UpdateRepository,
 475        _connection: ConnectionId,
 476    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 477        let project_id = ProjectId::from_proto(update.project_id);
 478        let repository_id = update.id as i64;
 479        self.project_transaction(project_id, |tx| async move {
 480            project_repository::Entity::insert(project_repository::ActiveModel {
 481                project_id: ActiveValue::set(project_id),
 482                id: ActiveValue::set(repository_id),
 483                legacy_worktree_id: ActiveValue::set(None),
 484                abs_path: ActiveValue::set(update.abs_path.clone()),
 485                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 486                scan_id: ActiveValue::set(update.scan_id as i64),
 487                is_deleted: ActiveValue::set(false),
 488                branch_summary: ActiveValue::Set(
 489                    update
 490                        .branch_summary
 491                        .as_ref()
 492                        .map(|summary| serde_json::to_string(summary).unwrap()),
 493                ),
 494                head_commit_details: ActiveValue::Set(
 495                    update
 496                        .head_commit_details
 497                        .as_ref()
 498                        .map(|details| serde_json::to_string(details).unwrap()),
 499                ),
 500                current_merge_conflicts: ActiveValue::Set(Some(
 501                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 502                )),
 503            })
 504            .on_conflict(
 505                OnConflict::columns([
 506                    project_repository::Column::ProjectId,
 507                    project_repository::Column::Id,
 508                ])
 509                .update_columns([
 510                    project_repository::Column::ScanId,
 511                    project_repository::Column::BranchSummary,
 512                    project_repository::Column::EntryIds,
 513                    project_repository::Column::AbsPath,
 514                    project_repository::Column::CurrentMergeConflicts,
 515                    project_repository::Column::HeadCommitDetails,
 516                ])
 517                .to_owned(),
 518            )
 519            .exec(&*tx)
 520            .await?;
 521
 522            let has_any_statuses = !update.updated_statuses.is_empty();
 523
 524            if has_any_statuses {
 525                project_repository_statuses::Entity::insert_many(
 526                    update.updated_statuses.iter().map(|status_entry| {
 527                        let (repo_path, status_kind, first_status, second_status) =
 528                            proto_status_to_db(status_entry.clone());
 529                        project_repository_statuses::ActiveModel {
 530                            project_id: ActiveValue::set(project_id),
 531                            repository_id: ActiveValue::set(repository_id),
 532                            scan_id: ActiveValue::set(update.scan_id as i64),
 533                            is_deleted: ActiveValue::set(false),
 534                            repo_path: ActiveValue::set(repo_path),
 535                            status: ActiveValue::set(0),
 536                            status_kind: ActiveValue::set(status_kind),
 537                            first_status: ActiveValue::set(first_status),
 538                            second_status: ActiveValue::set(second_status),
 539                        }
 540                    }),
 541                )
 542                .on_conflict(
 543                    OnConflict::columns([
 544                        project_repository_statuses::Column::ProjectId,
 545                        project_repository_statuses::Column::RepositoryId,
 546                        project_repository_statuses::Column::RepoPath,
 547                    ])
 548                    .update_columns([
 549                        project_repository_statuses::Column::ScanId,
 550                        project_repository_statuses::Column::StatusKind,
 551                        project_repository_statuses::Column::FirstStatus,
 552                        project_repository_statuses::Column::SecondStatus,
 553                    ])
 554                    .to_owned(),
 555                )
 556                .exec(&*tx)
 557                .await?;
 558            }
 559
 560            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 561
 562            if has_any_removed_statuses {
 563                project_repository_statuses::Entity::update_many()
 564                    .filter(
 565                        project_repository_statuses::Column::ProjectId
 566                            .eq(project_id)
 567                            .and(
 568                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 569                            )
 570                            .and(
 571                                project_repository_statuses::Column::RepoPath
 572                                    .is_in(update.removed_statuses.iter()),
 573                            ),
 574                    )
 575                    .set(project_repository_statuses::ActiveModel {
 576                        is_deleted: ActiveValue::Set(true),
 577                        scan_id: ActiveValue::Set(update.scan_id as i64),
 578                        ..Default::default()
 579                    })
 580                    .exec(&*tx)
 581                    .await?;
 582            }
 583
 584            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 585            Ok(connection_ids)
 586        })
 587        .await
 588    }
 589
 590    pub async fn remove_repository(
 591        &self,
 592        remove: &proto::RemoveRepository,
 593        _connection: ConnectionId,
 594    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 595        let project_id = ProjectId::from_proto(remove.project_id);
 596        let repository_id = remove.id as i64;
 597        self.project_transaction(project_id, |tx| async move {
 598            project_repository::Entity::update_many()
 599                .filter(
 600                    project_repository::Column::ProjectId
 601                        .eq(project_id)
 602                        .and(project_repository::Column::Id.eq(repository_id)),
 603                )
 604                .set(project_repository::ActiveModel {
 605                    is_deleted: ActiveValue::Set(true),
 606                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 607                    ..Default::default()
 608                })
 609                .exec(&*tx)
 610                .await?;
 611
 612            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 613            Ok(connection_ids)
 614        })
 615        .await
 616    }
 617
 618    /// Updates the diagnostic summary for the given connection.
 619    pub async fn update_diagnostic_summary(
 620        &self,
 621        update: &proto::UpdateDiagnosticSummary,
 622        connection: ConnectionId,
 623    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 624        let project_id = ProjectId::from_proto(update.project_id);
 625        let worktree_id = update.worktree_id as i64;
 626        self.project_transaction(project_id, |tx| async move {
 627            let summary = update.summary.as_ref().context("invalid summary")?;
 628
 629            // Ensure the update comes from the host.
 630            let project = project::Entity::find_by_id(project_id)
 631                .one(&*tx)
 632                .await?
 633                .context("no such project")?;
 634            if project.host_connection()? != connection {
 635                return Err(anyhow!("can't update a project hosted by someone else"))?;
 636            }
 637
 638            // Update summary.
 639            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 640                project_id: ActiveValue::set(project_id),
 641                worktree_id: ActiveValue::set(worktree_id),
 642                path: ActiveValue::set(summary.path.clone()),
 643                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 644                error_count: ActiveValue::set(summary.error_count as i32),
 645                warning_count: ActiveValue::set(summary.warning_count as i32),
 646            })
 647            .on_conflict(
 648                OnConflict::columns([
 649                    worktree_diagnostic_summary::Column::ProjectId,
 650                    worktree_diagnostic_summary::Column::WorktreeId,
 651                    worktree_diagnostic_summary::Column::Path,
 652                ])
 653                .update_columns([
 654                    worktree_diagnostic_summary::Column::LanguageServerId,
 655                    worktree_diagnostic_summary::Column::ErrorCount,
 656                    worktree_diagnostic_summary::Column::WarningCount,
 657                ])
 658                .to_owned(),
 659            )
 660            .exec(&*tx)
 661            .await?;
 662
 663            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 664            Ok(connection_ids)
 665        })
 666        .await
 667    }
 668
 669    /// Starts the language server for the given connection.
 670    pub async fn start_language_server(
 671        &self,
 672        update: &proto::StartLanguageServer,
 673        connection: ConnectionId,
 674    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 675        let project_id = ProjectId::from_proto(update.project_id);
 676        self.project_transaction(project_id, |tx| async move {
 677            let server = update.server.as_ref().context("invalid language server")?;
 678
 679            // Ensure the update comes from the host.
 680            let project = project::Entity::find_by_id(project_id)
 681                .one(&*tx)
 682                .await?
 683                .context("no such project")?;
 684            if project.host_connection()? != connection {
 685                return Err(anyhow!("can't update a project hosted by someone else"))?;
 686            }
 687
 688            // Add the newly-started language server.
 689            language_server::Entity::insert(language_server::ActiveModel {
 690                project_id: ActiveValue::set(project_id),
 691                id: ActiveValue::set(server.id as i64),
 692                name: ActiveValue::set(server.name.clone()),
 693            })
 694            .on_conflict(
 695                OnConflict::columns([
 696                    language_server::Column::ProjectId,
 697                    language_server::Column::Id,
 698                ])
 699                .update_column(language_server::Column::Name)
 700                .to_owned(),
 701            )
 702            .exec(&*tx)
 703            .await?;
 704
 705            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 706            Ok(connection_ids)
 707        })
 708        .await
 709    }
 710
 711    /// Updates the worktree settings for the given connection.
 712    pub async fn update_worktree_settings(
 713        &self,
 714        update: &proto::UpdateWorktreeSettings,
 715        connection: ConnectionId,
 716    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 717        let project_id = ProjectId::from_proto(update.project_id);
 718        let kind = match update.kind {
 719            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 720                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 721            None => proto::LocalSettingsKind::Settings,
 722        };
 723        let kind = LocalSettingsKind::from_proto(kind);
 724        self.project_transaction(project_id, |tx| async move {
 725            // Ensure the update comes from the host.
 726            let project = project::Entity::find_by_id(project_id)
 727                .one(&*tx)
 728                .await?
 729                .context("no such project")?;
 730            if project.host_connection()? != connection {
 731                return Err(anyhow!("can't update a project hosted by someone else"))?;
 732            }
 733
 734            if let Some(content) = &update.content {
 735                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 736                    project_id: ActiveValue::Set(project_id),
 737                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 738                    path: ActiveValue::Set(update.path.clone()),
 739                    content: ActiveValue::Set(content.clone()),
 740                    kind: ActiveValue::Set(kind),
 741                })
 742                .on_conflict(
 743                    OnConflict::columns([
 744                        worktree_settings_file::Column::ProjectId,
 745                        worktree_settings_file::Column::WorktreeId,
 746                        worktree_settings_file::Column::Path,
 747                    ])
 748                    .update_column(worktree_settings_file::Column::Content)
 749                    .to_owned(),
 750                )
 751                .exec(&*tx)
 752                .await?;
 753            } else {
 754                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 755                    project_id: ActiveValue::Set(project_id),
 756                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 757                    path: ActiveValue::Set(update.path.clone()),
 758                    ..Default::default()
 759                })
 760                .exec(&*tx)
 761                .await?;
 762            }
 763
 764            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 765            Ok(connection_ids)
 766        })
 767        .await
 768    }
 769
 770    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 771        self.transaction(|tx| async move {
 772            Ok(project::Entity::find_by_id(id)
 773                .one(&*tx)
 774                .await?
 775                .context("no such project")?)
 776        })
 777        .await
 778    }
 779
 780    /// Adds the given connection to the specified project
 781    /// in the current room.
 782    pub async fn join_project(
 783        &self,
 784        project_id: ProjectId,
 785        connection: ConnectionId,
 786        user_id: UserId,
 787    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 788        self.project_transaction(project_id, |tx| async move {
 789            let (project, role) = self
 790                .access_project(project_id, connection, Capability::ReadOnly, &tx)
 791                .await?;
 792            self.join_project_internal(project, user_id, connection, role, &tx)
 793                .await
 794        })
 795        .await
 796    }
 797
 798    async fn join_project_internal(
 799        &self,
 800        project: project::Model,
 801        user_id: UserId,
 802        connection: ConnectionId,
 803        role: ChannelRole,
 804        tx: &DatabaseTransaction,
 805    ) -> Result<(Project, ReplicaId)> {
 806        let mut collaborators = project
 807            .find_related(project_collaborator::Entity)
 808            .all(tx)
 809            .await?;
 810        let replica_ids = collaborators
 811            .iter()
 812            .map(|c| c.replica_id)
 813            .collect::<HashSet<_>>();
 814        let mut replica_id = ReplicaId(1);
 815        while replica_ids.contains(&replica_id) {
 816            replica_id.0 += 1;
 817        }
 818        let new_collaborator = project_collaborator::ActiveModel {
 819            project_id: ActiveValue::set(project.id),
 820            connection_id: ActiveValue::set(connection.id as i32),
 821            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 822            user_id: ActiveValue::set(user_id),
 823            replica_id: ActiveValue::set(replica_id),
 824            is_host: ActiveValue::set(false),
 825            ..Default::default()
 826        }
 827        .insert(tx)
 828        .await?;
 829        collaborators.push(new_collaborator);
 830
 831        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 832        let mut worktrees = db_worktrees
 833            .into_iter()
 834            .map(|db_worktree| {
 835                (
 836                    db_worktree.id as u64,
 837                    Worktree {
 838                        id: db_worktree.id as u64,
 839                        abs_path: db_worktree.abs_path,
 840                        root_name: db_worktree.root_name,
 841                        visible: db_worktree.visible,
 842                        entries: Default::default(),
 843                        diagnostic_summaries: Default::default(),
 844                        settings_files: Default::default(),
 845                        scan_id: db_worktree.scan_id as u64,
 846                        completed_scan_id: db_worktree.completed_scan_id as u64,
 847                        legacy_repository_entries: Default::default(),
 848                    },
 849                )
 850            })
 851            .collect::<BTreeMap<_, _>>();
 852
 853        // Populate worktree entries.
 854        {
 855            let mut db_entries = worktree_entry::Entity::find()
 856                .filter(
 857                    Condition::all()
 858                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 859                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 860                )
 861                .stream(tx)
 862                .await?;
 863            while let Some(db_entry) = db_entries.next().await {
 864                let db_entry = db_entry?;
 865                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 866                    worktree.entries.push(proto::Entry {
 867                        id: db_entry.id as u64,
 868                        is_dir: db_entry.is_dir,
 869                        path: db_entry.path,
 870                        inode: db_entry.inode as u64,
 871                        mtime: Some(proto::Timestamp {
 872                            seconds: db_entry.mtime_seconds as u64,
 873                            nanos: db_entry.mtime_nanos as u32,
 874                        }),
 875                        canonical_path: db_entry.canonical_path,
 876                        is_ignored: db_entry.is_ignored,
 877                        is_external: db_entry.is_external,
 878                        // This is only used in the summarization backlog, so if it's None,
 879                        // that just means we won't be able to detect when to resummarize
 880                        // based on total number of backlogged bytes - instead, we'd go
 881                        // on number of files only. That shouldn't be a huge deal in practice.
 882                        size: None,
 883                        is_fifo: db_entry.is_fifo,
 884                    });
 885                }
 886            }
 887        }
 888
 889        // Populate repository entries.
 890        let mut repositories = Vec::new();
 891        {
 892            let db_repository_entries = project_repository::Entity::find()
 893                .filter(
 894                    Condition::all()
 895                        .add(project_repository::Column::ProjectId.eq(project.id))
 896                        .add(project_repository::Column::IsDeleted.eq(false)),
 897                )
 898                .all(tx)
 899                .await?;
 900            for db_repository_entry in db_repository_entries {
 901                let mut repository_statuses = project_repository_statuses::Entity::find()
 902                    .filter(
 903                        Condition::all()
 904                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 905                            .add(
 906                                project_repository_statuses::Column::RepositoryId
 907                                    .eq(db_repository_entry.id),
 908                            )
 909                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 910                    )
 911                    .stream(tx)
 912                    .await?;
 913                let mut updated_statuses = Vec::new();
 914                while let Some(status_entry) = repository_statuses.next().await {
 915                    let status_entry = status_entry?;
 916                    updated_statuses.push(db_status_to_proto(status_entry)?);
 917                }
 918
 919                let current_merge_conflicts = db_repository_entry
 920                    .current_merge_conflicts
 921                    .as_ref()
 922                    .map(|conflicts| serde_json::from_str(&conflicts))
 923                    .transpose()?
 924                    .unwrap_or_default();
 925
 926                let branch_summary = db_repository_entry
 927                    .branch_summary
 928                    .as_ref()
 929                    .map(|branch_summary| serde_json::from_str(&branch_summary))
 930                    .transpose()?
 931                    .unwrap_or_default();
 932
 933                let head_commit_details = db_repository_entry
 934                    .head_commit_details
 935                    .as_ref()
 936                    .map(|head_commit_details| serde_json::from_str(&head_commit_details))
 937                    .transpose()?
 938                    .unwrap_or_default();
 939
 940                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 941                    .context("failed to deserialize repository's entry ids")?;
 942
 943                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 944                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 945                        worktree.legacy_repository_entries.insert(
 946                            db_repository_entry.id as u64,
 947                            proto::RepositoryEntry {
 948                                repository_id: db_repository_entry.id as u64,
 949                                updated_statuses,
 950                                removed_statuses: Vec::new(),
 951                                current_merge_conflicts,
 952                                branch_summary,
 953                            },
 954                        );
 955                    }
 956                } else {
 957                    repositories.push(proto::UpdateRepository {
 958                        project_id: db_repository_entry.project_id.0 as u64,
 959                        id: db_repository_entry.id as u64,
 960                        abs_path: db_repository_entry.abs_path,
 961                        entry_ids,
 962                        updated_statuses,
 963                        removed_statuses: Vec::new(),
 964                        current_merge_conflicts,
 965                        branch_summary,
 966                        head_commit_details,
 967                        scan_id: db_repository_entry.scan_id as u64,
 968                        is_last_update: true,
 969                    });
 970                }
 971            }
 972        }
 973
 974        // Populate worktree diagnostic summaries.
 975        {
 976            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 977                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 978                .stream(tx)
 979                .await?;
 980            while let Some(db_summary) = db_summaries.next().await {
 981                let db_summary = db_summary?;
 982                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 983                    worktree
 984                        .diagnostic_summaries
 985                        .push(proto::DiagnosticSummary {
 986                            path: db_summary.path,
 987                            language_server_id: db_summary.language_server_id as u64,
 988                            error_count: db_summary.error_count as u32,
 989                            warning_count: db_summary.warning_count as u32,
 990                        });
 991                }
 992            }
 993        }
 994
 995        // Populate worktree settings files
 996        {
 997            let mut db_settings_files = worktree_settings_file::Entity::find()
 998                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 999                .stream(tx)
1000                .await?;
1001            while let Some(db_settings_file) = db_settings_files.next().await {
1002                let db_settings_file = db_settings_file?;
1003                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1004                    worktree.settings_files.push(WorktreeSettingsFile {
1005                        path: db_settings_file.path,
1006                        content: db_settings_file.content,
1007                        kind: db_settings_file.kind,
1008                    });
1009                }
1010            }
1011        }
1012
1013        // Populate language servers.
1014        let language_servers = project
1015            .find_related(language_server::Entity)
1016            .all(tx)
1017            .await?;
1018
1019        let project = Project {
1020            id: project.id,
1021            role,
1022            collaborators: collaborators
1023                .into_iter()
1024                .map(|collaborator| ProjectCollaborator {
1025                    connection_id: collaborator.connection(),
1026                    user_id: collaborator.user_id,
1027                    replica_id: collaborator.replica_id,
1028                    is_host: collaborator.is_host,
1029                })
1030                .collect(),
1031            worktrees,
1032            repositories,
1033            language_servers: language_servers
1034                .into_iter()
1035                .map(|language_server| proto::LanguageServer {
1036                    id: language_server.id as u64,
1037                    name: language_server.name,
1038                    worktree_id: None,
1039                })
1040                .collect(),
1041        };
1042        Ok((project, replica_id as ReplicaId))
1043    }
1044
1045    /// Removes the given connection from the specified project.
1046    pub async fn leave_project(
1047        &self,
1048        project_id: ProjectId,
1049        connection: ConnectionId,
1050    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1051        self.project_transaction(project_id, |tx| async move {
1052            let result = project_collaborator::Entity::delete_many()
1053                .filter(
1054                    Condition::all()
1055                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1056                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1057                        .add(
1058                            project_collaborator::Column::ConnectionServerId
1059                                .eq(connection.owner_id as i32),
1060                        ),
1061                )
1062                .exec(&*tx)
1063                .await?;
1064            if result.rows_affected == 0 {
1065                Err(anyhow!("not a collaborator on this project"))?;
1066            }
1067
1068            let project = project::Entity::find_by_id(project_id)
1069                .one(&*tx)
1070                .await?
1071                .context("no such project")?;
1072            let collaborators = project
1073                .find_related(project_collaborator::Entity)
1074                .all(&*tx)
1075                .await?;
1076            let connection_ids: Vec<ConnectionId> = collaborators
1077                .into_iter()
1078                .map(|collaborator| collaborator.connection())
1079                .collect();
1080
1081            follower::Entity::delete_many()
1082                .filter(
1083                    Condition::any()
1084                        .add(
1085                            Condition::all()
1086                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1087                                .add(
1088                                    follower::Column::LeaderConnectionServerId
1089                                        .eq(connection.owner_id),
1090                                )
1091                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1092                        )
1093                        .add(
1094                            Condition::all()
1095                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1096                                .add(
1097                                    follower::Column::FollowerConnectionServerId
1098                                        .eq(connection.owner_id),
1099                                )
1100                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1101                        ),
1102                )
1103                .exec(&*tx)
1104                .await?;
1105
1106            let room = if let Some(room_id) = project.room_id {
1107                Some(self.get_room(room_id, &tx).await?)
1108            } else {
1109                None
1110            };
1111
1112            let left_project = LeftProject {
1113                id: project_id,
1114                should_unshare: connection == project.host_connection()?,
1115                connection_ids,
1116            };
1117            Ok((room, left_project))
1118        })
1119        .await
1120    }
1121
1122    pub async fn check_user_is_project_host(
1123        &self,
1124        project_id: ProjectId,
1125        connection_id: ConnectionId,
1126    ) -> Result<()> {
1127        self.project_transaction(project_id, |tx| async move {
1128            project::Entity::find()
1129                .filter(
1130                    Condition::all()
1131                        .add(project::Column::Id.eq(project_id))
1132                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1133                        .add(
1134                            project::Column::HostConnectionServerId
1135                                .eq(Some(connection_id.owner_id as i32)),
1136                        ),
1137                )
1138                .one(&*tx)
1139                .await?
1140                .context("failed to read project host")?;
1141
1142            Ok(())
1143        })
1144        .await
1145        .map(|guard| guard.into_inner())
1146    }
1147
1148    /// Returns the current project if the given user is authorized to access it with the specified capability.
1149    pub async fn access_project(
1150        &self,
1151        project_id: ProjectId,
1152        connection_id: ConnectionId,
1153        capability: Capability,
1154        tx: &DatabaseTransaction,
1155    ) -> Result<(project::Model, ChannelRole)> {
1156        let project = project::Entity::find_by_id(project_id)
1157            .one(tx)
1158            .await?
1159            .context("no such project")?;
1160
1161        let role_from_room = if let Some(room_id) = project.room_id {
1162            room_participant::Entity::find()
1163                .filter(room_participant::Column::RoomId.eq(room_id))
1164                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1165                .one(tx)
1166                .await?
1167                .and_then(|participant| participant.role)
1168        } else {
1169            None
1170        };
1171
1172        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1173
1174        match capability {
1175            Capability::ReadWrite => {
1176                if !role.can_edit_projects() {
1177                    return Err(anyhow!("not authorized to edit projects"))?;
1178                }
1179            }
1180            Capability::ReadOnly => {
1181                if !role.can_read_projects() {
1182                    return Err(anyhow!("not authorized to read projects"))?;
1183                }
1184            }
1185        }
1186
1187        Ok((project, role))
1188    }
1189
1190    /// Returns the host connection for a read-only request to join a shared project.
1191    pub async fn host_for_read_only_project_request(
1192        &self,
1193        project_id: ProjectId,
1194        connection_id: ConnectionId,
1195    ) -> Result<ConnectionId> {
1196        self.project_transaction(project_id, |tx| async move {
1197            let (project, _) = self
1198                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1199                .await?;
1200            project.host_connection()
1201        })
1202        .await
1203        .map(|guard| guard.into_inner())
1204    }
1205
1206    /// Returns the host connection for a request to join a shared project.
1207    pub async fn host_for_mutating_project_request(
1208        &self,
1209        project_id: ProjectId,
1210        connection_id: ConnectionId,
1211    ) -> Result<ConnectionId> {
1212        self.project_transaction(project_id, |tx| async move {
1213            let (project, _) = self
1214                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1215                .await?;
1216            project.host_connection()
1217        })
1218        .await
1219        .map(|guard| guard.into_inner())
1220    }
1221
1222    pub async fn connections_for_buffer_update(
1223        &self,
1224        project_id: ProjectId,
1225        connection_id: ConnectionId,
1226        capability: Capability,
1227    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1228        self.project_transaction(project_id, |tx| async move {
1229            // Authorize
1230            let (project, _) = self
1231                .access_project(project_id, connection_id, capability, &tx)
1232                .await?;
1233
1234            let host_connection_id = project.host_connection()?;
1235
1236            let collaborators = project_collaborator::Entity::find()
1237                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1238                .all(&*tx)
1239                .await?;
1240
1241            let guest_connection_ids = collaborators
1242                .into_iter()
1243                .filter_map(|collaborator| {
1244                    if collaborator.is_host {
1245                        None
1246                    } else {
1247                        Some(collaborator.connection())
1248                    }
1249                })
1250                .collect();
1251
1252            Ok((host_connection_id, guest_connection_ids))
1253        })
1254        .await
1255    }
1256
1257    /// Returns the connection IDs in the given project.
1258    ///
1259    /// The provided `connection_id` must also be a collaborator in the project,
1260    /// otherwise an error will be returned.
1261    pub async fn project_connection_ids(
1262        &self,
1263        project_id: ProjectId,
1264        connection_id: ConnectionId,
1265        exclude_dev_server: bool,
1266    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1267        self.project_transaction(project_id, |tx| async move {
1268            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1269                .await
1270        })
1271        .await
1272    }
1273
1274    async fn internal_project_connection_ids(
1275        &self,
1276        project_id: ProjectId,
1277        connection_id: ConnectionId,
1278        exclude_dev_server: bool,
1279        tx: &DatabaseTransaction,
1280    ) -> Result<HashSet<ConnectionId>> {
1281        let project = project::Entity::find_by_id(project_id)
1282            .one(tx)
1283            .await?
1284            .context("no such project")?;
1285
1286        let mut collaborators = project_collaborator::Entity::find()
1287            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1288            .stream(tx)
1289            .await?;
1290
1291        let mut connection_ids = HashSet::default();
1292        if let Some(host_connection) = project.host_connection().log_err() {
1293            if !exclude_dev_server {
1294                connection_ids.insert(host_connection);
1295            }
1296        }
1297
1298        while let Some(collaborator) = collaborators.next().await {
1299            let collaborator = collaborator?;
1300            connection_ids.insert(collaborator.connection());
1301        }
1302
1303        if connection_ids.contains(&connection_id)
1304            || Some(connection_id) == project.host_connection().ok()
1305        {
1306            Ok(connection_ids)
1307        } else {
1308            Err(anyhow!(
1309                "can only send project updates to a project you're in"
1310            ))?
1311        }
1312    }
1313
1314    async fn project_guest_connection_ids(
1315        &self,
1316        project_id: ProjectId,
1317        tx: &DatabaseTransaction,
1318    ) -> Result<Vec<ConnectionId>> {
1319        let mut collaborators = project_collaborator::Entity::find()
1320            .filter(
1321                project_collaborator::Column::ProjectId
1322                    .eq(project_id)
1323                    .and(project_collaborator::Column::IsHost.eq(false)),
1324            )
1325            .stream(tx)
1326            .await?;
1327
1328        let mut guest_connection_ids = Vec::new();
1329        while let Some(collaborator) = collaborators.next().await {
1330            let collaborator = collaborator?;
1331            guest_connection_ids.push(collaborator.connection());
1332        }
1333        Ok(guest_connection_ids)
1334    }
1335
1336    /// Returns the [`RoomId`] for the given project.
1337    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1338        self.transaction(|tx| async move {
1339            Ok(project::Entity::find_by_id(project_id)
1340                .one(&*tx)
1341                .await?
1342                .and_then(|project| project.room_id))
1343        })
1344        .await
1345    }
1346
1347    pub async fn check_room_participants(
1348        &self,
1349        room_id: RoomId,
1350        leader_id: ConnectionId,
1351        follower_id: ConnectionId,
1352    ) -> Result<()> {
1353        self.transaction(|tx| async move {
1354            use room_participant::Column;
1355
1356            let count = room_participant::Entity::find()
1357                .filter(
1358                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1359                        Condition::any()
1360                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1361                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1362                            ))
1363                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1364                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1365                            )),
1366                    ),
1367                )
1368                .count(&*tx)
1369                .await?;
1370
1371            if count < 2 {
1372                Err(anyhow!("not room participants"))?;
1373            }
1374
1375            Ok(())
1376        })
1377        .await
1378    }
1379
1380    /// Adds the given follower connection as a follower of the given leader connection.
1381    pub async fn follow(
1382        &self,
1383        room_id: RoomId,
1384        project_id: ProjectId,
1385        leader_connection: ConnectionId,
1386        follower_connection: ConnectionId,
1387    ) -> Result<TransactionGuard<proto::Room>> {
1388        self.room_transaction(room_id, |tx| async move {
1389            follower::ActiveModel {
1390                room_id: ActiveValue::set(room_id),
1391                project_id: ActiveValue::set(project_id),
1392                leader_connection_server_id: ActiveValue::set(ServerId(
1393                    leader_connection.owner_id as i32,
1394                )),
1395                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1396                follower_connection_server_id: ActiveValue::set(ServerId(
1397                    follower_connection.owner_id as i32,
1398                )),
1399                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1400                ..Default::default()
1401            }
1402            .insert(&*tx)
1403            .await?;
1404
1405            let room = self.get_room(room_id, &tx).await?;
1406            Ok(room)
1407        })
1408        .await
1409    }
1410
1411    /// Removes the given follower connection as a follower of the given leader connection.
1412    pub async fn unfollow(
1413        &self,
1414        room_id: RoomId,
1415        project_id: ProjectId,
1416        leader_connection: ConnectionId,
1417        follower_connection: ConnectionId,
1418    ) -> Result<TransactionGuard<proto::Room>> {
1419        self.room_transaction(room_id, |tx| async move {
1420            follower::Entity::delete_many()
1421                .filter(
1422                    Condition::all()
1423                        .add(follower::Column::RoomId.eq(room_id))
1424                        .add(follower::Column::ProjectId.eq(project_id))
1425                        .add(
1426                            follower::Column::LeaderConnectionServerId
1427                                .eq(leader_connection.owner_id),
1428                        )
1429                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1430                        .add(
1431                            follower::Column::FollowerConnectionServerId
1432                                .eq(follower_connection.owner_id),
1433                        )
1434                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1435                )
1436                .exec(&*tx)
1437                .await?;
1438
1439            let room = self.get_room(room_id, &tx).await?;
1440            Ok(room)
1441        })
1442        .await
1443    }
1444}