projects.rs

   1use anyhow::Context as _;
   2use collections::HashSet;
   3use util::ResultExt;
   4
   5use super::*;
   6
   7impl Database {
   8    /// Returns the count of all projects, excluding ones marked as admin.
   9    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
  10        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
  11        enum QueryAs {
  12            Count,
  13        }
  14
  15        self.transaction(|tx| async move {
  16            Ok(project::Entity::find()
  17                .select_only()
  18                .column_as(project::Column::Id.count(), QueryAs::Count)
  19                .inner_join(user::Entity)
  20                .filter(user::Column::Admin.eq(false))
  21                .into_values::<_, QueryAs>()
  22                .one(&*tx)
  23                .await?
  24                .unwrap_or(0i64) as usize)
  25        })
  26        .await
  27    }
  28
  29    /// Shares a project with the given room.
  30    pub async fn share_project(
  31        &self,
  32        room_id: RoomId,
  33        connection: ConnectionId,
  34        worktrees: &[proto::WorktreeMetadata],
  35        is_ssh_project: bool,
  36    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
  37        self.room_transaction(room_id, |tx| async move {
  38            let participant = room_participant::Entity::find()
  39                .filter(
  40                    Condition::all()
  41                        .add(
  42                            room_participant::Column::AnsweringConnectionId
  43                                .eq(connection.id as i32),
  44                        )
  45                        .add(
  46                            room_participant::Column::AnsweringConnectionServerId
  47                                .eq(connection.owner_id as i32),
  48                        ),
  49                )
  50                .one(&*tx)
  51                .await?
  52                .ok_or_else(|| anyhow!("could not find participant"))?;
  53            if participant.room_id != room_id {
  54                return Err(anyhow!("shared project on unexpected room"))?;
  55            }
  56            if !participant
  57                .role
  58                .unwrap_or(ChannelRole::Member)
  59                .can_edit_projects()
  60            {
  61                return Err(anyhow!("guests cannot share projects"))?;
  62            }
  63
  64            let project = project::ActiveModel {
  65                room_id: ActiveValue::set(Some(participant.room_id)),
  66                host_user_id: ActiveValue::set(Some(participant.user_id)),
  67                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
  68                host_connection_server_id: ActiveValue::set(Some(ServerId(
  69                    connection.owner_id as i32,
  70                ))),
  71                id: ActiveValue::NotSet,
  72            }
  73            .insert(&*tx)
  74            .await?;
  75
  76            if !worktrees.is_empty() {
  77                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
  78                    worktree::ActiveModel {
  79                        id: ActiveValue::set(worktree.id as i64),
  80                        project_id: ActiveValue::set(project.id),
  81                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
  82                        root_name: ActiveValue::set(worktree.root_name.clone()),
  83                        visible: ActiveValue::set(worktree.visible),
  84                        scan_id: ActiveValue::set(0),
  85                        completed_scan_id: ActiveValue::set(0),
  86                    }
  87                }))
  88                .exec(&*tx)
  89                .await?;
  90            }
  91
  92            let replica_id = if is_ssh_project { 1 } else { 0 };
  93
  94            project_collaborator::ActiveModel {
  95                project_id: ActiveValue::set(project.id),
  96                connection_id: ActiveValue::set(connection.id as i32),
  97                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
  98                user_id: ActiveValue::set(participant.user_id),
  99                replica_id: ActiveValue::set(ReplicaId(replica_id)),
 100                is_host: ActiveValue::set(true),
 101                ..Default::default()
 102            }
 103            .insert(&*tx)
 104            .await?;
 105
 106            let room = self.get_room(room_id, &tx).await?;
 107            Ok((project.id, room))
 108        })
 109        .await
 110    }
 111
 112    pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
 113        self.weak_transaction(|tx| async move {
 114            project::Entity::delete_by_id(project_id).exec(&*tx).await?;
 115            Ok(())
 116        })
 117        .await
 118    }
 119
 120    /// Unshares the given project.
 121    pub async fn unshare_project(
 122        &self,
 123        project_id: ProjectId,
 124        connection: ConnectionId,
 125    ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
 126        self.project_transaction(project_id, |tx| async move {
 127            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 128            let project = project::Entity::find_by_id(project_id)
 129                .one(&*tx)
 130                .await?
 131                .ok_or_else(|| anyhow!("project not found"))?;
 132            let room = if let Some(room_id) = project.room_id {
 133                Some(self.get_room(room_id, &tx).await?)
 134            } else {
 135                None
 136            };
 137            if project.host_connection()? == connection {
 138                return Ok((true, room, guest_connection_ids));
 139            }
 140            Err(anyhow!("cannot unshare a project hosted by another user"))?
 141        })
 142        .await
 143    }
 144
 145    /// Updates the worktrees associated with the given project.
 146    pub async fn update_project(
 147        &self,
 148        project_id: ProjectId,
 149        connection: ConnectionId,
 150        worktrees: &[proto::WorktreeMetadata],
 151    ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
 152        self.project_transaction(project_id, |tx| async move {
 153            let project = project::Entity::find_by_id(project_id)
 154                .filter(
 155                    Condition::all()
 156                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 157                        .add(
 158                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 159                        ),
 160                )
 161                .one(&*tx)
 162                .await?
 163                .ok_or_else(|| anyhow!("no such project"))?;
 164
 165            self.update_project_worktrees(project.id, worktrees, &tx)
 166                .await?;
 167
 168            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
 169
 170            let room = if let Some(room_id) = project.room_id {
 171                Some(self.get_room(room_id, &tx).await?)
 172            } else {
 173                None
 174            };
 175
 176            Ok((room, guest_connection_ids))
 177        })
 178        .await
 179    }
 180
 181    pub(in crate::db) async fn update_project_worktrees(
 182        &self,
 183        project_id: ProjectId,
 184        worktrees: &[proto::WorktreeMetadata],
 185        tx: &DatabaseTransaction,
 186    ) -> Result<()> {
 187        if !worktrees.is_empty() {
 188            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
 189                id: ActiveValue::set(worktree.id as i64),
 190                project_id: ActiveValue::set(project_id),
 191                abs_path: ActiveValue::set(worktree.abs_path.clone()),
 192                root_name: ActiveValue::set(worktree.root_name.clone()),
 193                visible: ActiveValue::set(worktree.visible),
 194                scan_id: ActiveValue::set(0),
 195                completed_scan_id: ActiveValue::set(0),
 196            }))
 197            .on_conflict(
 198                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
 199                    .update_column(worktree::Column::RootName)
 200                    .to_owned(),
 201            )
 202            .exec(tx)
 203            .await?;
 204        }
 205
 206        worktree::Entity::delete_many()
 207            .filter(worktree::Column::ProjectId.eq(project_id).and(
 208                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
 209            ))
 210            .exec(tx)
 211            .await?;
 212
 213        Ok(())
 214    }
 215
 216    pub async fn update_worktree(
 217        &self,
 218        update: &proto::UpdateWorktree,
 219        connection: ConnectionId,
 220    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 221        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 222            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
 223        {
 224            return Err(anyhow!(
 225                "invalid worktree update. removed entries: {}, updated entries: {}",
 226                update.removed_entries.len(),
 227                update.updated_entries.len()
 228            ))?;
 229        }
 230
 231        let project_id = ProjectId::from_proto(update.project_id);
 232        let worktree_id = update.worktree_id as i64;
 233        self.project_transaction(project_id, |tx| async move {
 234            // Ensure the update comes from the host.
 235            let _project = project::Entity::find_by_id(project_id)
 236                .filter(
 237                    Condition::all()
 238                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
 239                        .add(
 240                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
 241                        ),
 242                )
 243                .one(&*tx)
 244                .await?
 245                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
 246
 247            // Update metadata.
 248            worktree::Entity::update(worktree::ActiveModel {
 249                id: ActiveValue::set(worktree_id),
 250                project_id: ActiveValue::set(project_id),
 251                root_name: ActiveValue::set(update.root_name.clone()),
 252                scan_id: ActiveValue::set(update.scan_id as i64),
 253                completed_scan_id: if update.is_last_update {
 254                    ActiveValue::set(update.scan_id as i64)
 255                } else {
 256                    ActiveValue::default()
 257                },
 258                abs_path: ActiveValue::set(update.abs_path.clone()),
 259                ..Default::default()
 260            })
 261            .exec(&*tx)
 262            .await?;
 263
 264            if !update.updated_entries.is_empty() {
 265                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
 266                    let mtime = entry.mtime.clone().unwrap_or_default();
 267                    worktree_entry::ActiveModel {
 268                        project_id: ActiveValue::set(project_id),
 269                        worktree_id: ActiveValue::set(worktree_id),
 270                        id: ActiveValue::set(entry.id as i64),
 271                        is_dir: ActiveValue::set(entry.is_dir),
 272                        path: ActiveValue::set(entry.path.clone()),
 273                        inode: ActiveValue::set(entry.inode as i64),
 274                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
 275                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
 276                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
 277                        is_ignored: ActiveValue::set(entry.is_ignored),
 278                        git_status: ActiveValue::set(None),
 279                        is_external: ActiveValue::set(entry.is_external),
 280                        is_deleted: ActiveValue::set(false),
 281                        scan_id: ActiveValue::set(update.scan_id as i64),
 282                        is_fifo: ActiveValue::set(entry.is_fifo),
 283                    }
 284                }))
 285                .on_conflict(
 286                    OnConflict::columns([
 287                        worktree_entry::Column::ProjectId,
 288                        worktree_entry::Column::WorktreeId,
 289                        worktree_entry::Column::Id,
 290                    ])
 291                    .update_columns([
 292                        worktree_entry::Column::IsDir,
 293                        worktree_entry::Column::Path,
 294                        worktree_entry::Column::Inode,
 295                        worktree_entry::Column::MtimeSeconds,
 296                        worktree_entry::Column::MtimeNanos,
 297                        worktree_entry::Column::CanonicalPath,
 298                        worktree_entry::Column::IsIgnored,
 299                        worktree_entry::Column::ScanId,
 300                    ])
 301                    .to_owned(),
 302                )
 303                .exec(&*tx)
 304                .await?;
 305            }
 306
 307            if !update.removed_entries.is_empty() {
 308                worktree_entry::Entity::update_many()
 309                    .filter(
 310                        worktree_entry::Column::ProjectId
 311                            .eq(project_id)
 312                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
 313                            .and(
 314                                worktree_entry::Column::Id
 315                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
 316                            ),
 317                    )
 318                    .set(worktree_entry::ActiveModel {
 319                        is_deleted: ActiveValue::Set(true),
 320                        scan_id: ActiveValue::Set(update.scan_id as i64),
 321                        ..Default::default()
 322                    })
 323                    .exec(&*tx)
 324                    .await?;
 325            }
 326
 327            // Backward-compatibility for old Zed clients.
 328            //
 329            // Remove this block when Zed 1.80 stable has been out for a week.
 330            {
 331                if !update.updated_repositories.is_empty() {
 332                    project_repository::Entity::insert_many(
 333                        update.updated_repositories.iter().map(|repository| {
 334                            project_repository::ActiveModel {
 335                                project_id: ActiveValue::set(project_id),
 336                                legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
 337                                id: ActiveValue::set(repository.repository_id as i64),
 338                                scan_id: ActiveValue::set(update.scan_id as i64),
 339                                is_deleted: ActiveValue::set(false),
 340                                branch_summary: ActiveValue::Set(
 341                                    repository
 342                                        .branch_summary
 343                                        .as_ref()
 344                                        .map(|summary| serde_json::to_string(summary).unwrap()),
 345                                ),
 346                                current_merge_conflicts: ActiveValue::Set(Some(
 347                                    serde_json::to_string(&repository.current_merge_conflicts)
 348                                        .unwrap(),
 349                                )),
 350
 351                                // Old clients do not use abs path or entry ids.
 352                                abs_path: ActiveValue::set(String::new()),
 353                                entry_ids: ActiveValue::set("[]".into()),
 354                            }
 355                        }),
 356                    )
 357                    .on_conflict(
 358                        OnConflict::columns([
 359                            project_repository::Column::ProjectId,
 360                            project_repository::Column::Id,
 361                        ])
 362                        .update_columns([
 363                            project_repository::Column::ScanId,
 364                            project_repository::Column::BranchSummary,
 365                            project_repository::Column::CurrentMergeConflicts,
 366                        ])
 367                        .to_owned(),
 368                    )
 369                    .exec(&*tx)
 370                    .await?;
 371
 372                    let has_any_statuses = update
 373                        .updated_repositories
 374                        .iter()
 375                        .any(|repository| !repository.updated_statuses.is_empty());
 376
 377                    if has_any_statuses {
 378                        project_repository_statuses::Entity::insert_many(
 379                            update.updated_repositories.iter().flat_map(
 380                                |repository: &proto::RepositoryEntry| {
 381                                    repository.updated_statuses.iter().map(|status_entry| {
 382                                        let (repo_path, status_kind, first_status, second_status) =
 383                                            proto_status_to_db(status_entry.clone());
 384                                        project_repository_statuses::ActiveModel {
 385                                            project_id: ActiveValue::set(project_id),
 386                                            repository_id: ActiveValue::set(
 387                                                repository.repository_id as i64,
 388                                            ),
 389                                            scan_id: ActiveValue::set(update.scan_id as i64),
 390                                            is_deleted: ActiveValue::set(false),
 391                                            repo_path: ActiveValue::set(repo_path),
 392                                            status: ActiveValue::set(0),
 393                                            status_kind: ActiveValue::set(status_kind),
 394                                            first_status: ActiveValue::set(first_status),
 395                                            second_status: ActiveValue::set(second_status),
 396                                        }
 397                                    })
 398                                },
 399                            ),
 400                        )
 401                        .on_conflict(
 402                            OnConflict::columns([
 403                                project_repository_statuses::Column::ProjectId,
 404                                project_repository_statuses::Column::RepositoryId,
 405                                project_repository_statuses::Column::RepoPath,
 406                            ])
 407                            .update_columns([
 408                                project_repository_statuses::Column::ScanId,
 409                                project_repository_statuses::Column::StatusKind,
 410                                project_repository_statuses::Column::FirstStatus,
 411                                project_repository_statuses::Column::SecondStatus,
 412                            ])
 413                            .to_owned(),
 414                        )
 415                        .exec(&*tx)
 416                        .await?;
 417                    }
 418
 419                    for repo in &update.updated_repositories {
 420                        if !repo.removed_statuses.is_empty() {
 421                            project_repository_statuses::Entity::update_many()
 422                                .filter(
 423                                    project_repository_statuses::Column::ProjectId
 424                                        .eq(project_id)
 425                                        .and(
 426                                            project_repository_statuses::Column::RepositoryId
 427                                                .eq(repo.repository_id),
 428                                        )
 429                                        .and(
 430                                            project_repository_statuses::Column::RepoPath
 431                                                .is_in(repo.removed_statuses.iter()),
 432                                        ),
 433                                )
 434                                .set(project_repository_statuses::ActiveModel {
 435                                    is_deleted: ActiveValue::Set(true),
 436                                    scan_id: ActiveValue::Set(update.scan_id as i64),
 437                                    ..Default::default()
 438                                })
 439                                .exec(&*tx)
 440                                .await?;
 441                        }
 442                    }
 443                }
 444
 445                if !update.removed_repositories.is_empty() {
 446                    project_repository::Entity::update_many()
 447                        .filter(
 448                            project_repository::Column::ProjectId
 449                                .eq(project_id)
 450                                .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
 451                                .and(project_repository::Column::Id.is_in(
 452                                    update.removed_repositories.iter().map(|id| *id as i64),
 453                                )),
 454                        )
 455                        .set(project_repository::ActiveModel {
 456                            is_deleted: ActiveValue::Set(true),
 457                            scan_id: ActiveValue::Set(update.scan_id as i64),
 458                            ..Default::default()
 459                        })
 460                        .exec(&*tx)
 461                        .await?;
 462                }
 463            }
 464
 465            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 466            Ok(connection_ids)
 467        })
 468        .await
 469    }
 470
 471    pub async fn update_repository(
 472        &self,
 473        update: &proto::UpdateRepository,
 474        _connection: ConnectionId,
 475    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 476        let project_id = ProjectId::from_proto(update.project_id);
 477        let repository_id = update.id as i64;
 478        self.project_transaction(project_id, |tx| async move {
 479            project_repository::Entity::insert(project_repository::ActiveModel {
 480                project_id: ActiveValue::set(project_id),
 481                id: ActiveValue::set(repository_id),
 482                legacy_worktree_id: ActiveValue::set(None),
 483                abs_path: ActiveValue::set(update.abs_path.clone()),
 484                entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
 485                scan_id: ActiveValue::set(update.scan_id as i64),
 486                is_deleted: ActiveValue::set(false),
 487                branch_summary: ActiveValue::Set(
 488                    update
 489                        .branch_summary
 490                        .as_ref()
 491                        .map(|summary| serde_json::to_string(summary).unwrap()),
 492                ),
 493                current_merge_conflicts: ActiveValue::Set(Some(
 494                    serde_json::to_string(&update.current_merge_conflicts).unwrap(),
 495                )),
 496            })
 497            .on_conflict(
 498                OnConflict::columns([
 499                    project_repository::Column::ProjectId,
 500                    project_repository::Column::Id,
 501                ])
 502                .update_columns([
 503                    project_repository::Column::ScanId,
 504                    project_repository::Column::BranchSummary,
 505                    project_repository::Column::EntryIds,
 506                    project_repository::Column::AbsPath,
 507                    project_repository::Column::CurrentMergeConflicts,
 508                ])
 509                .to_owned(),
 510            )
 511            .exec(&*tx)
 512            .await?;
 513
 514            let has_any_statuses = !update.updated_statuses.is_empty();
 515
 516            if has_any_statuses {
 517                project_repository_statuses::Entity::insert_many(
 518                    update.updated_statuses.iter().map(|status_entry| {
 519                        let (repo_path, status_kind, first_status, second_status) =
 520                            proto_status_to_db(status_entry.clone());
 521                        project_repository_statuses::ActiveModel {
 522                            project_id: ActiveValue::set(project_id),
 523                            repository_id: ActiveValue::set(repository_id),
 524                            scan_id: ActiveValue::set(update.scan_id as i64),
 525                            is_deleted: ActiveValue::set(false),
 526                            repo_path: ActiveValue::set(repo_path),
 527                            status: ActiveValue::set(0),
 528                            status_kind: ActiveValue::set(status_kind),
 529                            first_status: ActiveValue::set(first_status),
 530                            second_status: ActiveValue::set(second_status),
 531                        }
 532                    }),
 533                )
 534                .on_conflict(
 535                    OnConflict::columns([
 536                        project_repository_statuses::Column::ProjectId,
 537                        project_repository_statuses::Column::RepositoryId,
 538                        project_repository_statuses::Column::RepoPath,
 539                    ])
 540                    .update_columns([
 541                        project_repository_statuses::Column::ScanId,
 542                        project_repository_statuses::Column::StatusKind,
 543                        project_repository_statuses::Column::FirstStatus,
 544                        project_repository_statuses::Column::SecondStatus,
 545                    ])
 546                    .to_owned(),
 547                )
 548                .exec(&*tx)
 549                .await?;
 550            }
 551
 552            let has_any_removed_statuses = !update.removed_statuses.is_empty();
 553
 554            if has_any_removed_statuses {
 555                project_repository_statuses::Entity::update_many()
 556                    .filter(
 557                        project_repository_statuses::Column::ProjectId
 558                            .eq(project_id)
 559                            .and(
 560                                project_repository_statuses::Column::RepositoryId.eq(repository_id),
 561                            )
 562                            .and(
 563                                project_repository_statuses::Column::RepoPath
 564                                    .is_in(update.removed_statuses.iter()),
 565                            ),
 566                    )
 567                    .set(project_repository_statuses::ActiveModel {
 568                        is_deleted: ActiveValue::Set(true),
 569                        scan_id: ActiveValue::Set(update.scan_id as i64),
 570                        ..Default::default()
 571                    })
 572                    .exec(&*tx)
 573                    .await?;
 574            }
 575
 576            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 577            Ok(connection_ids)
 578        })
 579        .await
 580    }
 581
 582    pub async fn remove_repository(
 583        &self,
 584        remove: &proto::RemoveRepository,
 585        _connection: ConnectionId,
 586    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 587        let project_id = ProjectId::from_proto(remove.project_id);
 588        let repository_id = remove.id as i64;
 589        self.project_transaction(project_id, |tx| async move {
 590            project_repository::Entity::update_many()
 591                .filter(
 592                    project_repository::Column::ProjectId
 593                        .eq(project_id)
 594                        .and(project_repository::Column::Id.eq(repository_id)),
 595                )
 596                .set(project_repository::ActiveModel {
 597                    is_deleted: ActiveValue::Set(true),
 598                    // scan_id: ActiveValue::Set(update.scan_id as i64),
 599                    ..Default::default()
 600                })
 601                .exec(&*tx)
 602                .await?;
 603
 604            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 605            Ok(connection_ids)
 606        })
 607        .await
 608    }
 609
 610    /// Updates the diagnostic summary for the given connection.
 611    pub async fn update_diagnostic_summary(
 612        &self,
 613        update: &proto::UpdateDiagnosticSummary,
 614        connection: ConnectionId,
 615    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 616        let project_id = ProjectId::from_proto(update.project_id);
 617        let worktree_id = update.worktree_id as i64;
 618        self.project_transaction(project_id, |tx| async move {
 619            let summary = update
 620                .summary
 621                .as_ref()
 622                .ok_or_else(|| anyhow!("invalid summary"))?;
 623
 624            // Ensure the update comes from the host.
 625            let project = project::Entity::find_by_id(project_id)
 626                .one(&*tx)
 627                .await?
 628                .ok_or_else(|| anyhow!("no such project"))?;
 629            if project.host_connection()? != connection {
 630                return Err(anyhow!("can't update a project hosted by someone else"))?;
 631            }
 632
 633            // Update summary.
 634            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
 635                project_id: ActiveValue::set(project_id),
 636                worktree_id: ActiveValue::set(worktree_id),
 637                path: ActiveValue::set(summary.path.clone()),
 638                language_server_id: ActiveValue::set(summary.language_server_id as i64),
 639                error_count: ActiveValue::set(summary.error_count as i32),
 640                warning_count: ActiveValue::set(summary.warning_count as i32),
 641            })
 642            .on_conflict(
 643                OnConflict::columns([
 644                    worktree_diagnostic_summary::Column::ProjectId,
 645                    worktree_diagnostic_summary::Column::WorktreeId,
 646                    worktree_diagnostic_summary::Column::Path,
 647                ])
 648                .update_columns([
 649                    worktree_diagnostic_summary::Column::LanguageServerId,
 650                    worktree_diagnostic_summary::Column::ErrorCount,
 651                    worktree_diagnostic_summary::Column::WarningCount,
 652                ])
 653                .to_owned(),
 654            )
 655            .exec(&*tx)
 656            .await?;
 657
 658            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 659            Ok(connection_ids)
 660        })
 661        .await
 662    }
 663
 664    /// Starts the language server for the given connection.
 665    pub async fn start_language_server(
 666        &self,
 667        update: &proto::StartLanguageServer,
 668        connection: ConnectionId,
 669    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 670        let project_id = ProjectId::from_proto(update.project_id);
 671        self.project_transaction(project_id, |tx| async move {
 672            let server = update
 673                .server
 674                .as_ref()
 675                .ok_or_else(|| anyhow!("invalid language server"))?;
 676
 677            // Ensure the update comes from the host.
 678            let project = project::Entity::find_by_id(project_id)
 679                .one(&*tx)
 680                .await?
 681                .ok_or_else(|| anyhow!("no such project"))?;
 682            if project.host_connection()? != connection {
 683                return Err(anyhow!("can't update a project hosted by someone else"))?;
 684            }
 685
 686            // Add the newly-started language server.
 687            language_server::Entity::insert(language_server::ActiveModel {
 688                project_id: ActiveValue::set(project_id),
 689                id: ActiveValue::set(server.id as i64),
 690                name: ActiveValue::set(server.name.clone()),
 691            })
 692            .on_conflict(
 693                OnConflict::columns([
 694                    language_server::Column::ProjectId,
 695                    language_server::Column::Id,
 696                ])
 697                .update_column(language_server::Column::Name)
 698                .to_owned(),
 699            )
 700            .exec(&*tx)
 701            .await?;
 702
 703            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 704            Ok(connection_ids)
 705        })
 706        .await
 707    }
 708
 709    /// Updates the worktree settings for the given connection.
 710    pub async fn update_worktree_settings(
 711        &self,
 712        update: &proto::UpdateWorktreeSettings,
 713        connection: ConnectionId,
 714    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
 715        let project_id = ProjectId::from_proto(update.project_id);
 716        let kind = match update.kind {
 717            Some(kind) => proto::LocalSettingsKind::from_i32(kind)
 718                .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
 719            None => proto::LocalSettingsKind::Settings,
 720        };
 721        let kind = LocalSettingsKind::from_proto(kind);
 722        self.project_transaction(project_id, |tx| async move {
 723            // Ensure the update comes from the host.
 724            let project = project::Entity::find_by_id(project_id)
 725                .one(&*tx)
 726                .await?
 727                .ok_or_else(|| anyhow!("no such project"))?;
 728            if project.host_connection()? != connection {
 729                return Err(anyhow!("can't update a project hosted by someone else"))?;
 730            }
 731
 732            if let Some(content) = &update.content {
 733                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
 734                    project_id: ActiveValue::Set(project_id),
 735                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 736                    path: ActiveValue::Set(update.path.clone()),
 737                    content: ActiveValue::Set(content.clone()),
 738                    kind: ActiveValue::Set(kind),
 739                })
 740                .on_conflict(
 741                    OnConflict::columns([
 742                        worktree_settings_file::Column::ProjectId,
 743                        worktree_settings_file::Column::WorktreeId,
 744                        worktree_settings_file::Column::Path,
 745                    ])
 746                    .update_column(worktree_settings_file::Column::Content)
 747                    .to_owned(),
 748                )
 749                .exec(&*tx)
 750                .await?;
 751            } else {
 752                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
 753                    project_id: ActiveValue::Set(project_id),
 754                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
 755                    path: ActiveValue::Set(update.path.clone()),
 756                    ..Default::default()
 757                })
 758                .exec(&*tx)
 759                .await?;
 760            }
 761
 762            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 763            Ok(connection_ids)
 764        })
 765        .await
 766    }
 767
 768    pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
 769        self.transaction(|tx| async move {
 770            Ok(project::Entity::find_by_id(id)
 771                .one(&*tx)
 772                .await?
 773                .ok_or_else(|| anyhow!("no such project"))?)
 774        })
 775        .await
 776    }
 777
 778    /// Adds the given connection to the specified project
 779    /// in the current room.
 780    pub async fn join_project(
 781        &self,
 782        project_id: ProjectId,
 783        connection: ConnectionId,
 784        user_id: UserId,
 785    ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
 786        self.project_transaction(project_id, |tx| async move {
 787            let (project, role) = self
 788                .access_project(project_id, connection, Capability::ReadOnly, &tx)
 789                .await?;
 790            self.join_project_internal(project, user_id, connection, role, &tx)
 791                .await
 792        })
 793        .await
 794    }
 795
 796    async fn join_project_internal(
 797        &self,
 798        project: project::Model,
 799        user_id: UserId,
 800        connection: ConnectionId,
 801        role: ChannelRole,
 802        tx: &DatabaseTransaction,
 803    ) -> Result<(Project, ReplicaId)> {
 804        let mut collaborators = project
 805            .find_related(project_collaborator::Entity)
 806            .all(tx)
 807            .await?;
 808        let replica_ids = collaborators
 809            .iter()
 810            .map(|c| c.replica_id)
 811            .collect::<HashSet<_>>();
 812        let mut replica_id = ReplicaId(1);
 813        while replica_ids.contains(&replica_id) {
 814            replica_id.0 += 1;
 815        }
 816        let new_collaborator = project_collaborator::ActiveModel {
 817            project_id: ActiveValue::set(project.id),
 818            connection_id: ActiveValue::set(connection.id as i32),
 819            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 820            user_id: ActiveValue::set(user_id),
 821            replica_id: ActiveValue::set(replica_id),
 822            is_host: ActiveValue::set(false),
 823            ..Default::default()
 824        }
 825        .insert(tx)
 826        .await?;
 827        collaborators.push(new_collaborator);
 828
 829        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 830        let mut worktrees = db_worktrees
 831            .into_iter()
 832            .map(|db_worktree| {
 833                (
 834                    db_worktree.id as u64,
 835                    Worktree {
 836                        id: db_worktree.id as u64,
 837                        abs_path: db_worktree.abs_path,
 838                        root_name: db_worktree.root_name,
 839                        visible: db_worktree.visible,
 840                        entries: Default::default(),
 841                        diagnostic_summaries: Default::default(),
 842                        settings_files: Default::default(),
 843                        scan_id: db_worktree.scan_id as u64,
 844                        completed_scan_id: db_worktree.completed_scan_id as u64,
 845                        legacy_repository_entries: Default::default(),
 846                    },
 847                )
 848            })
 849            .collect::<BTreeMap<_, _>>();
 850
 851        // Populate worktree entries.
 852        {
 853            let mut db_entries = worktree_entry::Entity::find()
 854                .filter(
 855                    Condition::all()
 856                        .add(worktree_entry::Column::ProjectId.eq(project.id))
 857                        .add(worktree_entry::Column::IsDeleted.eq(false)),
 858                )
 859                .stream(tx)
 860                .await?;
 861            while let Some(db_entry) = db_entries.next().await {
 862                let db_entry = db_entry?;
 863                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
 864                    worktree.entries.push(proto::Entry {
 865                        id: db_entry.id as u64,
 866                        is_dir: db_entry.is_dir,
 867                        path: db_entry.path,
 868                        inode: db_entry.inode as u64,
 869                        mtime: Some(proto::Timestamp {
 870                            seconds: db_entry.mtime_seconds as u64,
 871                            nanos: db_entry.mtime_nanos as u32,
 872                        }),
 873                        canonical_path: db_entry.canonical_path,
 874                        is_ignored: db_entry.is_ignored,
 875                        is_external: db_entry.is_external,
 876                        // This is only used in the summarization backlog, so if it's None,
 877                        // that just means we won't be able to detect when to resummarize
 878                        // based on total number of backlogged bytes - instead, we'd go
 879                        // on number of files only. That shouldn't be a huge deal in practice.
 880                        size: None,
 881                        is_fifo: db_entry.is_fifo,
 882                    });
 883                }
 884            }
 885        }
 886
 887        // Populate repository entries.
 888        let mut repositories = Vec::new();
 889        {
 890            let db_repository_entries = project_repository::Entity::find()
 891                .filter(
 892                    Condition::all()
 893                        .add(project_repository::Column::ProjectId.eq(project.id))
 894                        .add(project_repository::Column::IsDeleted.eq(false)),
 895                )
 896                .all(tx)
 897                .await?;
 898            for db_repository_entry in db_repository_entries {
 899                let mut repository_statuses = project_repository_statuses::Entity::find()
 900                    .filter(
 901                        Condition::all()
 902                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 903                            .add(
 904                                project_repository_statuses::Column::RepositoryId
 905                                    .eq(db_repository_entry.id),
 906                            )
 907                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
 908                    )
 909                    .stream(tx)
 910                    .await?;
 911                let mut updated_statuses = Vec::new();
 912                while let Some(status_entry) = repository_statuses.next().await {
 913                    let status_entry = status_entry?;
 914                    updated_statuses.push(db_status_to_proto(status_entry)?);
 915                }
 916
 917                let current_merge_conflicts = db_repository_entry
 918                    .current_merge_conflicts
 919                    .as_ref()
 920                    .map(|conflicts| serde_json::from_str(&conflicts))
 921                    .transpose()?
 922                    .unwrap_or_default();
 923
 924                let branch_summary = db_repository_entry
 925                    .branch_summary
 926                    .as_ref()
 927                    .map(|branch_summary| serde_json::from_str(&branch_summary))
 928                    .transpose()?
 929                    .unwrap_or_default();
 930
 931                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
 932                    .context("failed to deserialize repository's entry ids")?;
 933
 934                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
 935                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
 936                        worktree.legacy_repository_entries.insert(
 937                            db_repository_entry.id as u64,
 938                            proto::RepositoryEntry {
 939                                repository_id: db_repository_entry.id as u64,
 940                                updated_statuses,
 941                                removed_statuses: Vec::new(),
 942                                current_merge_conflicts,
 943                                branch_summary,
 944                            },
 945                        );
 946                    }
 947                } else {
 948                    repositories.push(proto::UpdateRepository {
 949                        project_id: db_repository_entry.project_id.0 as u64,
 950                        id: db_repository_entry.id as u64,
 951                        abs_path: db_repository_entry.abs_path,
 952                        entry_ids,
 953                        updated_statuses,
 954                        removed_statuses: Vec::new(),
 955                        current_merge_conflicts,
 956                        branch_summary,
 957                        scan_id: db_repository_entry.scan_id as u64,
 958                        is_last_update: true,
 959                    });
 960                }
 961            }
 962        }
 963
 964        // Populate worktree diagnostic summaries.
 965        {
 966            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
 967                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
 968                .stream(tx)
 969                .await?;
 970            while let Some(db_summary) = db_summaries.next().await {
 971                let db_summary = db_summary?;
 972                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
 973                    worktree
 974                        .diagnostic_summaries
 975                        .push(proto::DiagnosticSummary {
 976                            path: db_summary.path,
 977                            language_server_id: db_summary.language_server_id as u64,
 978                            error_count: db_summary.error_count as u32,
 979                            warning_count: db_summary.warning_count as u32,
 980                        });
 981                }
 982            }
 983        }
 984
 985        // Populate worktree settings files
 986        {
 987            let mut db_settings_files = worktree_settings_file::Entity::find()
 988                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
 989                .stream(tx)
 990                .await?;
 991            while let Some(db_settings_file) = db_settings_files.next().await {
 992                let db_settings_file = db_settings_file?;
 993                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
 994                    worktree.settings_files.push(WorktreeSettingsFile {
 995                        path: db_settings_file.path,
 996                        content: db_settings_file.content,
 997                        kind: db_settings_file.kind,
 998                    });
 999                }
1000            }
1001        }
1002
1003        // Populate language servers.
1004        let language_servers = project
1005            .find_related(language_server::Entity)
1006            .all(tx)
1007            .await?;
1008
1009        let project = Project {
1010            id: project.id,
1011            role,
1012            collaborators: collaborators
1013                .into_iter()
1014                .map(|collaborator| ProjectCollaborator {
1015                    connection_id: collaborator.connection(),
1016                    user_id: collaborator.user_id,
1017                    replica_id: collaborator.replica_id,
1018                    is_host: collaborator.is_host,
1019                })
1020                .collect(),
1021            worktrees,
1022            repositories,
1023            language_servers: language_servers
1024                .into_iter()
1025                .map(|language_server| proto::LanguageServer {
1026                    id: language_server.id as u64,
1027                    name: language_server.name,
1028                    worktree_id: None,
1029                })
1030                .collect(),
1031        };
1032        Ok((project, replica_id as ReplicaId))
1033    }
1034
1035    /// Removes the given connection from the specified project.
1036    pub async fn leave_project(
1037        &self,
1038        project_id: ProjectId,
1039        connection: ConnectionId,
1040    ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1041        self.project_transaction(project_id, |tx| async move {
1042            let result = project_collaborator::Entity::delete_many()
1043                .filter(
1044                    Condition::all()
1045                        .add(project_collaborator::Column::ProjectId.eq(project_id))
1046                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1047                        .add(
1048                            project_collaborator::Column::ConnectionServerId
1049                                .eq(connection.owner_id as i32),
1050                        ),
1051                )
1052                .exec(&*tx)
1053                .await?;
1054            if result.rows_affected == 0 {
1055                Err(anyhow!("not a collaborator on this project"))?;
1056            }
1057
1058            let project = project::Entity::find_by_id(project_id)
1059                .one(&*tx)
1060                .await?
1061                .ok_or_else(|| anyhow!("no such project"))?;
1062            let collaborators = project
1063                .find_related(project_collaborator::Entity)
1064                .all(&*tx)
1065                .await?;
1066            let connection_ids: Vec<ConnectionId> = collaborators
1067                .into_iter()
1068                .map(|collaborator| collaborator.connection())
1069                .collect();
1070
1071            follower::Entity::delete_many()
1072                .filter(
1073                    Condition::any()
1074                        .add(
1075                            Condition::all()
1076                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1077                                .add(
1078                                    follower::Column::LeaderConnectionServerId
1079                                        .eq(connection.owner_id),
1080                                )
1081                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1082                        )
1083                        .add(
1084                            Condition::all()
1085                                .add(follower::Column::ProjectId.eq(Some(project_id)))
1086                                .add(
1087                                    follower::Column::FollowerConnectionServerId
1088                                        .eq(connection.owner_id),
1089                                )
1090                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1091                        ),
1092                )
1093                .exec(&*tx)
1094                .await?;
1095
1096            let room = if let Some(room_id) = project.room_id {
1097                Some(self.get_room(room_id, &tx).await?)
1098            } else {
1099                None
1100            };
1101
1102            let left_project = LeftProject {
1103                id: project_id,
1104                should_unshare: connection == project.host_connection()?,
1105                connection_ids,
1106            };
1107            Ok((room, left_project))
1108        })
1109        .await
1110    }
1111
1112    pub async fn check_user_is_project_host(
1113        &self,
1114        project_id: ProjectId,
1115        connection_id: ConnectionId,
1116    ) -> Result<()> {
1117        self.project_transaction(project_id, |tx| async move {
1118            project::Entity::find()
1119                .filter(
1120                    Condition::all()
1121                        .add(project::Column::Id.eq(project_id))
1122                        .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1123                        .add(
1124                            project::Column::HostConnectionServerId
1125                                .eq(Some(connection_id.owner_id as i32)),
1126                        ),
1127                )
1128                .one(&*tx)
1129                .await?
1130                .ok_or_else(|| anyhow!("failed to read project host"))?;
1131
1132            Ok(())
1133        })
1134        .await
1135        .map(|guard| guard.into_inner())
1136    }
1137
1138    /// Returns the current project if the given user is authorized to access it with the specified capability.
1139    pub async fn access_project(
1140        &self,
1141        project_id: ProjectId,
1142        connection_id: ConnectionId,
1143        capability: Capability,
1144        tx: &DatabaseTransaction,
1145    ) -> Result<(project::Model, ChannelRole)> {
1146        let project = project::Entity::find_by_id(project_id)
1147            .one(tx)
1148            .await?
1149            .ok_or_else(|| anyhow!("no such project"))?;
1150
1151        let role_from_room = if let Some(room_id) = project.room_id {
1152            room_participant::Entity::find()
1153                .filter(room_participant::Column::RoomId.eq(room_id))
1154                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1155                .one(tx)
1156                .await?
1157                .and_then(|participant| participant.role)
1158        } else {
1159            None
1160        };
1161
1162        let role = role_from_room.unwrap_or(ChannelRole::Banned);
1163
1164        match capability {
1165            Capability::ReadWrite => {
1166                if !role.can_edit_projects() {
1167                    return Err(anyhow!("not authorized to edit projects"))?;
1168                }
1169            }
1170            Capability::ReadOnly => {
1171                if !role.can_read_projects() {
1172                    return Err(anyhow!("not authorized to read projects"))?;
1173                }
1174            }
1175        }
1176
1177        Ok((project, role))
1178    }
1179
1180    /// Returns the host connection for a read-only request to join a shared project.
1181    pub async fn host_for_read_only_project_request(
1182        &self,
1183        project_id: ProjectId,
1184        connection_id: ConnectionId,
1185    ) -> Result<ConnectionId> {
1186        self.project_transaction(project_id, |tx| async move {
1187            let (project, _) = self
1188                .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1189                .await?;
1190            project.host_connection()
1191        })
1192        .await
1193        .map(|guard| guard.into_inner())
1194    }
1195
1196    /// Returns the host connection for a request to join a shared project.
1197    pub async fn host_for_mutating_project_request(
1198        &self,
1199        project_id: ProjectId,
1200        connection_id: ConnectionId,
1201    ) -> Result<ConnectionId> {
1202        self.project_transaction(project_id, |tx| async move {
1203            let (project, _) = self
1204                .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1205                .await?;
1206            project.host_connection()
1207        })
1208        .await
1209        .map(|guard| guard.into_inner())
1210    }
1211
1212    pub async fn connections_for_buffer_update(
1213        &self,
1214        project_id: ProjectId,
1215        connection_id: ConnectionId,
1216        capability: Capability,
1217    ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1218        self.project_transaction(project_id, |tx| async move {
1219            // Authorize
1220            let (project, _) = self
1221                .access_project(project_id, connection_id, capability, &tx)
1222                .await?;
1223
1224            let host_connection_id = project.host_connection()?;
1225
1226            let collaborators = project_collaborator::Entity::find()
1227                .filter(project_collaborator::Column::ProjectId.eq(project_id))
1228                .all(&*tx)
1229                .await?;
1230
1231            let guest_connection_ids = collaborators
1232                .into_iter()
1233                .filter_map(|collaborator| {
1234                    if collaborator.is_host {
1235                        None
1236                    } else {
1237                        Some(collaborator.connection())
1238                    }
1239                })
1240                .collect();
1241
1242            Ok((host_connection_id, guest_connection_ids))
1243        })
1244        .await
1245    }
1246
1247    /// Returns the connection IDs in the given project.
1248    ///
1249    /// The provided `connection_id` must also be a collaborator in the project,
1250    /// otherwise an error will be returned.
1251    pub async fn project_connection_ids(
1252        &self,
1253        project_id: ProjectId,
1254        connection_id: ConnectionId,
1255        exclude_dev_server: bool,
1256    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1257        self.project_transaction(project_id, |tx| async move {
1258            self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1259                .await
1260        })
1261        .await
1262    }
1263
1264    async fn internal_project_connection_ids(
1265        &self,
1266        project_id: ProjectId,
1267        connection_id: ConnectionId,
1268        exclude_dev_server: bool,
1269        tx: &DatabaseTransaction,
1270    ) -> Result<HashSet<ConnectionId>> {
1271        let project = project::Entity::find_by_id(project_id)
1272            .one(tx)
1273            .await?
1274            .ok_or_else(|| anyhow!("no such project"))?;
1275
1276        let mut collaborators = project_collaborator::Entity::find()
1277            .filter(project_collaborator::Column::ProjectId.eq(project_id))
1278            .stream(tx)
1279            .await?;
1280
1281        let mut connection_ids = HashSet::default();
1282        if let Some(host_connection) = project.host_connection().log_err() {
1283            if !exclude_dev_server {
1284                connection_ids.insert(host_connection);
1285            }
1286        }
1287
1288        while let Some(collaborator) = collaborators.next().await {
1289            let collaborator = collaborator?;
1290            connection_ids.insert(collaborator.connection());
1291        }
1292
1293        if connection_ids.contains(&connection_id)
1294            || Some(connection_id) == project.host_connection().ok()
1295        {
1296            Ok(connection_ids)
1297        } else {
1298            Err(anyhow!(
1299                "can only send project updates to a project you're in"
1300            ))?
1301        }
1302    }
1303
1304    async fn project_guest_connection_ids(
1305        &self,
1306        project_id: ProjectId,
1307        tx: &DatabaseTransaction,
1308    ) -> Result<Vec<ConnectionId>> {
1309        let mut collaborators = project_collaborator::Entity::find()
1310            .filter(
1311                project_collaborator::Column::ProjectId
1312                    .eq(project_id)
1313                    .and(project_collaborator::Column::IsHost.eq(false)),
1314            )
1315            .stream(tx)
1316            .await?;
1317
1318        let mut guest_connection_ids = Vec::new();
1319        while let Some(collaborator) = collaborators.next().await {
1320            let collaborator = collaborator?;
1321            guest_connection_ids.push(collaborator.connection());
1322        }
1323        Ok(guest_connection_ids)
1324    }
1325
1326    /// Returns the [`RoomId`] for the given project.
1327    pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1328        self.transaction(|tx| async move {
1329            Ok(project::Entity::find_by_id(project_id)
1330                .one(&*tx)
1331                .await?
1332                .and_then(|project| project.room_id))
1333        })
1334        .await
1335    }
1336
1337    pub async fn check_room_participants(
1338        &self,
1339        room_id: RoomId,
1340        leader_id: ConnectionId,
1341        follower_id: ConnectionId,
1342    ) -> Result<()> {
1343        self.transaction(|tx| async move {
1344            use room_participant::Column;
1345
1346            let count = room_participant::Entity::find()
1347                .filter(
1348                    Condition::all().add(Column::RoomId.eq(room_id)).add(
1349                        Condition::any()
1350                            .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1351                                Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1352                            ))
1353                            .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1354                                Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1355                            )),
1356                    ),
1357                )
1358                .count(&*tx)
1359                .await?;
1360
1361            if count < 2 {
1362                Err(anyhow!("not room participants"))?;
1363            }
1364
1365            Ok(())
1366        })
1367        .await
1368    }
1369
1370    /// Adds the given follower connection as a follower of the given leader connection.
1371    pub async fn follow(
1372        &self,
1373        room_id: RoomId,
1374        project_id: ProjectId,
1375        leader_connection: ConnectionId,
1376        follower_connection: ConnectionId,
1377    ) -> Result<TransactionGuard<proto::Room>> {
1378        self.room_transaction(room_id, |tx| async move {
1379            follower::ActiveModel {
1380                room_id: ActiveValue::set(room_id),
1381                project_id: ActiveValue::set(project_id),
1382                leader_connection_server_id: ActiveValue::set(ServerId(
1383                    leader_connection.owner_id as i32,
1384                )),
1385                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1386                follower_connection_server_id: ActiveValue::set(ServerId(
1387                    follower_connection.owner_id as i32,
1388                )),
1389                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1390                ..Default::default()
1391            }
1392            .insert(&*tx)
1393            .await?;
1394
1395            let room = self.get_room(room_id, &tx).await?;
1396            Ok(room)
1397        })
1398        .await
1399    }
1400
1401    /// Removes the given follower connection as a follower of the given leader connection.
1402    pub async fn unfollow(
1403        &self,
1404        room_id: RoomId,
1405        project_id: ProjectId,
1406        leader_connection: ConnectionId,
1407        follower_connection: ConnectionId,
1408    ) -> Result<TransactionGuard<proto::Room>> {
1409        self.room_transaction(room_id, |tx| async move {
1410            follower::Entity::delete_many()
1411                .filter(
1412                    Condition::all()
1413                        .add(follower::Column::RoomId.eq(room_id))
1414                        .add(follower::Column::ProjectId.eq(project_id))
1415                        .add(
1416                            follower::Column::LeaderConnectionServerId
1417                                .eq(leader_connection.owner_id),
1418                        )
1419                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1420                        .add(
1421                            follower::Column::FollowerConnectionServerId
1422                                .eq(follower_connection.owner_id),
1423                        )
1424                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1425                )
1426                .exec(&*tx)
1427                .await?;
1428
1429            let room = self.get_room(room_id, &tx).await?;
1430            Ok(room)
1431        })
1432        .await
1433    }
1434}