rooms.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn clear_stale_room_participants(
   5        &self,
   6        room_id: RoomId,
   7        new_server_id: ServerId,
   8    ) -> Result<RoomGuard<RefreshedRoom>> {
   9        self.room_transaction(room_id, |tx| async move {
  10            let stale_participant_filter = Condition::all()
  11                .add(room_participant::Column::RoomId.eq(room_id))
  12                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  13                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  14
  15            let stale_participant_user_ids = room_participant::Entity::find()
  16                .filter(stale_participant_filter.clone())
  17                .all(&*tx)
  18                .await?
  19                .into_iter()
  20                .map(|participant| participant.user_id)
  21                .collect::<Vec<_>>();
  22
  23            // Delete participants who failed to reconnect and cancel their calls.
  24            let mut canceled_calls_to_user_ids = Vec::new();
  25            room_participant::Entity::delete_many()
  26                .filter(stale_participant_filter)
  27                .exec(&*tx)
  28                .await?;
  29            let called_participants = room_participant::Entity::find()
  30                .filter(
  31                    Condition::all()
  32                        .add(
  33                            room_participant::Column::CallingUserId
  34                                .is_in(stale_participant_user_ids.iter().copied()),
  35                        )
  36                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  37                )
  38                .all(&*tx)
  39                .await?;
  40            room_participant::Entity::delete_many()
  41                .filter(
  42                    room_participant::Column::Id
  43                        .is_in(called_participants.iter().map(|participant| participant.id)),
  44                )
  45                .exec(&*tx)
  46                .await?;
  47            canceled_calls_to_user_ids.extend(
  48                called_participants
  49                    .into_iter()
  50                    .map(|participant| participant.user_id),
  51            );
  52
  53            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
  54            let channel_members;
  55            if let Some(channel_id) = channel_id {
  56                channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
  57            } else {
  58                channel_members = Vec::new();
  59
  60                // Delete the room if it becomes empty.
  61                if room.participants.is_empty() {
  62                    project::Entity::delete_many()
  63                        .filter(project::Column::RoomId.eq(room_id))
  64                        .exec(&*tx)
  65                        .await?;
  66                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  67                }
  68            };
  69
  70            Ok(RefreshedRoom {
  71                room,
  72                channel_id,
  73                channel_members,
  74                stale_participant_user_ids,
  75                canceled_calls_to_user_ids,
  76            })
  77        })
  78        .await
  79    }
  80
  81    pub async fn incoming_call_for_user(
  82        &self,
  83        user_id: UserId,
  84    ) -> Result<Option<proto::IncomingCall>> {
  85        self.transaction(|tx| async move {
  86            let pending_participant = room_participant::Entity::find()
  87                .filter(
  88                    room_participant::Column::UserId
  89                        .eq(user_id)
  90                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  91                )
  92                .one(&*tx)
  93                .await?;
  94
  95            if let Some(pending_participant) = pending_participant {
  96                let room = self.get_room(pending_participant.room_id, &tx).await?;
  97                Ok(Self::build_incoming_call(&room, user_id))
  98            } else {
  99                Ok(None)
 100            }
 101        })
 102        .await
 103    }
 104
 105    pub async fn create_room(
 106        &self,
 107        user_id: UserId,
 108        connection: ConnectionId,
 109        live_kit_room: &str,
 110        release_channel: &str,
 111    ) -> Result<proto::Room> {
 112        self.transaction(|tx| async move {
 113            let room = room::ActiveModel {
 114                live_kit_room: ActiveValue::set(live_kit_room.into()),
 115                enviroment: ActiveValue::set(Some(release_channel.to_string())),
 116                ..Default::default()
 117            }
 118            .insert(&*tx)
 119            .await?;
 120            room_participant::ActiveModel {
 121                room_id: ActiveValue::set(room.id),
 122                user_id: ActiveValue::set(user_id),
 123                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 124                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 125                    connection.owner_id as i32,
 126                ))),
 127                answering_connection_lost: ActiveValue::set(false),
 128                calling_user_id: ActiveValue::set(user_id),
 129                calling_connection_id: ActiveValue::set(connection.id as i32),
 130                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 131                    connection.owner_id as i32,
 132                ))),
 133                participant_index: ActiveValue::set(Some(0)),
 134                ..Default::default()
 135            }
 136            .insert(&*tx)
 137            .await?;
 138
 139            let room = self.get_room(room.id, &tx).await?;
 140            Ok(room)
 141        })
 142        .await
 143    }
 144
 145    pub async fn call(
 146        &self,
 147        room_id: RoomId,
 148        calling_user_id: UserId,
 149        calling_connection: ConnectionId,
 150        called_user_id: UserId,
 151        initial_project_id: Option<ProjectId>,
 152    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 153        self.room_transaction(room_id, |tx| async move {
 154            room_participant::ActiveModel {
 155                room_id: ActiveValue::set(room_id),
 156                user_id: ActiveValue::set(called_user_id),
 157                answering_connection_lost: ActiveValue::set(false),
 158                participant_index: ActiveValue::NotSet,
 159                calling_user_id: ActiveValue::set(calling_user_id),
 160                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 161                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 162                    calling_connection.owner_id as i32,
 163                ))),
 164                initial_project_id: ActiveValue::set(initial_project_id),
 165                ..Default::default()
 166            }
 167            .insert(&*tx)
 168            .await?;
 169
 170            let room = self.get_room(room_id, &tx).await?;
 171            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 172                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 173            Ok((room, incoming_call))
 174        })
 175        .await
 176    }
 177
 178    pub async fn call_failed(
 179        &self,
 180        room_id: RoomId,
 181        called_user_id: UserId,
 182    ) -> Result<RoomGuard<proto::Room>> {
 183        self.room_transaction(room_id, |tx| async move {
 184            room_participant::Entity::delete_many()
 185                .filter(
 186                    room_participant::Column::RoomId
 187                        .eq(room_id)
 188                        .and(room_participant::Column::UserId.eq(called_user_id)),
 189                )
 190                .exec(&*tx)
 191                .await?;
 192            let room = self.get_room(room_id, &tx).await?;
 193            Ok(room)
 194        })
 195        .await
 196    }
 197
 198    pub async fn decline_call(
 199        &self,
 200        expected_room_id: Option<RoomId>,
 201        user_id: UserId,
 202    ) -> Result<Option<RoomGuard<proto::Room>>> {
 203        self.optional_room_transaction(|tx| async move {
 204            let mut filter = Condition::all()
 205                .add(room_participant::Column::UserId.eq(user_id))
 206                .add(room_participant::Column::AnsweringConnectionId.is_null());
 207            if let Some(room_id) = expected_room_id {
 208                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 209            }
 210            let participant = room_participant::Entity::find()
 211                .filter(filter)
 212                .one(&*tx)
 213                .await?;
 214
 215            let participant = if let Some(participant) = participant {
 216                participant
 217            } else if expected_room_id.is_some() {
 218                return Err(anyhow!("could not find call to decline"))?;
 219            } else {
 220                return Ok(None);
 221            };
 222
 223            let room_id = participant.room_id;
 224            room_participant::Entity::delete(participant.into_active_model())
 225                .exec(&*tx)
 226                .await?;
 227
 228            let room = self.get_room(room_id, &tx).await?;
 229            Ok(Some((room_id, room)))
 230        })
 231        .await
 232    }
 233
 234    pub async fn cancel_call(
 235        &self,
 236        room_id: RoomId,
 237        calling_connection: ConnectionId,
 238        called_user_id: UserId,
 239    ) -> Result<RoomGuard<proto::Room>> {
 240        self.room_transaction(room_id, |tx| async move {
 241            let participant = room_participant::Entity::find()
 242                .filter(
 243                    Condition::all()
 244                        .add(room_participant::Column::UserId.eq(called_user_id))
 245                        .add(room_participant::Column::RoomId.eq(room_id))
 246                        .add(
 247                            room_participant::Column::CallingConnectionId
 248                                .eq(calling_connection.id as i32),
 249                        )
 250                        .add(
 251                            room_participant::Column::CallingConnectionServerId
 252                                .eq(calling_connection.owner_id as i32),
 253                        )
 254                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 255                )
 256                .one(&*tx)
 257                .await?
 258                .ok_or_else(|| anyhow!("no call to cancel"))?;
 259
 260            room_participant::Entity::delete(participant.into_active_model())
 261                .exec(&*tx)
 262                .await?;
 263
 264            let room = self.get_room(room_id, &tx).await?;
 265            Ok(room)
 266        })
 267        .await
 268    }
 269
 270    pub async fn join_room(
 271        &self,
 272        room_id: RoomId,
 273        user_id: UserId,
 274        connection: ConnectionId,
 275        enviroment: &str,
 276    ) -> Result<RoomGuard<JoinRoom>> {
 277        self.room_transaction(room_id, |tx| async move {
 278            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 279            enum QueryChannelIdAndEnviroment {
 280                ChannelId,
 281                Enviroment,
 282            }
 283
 284            let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
 285                room::Entity::find()
 286                    .select_only()
 287                    .column(room::Column::ChannelId)
 288                    .column(room::Column::Enviroment)
 289                    .filter(room::Column::Id.eq(room_id))
 290                    .into_values::<_, QueryChannelIdAndEnviroment>()
 291                    .one(&*tx)
 292                    .await?
 293                    .ok_or_else(|| anyhow!("no such room"))?;
 294
 295            if let Some(release_channel) = release_channel {
 296                if &release_channel != enviroment {
 297                    Err(anyhow!("must join using the {} release", release_channel))?;
 298                }
 299            }
 300
 301            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 302            enum QueryParticipantIndices {
 303                ParticipantIndex,
 304            }
 305            let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 306                .filter(
 307                    room_participant::Column::RoomId
 308                        .eq(room_id)
 309                        .and(room_participant::Column::ParticipantIndex.is_not_null()),
 310                )
 311                .select_only()
 312                .column(room_participant::Column::ParticipantIndex)
 313                .into_values::<_, QueryParticipantIndices>()
 314                .all(&*tx)
 315                .await?;
 316
 317            let mut participant_index = 0;
 318            while existing_participant_indices.contains(&participant_index) {
 319                participant_index += 1;
 320            }
 321
 322            if let Some(channel_id) = channel_id {
 323                self.check_user_is_channel_member(channel_id, user_id, &*tx)
 324                    .await?;
 325
 326                room_participant::Entity::insert_many([room_participant::ActiveModel {
 327                    room_id: ActiveValue::set(room_id),
 328                    user_id: ActiveValue::set(user_id),
 329                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 330                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 331                        connection.owner_id as i32,
 332                    ))),
 333                    answering_connection_lost: ActiveValue::set(false),
 334                    calling_user_id: ActiveValue::set(user_id),
 335                    calling_connection_id: ActiveValue::set(connection.id as i32),
 336                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
 337                        connection.owner_id as i32,
 338                    ))),
 339                    participant_index: ActiveValue::Set(Some(participant_index)),
 340                    ..Default::default()
 341                }])
 342                .on_conflict(
 343                    OnConflict::columns([room_participant::Column::UserId])
 344                        .update_columns([
 345                            room_participant::Column::AnsweringConnectionId,
 346                            room_participant::Column::AnsweringConnectionServerId,
 347                            room_participant::Column::AnsweringConnectionLost,
 348                            room_participant::Column::ParticipantIndex,
 349                        ])
 350                        .to_owned(),
 351                )
 352                .exec(&*tx)
 353                .await?;
 354            } else {
 355                let result = room_participant::Entity::update_many()
 356                    .filter(
 357                        Condition::all()
 358                            .add(room_participant::Column::RoomId.eq(room_id))
 359                            .add(room_participant::Column::UserId.eq(user_id))
 360                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 361                    )
 362                    .set(room_participant::ActiveModel {
 363                        participant_index: ActiveValue::Set(Some(participant_index)),
 364                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 365                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
 366                            connection.owner_id as i32,
 367                        ))),
 368                        answering_connection_lost: ActiveValue::set(false),
 369                        ..Default::default()
 370                    })
 371                    .exec(&*tx)
 372                    .await?;
 373                if result.rows_affected == 0 {
 374                    Err(anyhow!("room does not exist or was already joined"))?;
 375                }
 376            }
 377
 378            let room = self.get_room(room_id, &tx).await?;
 379            let channel_members = if let Some(channel_id) = channel_id {
 380                self.get_channel_members_internal(channel_id, &tx).await?
 381            } else {
 382                Vec::new()
 383            };
 384            Ok(JoinRoom {
 385                room,
 386                channel_id,
 387                channel_members,
 388            })
 389        })
 390        .await
 391    }
 392
 393    pub async fn rejoin_room(
 394        &self,
 395        rejoin_room: proto::RejoinRoom,
 396        user_id: UserId,
 397        connection: ConnectionId,
 398    ) -> Result<RoomGuard<RejoinedRoom>> {
 399        let room_id = RoomId::from_proto(rejoin_room.id);
 400        self.room_transaction(room_id, |tx| async {
 401            let tx = tx;
 402            let participant_update = room_participant::Entity::update_many()
 403                .filter(
 404                    Condition::all()
 405                        .add(room_participant::Column::RoomId.eq(room_id))
 406                        .add(room_participant::Column::UserId.eq(user_id))
 407                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 408                        .add(
 409                            Condition::any()
 410                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 411                                .add(
 412                                    room_participant::Column::AnsweringConnectionServerId
 413                                        .ne(connection.owner_id as i32),
 414                                ),
 415                        ),
 416                )
 417                .set(room_participant::ActiveModel {
 418                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 419                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 420                        connection.owner_id as i32,
 421                    ))),
 422                    answering_connection_lost: ActiveValue::set(false),
 423                    ..Default::default()
 424                })
 425                .exec(&*tx)
 426                .await?;
 427            if participant_update.rows_affected == 0 {
 428                return Err(anyhow!("room does not exist or was already joined"))?;
 429            }
 430
 431            let mut reshared_projects = Vec::new();
 432            for reshared_project in &rejoin_room.reshared_projects {
 433                let project_id = ProjectId::from_proto(reshared_project.project_id);
 434                let project = project::Entity::find_by_id(project_id)
 435                    .one(&*tx)
 436                    .await?
 437                    .ok_or_else(|| anyhow!("project does not exist"))?;
 438                if project.host_user_id != user_id {
 439                    return Err(anyhow!("no such project"))?;
 440                }
 441
 442                let mut collaborators = project
 443                    .find_related(project_collaborator::Entity)
 444                    .all(&*tx)
 445                    .await?;
 446                let host_ix = collaborators
 447                    .iter()
 448                    .position(|collaborator| {
 449                        collaborator.user_id == user_id && collaborator.is_host
 450                    })
 451                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 452                let host = collaborators.swap_remove(host_ix);
 453                let old_connection_id = host.connection();
 454
 455                project::Entity::update(project::ActiveModel {
 456                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 457                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 458                        connection.owner_id as i32,
 459                    ))),
 460                    ..project.into_active_model()
 461                })
 462                .exec(&*tx)
 463                .await?;
 464                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 465                    connection_id: ActiveValue::set(connection.id as i32),
 466                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 467                    ..host.into_active_model()
 468                })
 469                .exec(&*tx)
 470                .await?;
 471
 472                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 473                    .await?;
 474
 475                reshared_projects.push(ResharedProject {
 476                    id: project_id,
 477                    old_connection_id,
 478                    collaborators: collaborators
 479                        .iter()
 480                        .map(|collaborator| ProjectCollaborator {
 481                            connection_id: collaborator.connection(),
 482                            user_id: collaborator.user_id,
 483                            replica_id: collaborator.replica_id,
 484                            is_host: collaborator.is_host,
 485                        })
 486                        .collect(),
 487                    worktrees: reshared_project.worktrees.clone(),
 488                });
 489            }
 490
 491            project::Entity::delete_many()
 492                .filter(
 493                    Condition::all()
 494                        .add(project::Column::RoomId.eq(room_id))
 495                        .add(project::Column::HostUserId.eq(user_id))
 496                        .add(
 497                            project::Column::Id
 498                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 499                        ),
 500                )
 501                .exec(&*tx)
 502                .await?;
 503
 504            let mut rejoined_projects = Vec::new();
 505            for rejoined_project in &rejoin_room.rejoined_projects {
 506                let project_id = ProjectId::from_proto(rejoined_project.id);
 507                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 508                    continue;
 509                };
 510
 511                let mut worktrees = Vec::new();
 512                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 513                for db_worktree in db_worktrees {
 514                    let mut worktree = RejoinedWorktree {
 515                        id: db_worktree.id as u64,
 516                        abs_path: db_worktree.abs_path,
 517                        root_name: db_worktree.root_name,
 518                        visible: db_worktree.visible,
 519                        updated_entries: Default::default(),
 520                        removed_entries: Default::default(),
 521                        updated_repositories: Default::default(),
 522                        removed_repositories: Default::default(),
 523                        diagnostic_summaries: Default::default(),
 524                        settings_files: Default::default(),
 525                        scan_id: db_worktree.scan_id as u64,
 526                        completed_scan_id: db_worktree.completed_scan_id as u64,
 527                    };
 528
 529                    let rejoined_worktree = rejoined_project
 530                        .worktrees
 531                        .iter()
 532                        .find(|worktree| worktree.id == db_worktree.id as u64);
 533
 534                    // File entries
 535                    {
 536                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 537                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 538                        } else {
 539                            worktree_entry::Column::IsDeleted.eq(false)
 540                        };
 541
 542                        let mut db_entries = worktree_entry::Entity::find()
 543                            .filter(
 544                                Condition::all()
 545                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 546                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 547                                    .add(entry_filter),
 548                            )
 549                            .stream(&*tx)
 550                            .await?;
 551
 552                        while let Some(db_entry) = db_entries.next().await {
 553                            let db_entry = db_entry?;
 554                            if db_entry.is_deleted {
 555                                worktree.removed_entries.push(db_entry.id as u64);
 556                            } else {
 557                                worktree.updated_entries.push(proto::Entry {
 558                                    id: db_entry.id as u64,
 559                                    is_dir: db_entry.is_dir,
 560                                    path: db_entry.path,
 561                                    inode: db_entry.inode as u64,
 562                                    mtime: Some(proto::Timestamp {
 563                                        seconds: db_entry.mtime_seconds as u64,
 564                                        nanos: db_entry.mtime_nanos as u32,
 565                                    }),
 566                                    is_symlink: db_entry.is_symlink,
 567                                    is_ignored: db_entry.is_ignored,
 568                                    is_external: db_entry.is_external,
 569                                    git_status: db_entry.git_status.map(|status| status as i32),
 570                                });
 571                            }
 572                        }
 573                    }
 574
 575                    // Repository Entries
 576                    {
 577                        let repository_entry_filter =
 578                            if let Some(rejoined_worktree) = rejoined_worktree {
 579                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 580                            } else {
 581                                worktree_repository::Column::IsDeleted.eq(false)
 582                            };
 583
 584                        let mut db_repositories = worktree_repository::Entity::find()
 585                            .filter(
 586                                Condition::all()
 587                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 588                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 589                                    .add(repository_entry_filter),
 590                            )
 591                            .stream(&*tx)
 592                            .await?;
 593
 594                        while let Some(db_repository) = db_repositories.next().await {
 595                            let db_repository = db_repository?;
 596                            if db_repository.is_deleted {
 597                                worktree
 598                                    .removed_repositories
 599                                    .push(db_repository.work_directory_id as u64);
 600                            } else {
 601                                worktree.updated_repositories.push(proto::RepositoryEntry {
 602                                    work_directory_id: db_repository.work_directory_id as u64,
 603                                    branch: db_repository.branch,
 604                                });
 605                            }
 606                        }
 607                    }
 608
 609                    worktrees.push(worktree);
 610                }
 611
 612                let language_servers = project
 613                    .find_related(language_server::Entity)
 614                    .all(&*tx)
 615                    .await?
 616                    .into_iter()
 617                    .map(|language_server| proto::LanguageServer {
 618                        id: language_server.id as u64,
 619                        name: language_server.name,
 620                    })
 621                    .collect::<Vec<_>>();
 622
 623                {
 624                    let mut db_settings_files = worktree_settings_file::Entity::find()
 625                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 626                        .stream(&*tx)
 627                        .await?;
 628                    while let Some(db_settings_file) = db_settings_files.next().await {
 629                        let db_settings_file = db_settings_file?;
 630                        if let Some(worktree) = worktrees
 631                            .iter_mut()
 632                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 633                        {
 634                            worktree.settings_files.push(WorktreeSettingsFile {
 635                                path: db_settings_file.path,
 636                                content: db_settings_file.content,
 637                            });
 638                        }
 639                    }
 640                }
 641
 642                let mut collaborators = project
 643                    .find_related(project_collaborator::Entity)
 644                    .all(&*tx)
 645                    .await?;
 646                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 647                    .iter()
 648                    .position(|collaborator| collaborator.user_id == user_id)
 649                {
 650                    collaborators.swap_remove(self_collaborator_ix)
 651                } else {
 652                    continue;
 653                };
 654                let old_connection_id = self_collaborator.connection();
 655                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 656                    connection_id: ActiveValue::set(connection.id as i32),
 657                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 658                    ..self_collaborator.into_active_model()
 659                })
 660                .exec(&*tx)
 661                .await?;
 662
 663                let collaborators = collaborators
 664                    .into_iter()
 665                    .map(|collaborator| ProjectCollaborator {
 666                        connection_id: collaborator.connection(),
 667                        user_id: collaborator.user_id,
 668                        replica_id: collaborator.replica_id,
 669                        is_host: collaborator.is_host,
 670                    })
 671                    .collect::<Vec<_>>();
 672
 673                rejoined_projects.push(RejoinedProject {
 674                    id: project_id,
 675                    old_connection_id,
 676                    collaborators,
 677                    worktrees,
 678                    language_servers,
 679                });
 680            }
 681
 682            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 683            let channel_members = if let Some(channel_id) = channel_id {
 684                self.get_channel_members_internal(channel_id, &tx).await?
 685            } else {
 686                Vec::new()
 687            };
 688
 689            Ok(RejoinedRoom {
 690                room,
 691                channel_id,
 692                channel_members,
 693                rejoined_projects,
 694                reshared_projects,
 695            })
 696        })
 697        .await
 698    }
 699
 700    pub async fn leave_room(
 701        &self,
 702        connection: ConnectionId,
 703    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 704        self.optional_room_transaction(|tx| async move {
 705            let leaving_participant = room_participant::Entity::find()
 706                .filter(
 707                    Condition::all()
 708                        .add(
 709                            room_participant::Column::AnsweringConnectionId
 710                                .eq(connection.id as i32),
 711                        )
 712                        .add(
 713                            room_participant::Column::AnsweringConnectionServerId
 714                                .eq(connection.owner_id as i32),
 715                        ),
 716                )
 717                .one(&*tx)
 718                .await?;
 719
 720            if let Some(leaving_participant) = leaving_participant {
 721                // Leave room.
 722                let room_id = leaving_participant.room_id;
 723                room_participant::Entity::delete_by_id(leaving_participant.id)
 724                    .exec(&*tx)
 725                    .await?;
 726
 727                // Cancel pending calls initiated by the leaving user.
 728                let called_participants = room_participant::Entity::find()
 729                    .filter(
 730                        Condition::all()
 731                            .add(
 732                                room_participant::Column::CallingUserId
 733                                    .eq(leaving_participant.user_id),
 734                            )
 735                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 736                    )
 737                    .all(&*tx)
 738                    .await?;
 739                room_participant::Entity::delete_many()
 740                    .filter(
 741                        room_participant::Column::Id
 742                            .is_in(called_participants.iter().map(|participant| participant.id)),
 743                    )
 744                    .exec(&*tx)
 745                    .await?;
 746                let canceled_calls_to_user_ids = called_participants
 747                    .into_iter()
 748                    .map(|participant| participant.user_id)
 749                    .collect();
 750
 751                // Detect left projects.
 752                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 753                enum QueryProjectIds {
 754                    ProjectId,
 755                }
 756                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 757                    .select_only()
 758                    .column_as(
 759                        project_collaborator::Column::ProjectId,
 760                        QueryProjectIds::ProjectId,
 761                    )
 762                    .filter(
 763                        Condition::all()
 764                            .add(
 765                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 766                            )
 767                            .add(
 768                                project_collaborator::Column::ConnectionServerId
 769                                    .eq(connection.owner_id as i32),
 770                            ),
 771                    )
 772                    .into_values::<_, QueryProjectIds>()
 773                    .all(&*tx)
 774                    .await?;
 775                let mut left_projects = HashMap::default();
 776                let mut collaborators = project_collaborator::Entity::find()
 777                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 778                    .stream(&*tx)
 779                    .await?;
 780                while let Some(collaborator) = collaborators.next().await {
 781                    let collaborator = collaborator?;
 782                    let left_project =
 783                        left_projects
 784                            .entry(collaborator.project_id)
 785                            .or_insert(LeftProject {
 786                                id: collaborator.project_id,
 787                                host_user_id: Default::default(),
 788                                connection_ids: Default::default(),
 789                                host_connection_id: Default::default(),
 790                            });
 791
 792                    let collaborator_connection_id = collaborator.connection();
 793                    if collaborator_connection_id != connection {
 794                        left_project.connection_ids.push(collaborator_connection_id);
 795                    }
 796
 797                    if collaborator.is_host {
 798                        left_project.host_user_id = collaborator.user_id;
 799                        left_project.host_connection_id = collaborator_connection_id;
 800                    }
 801                }
 802                drop(collaborators);
 803
 804                // Leave projects.
 805                project_collaborator::Entity::delete_many()
 806                    .filter(
 807                        Condition::all()
 808                            .add(
 809                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 810                            )
 811                            .add(
 812                                project_collaborator::Column::ConnectionServerId
 813                                    .eq(connection.owner_id as i32),
 814                            ),
 815                    )
 816                    .exec(&*tx)
 817                    .await?;
 818
 819                // Unshare projects.
 820                project::Entity::delete_many()
 821                    .filter(
 822                        Condition::all()
 823                            .add(project::Column::RoomId.eq(room_id))
 824                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 825                            .add(
 826                                project::Column::HostConnectionServerId
 827                                    .eq(connection.owner_id as i32),
 828                            ),
 829                    )
 830                    .exec(&*tx)
 831                    .await?;
 832
 833                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 834                let deleted = if room.participants.is_empty() {
 835                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 836                    result.rows_affected > 0
 837                } else {
 838                    false
 839                };
 840
 841                let channel_members = if let Some(channel_id) = channel_id {
 842                    self.get_channel_members_internal(channel_id, &tx).await?
 843                } else {
 844                    Vec::new()
 845                };
 846                let left_room = LeftRoom {
 847                    room,
 848                    channel_id,
 849                    channel_members,
 850                    left_projects,
 851                    canceled_calls_to_user_ids,
 852                    deleted,
 853                };
 854
 855                if left_room.room.participants.is_empty() {
 856                    self.rooms.remove(&room_id);
 857                }
 858
 859                Ok(Some((room_id, left_room)))
 860            } else {
 861                Ok(None)
 862            }
 863        })
 864        .await
 865    }
 866
 867    pub async fn update_room_participant_location(
 868        &self,
 869        room_id: RoomId,
 870        connection: ConnectionId,
 871        location: proto::ParticipantLocation,
 872    ) -> Result<RoomGuard<proto::Room>> {
 873        self.room_transaction(room_id, |tx| async {
 874            let tx = tx;
 875            let location_kind;
 876            let location_project_id;
 877            match location
 878                .variant
 879                .as_ref()
 880                .ok_or_else(|| anyhow!("invalid location"))?
 881            {
 882                proto::participant_location::Variant::SharedProject(project) => {
 883                    location_kind = 0;
 884                    location_project_id = Some(ProjectId::from_proto(project.id));
 885                }
 886                proto::participant_location::Variant::UnsharedProject(_) => {
 887                    location_kind = 1;
 888                    location_project_id = None;
 889                }
 890                proto::participant_location::Variant::External(_) => {
 891                    location_kind = 2;
 892                    location_project_id = None;
 893                }
 894            }
 895
 896            let result = room_participant::Entity::update_many()
 897                .filter(
 898                    Condition::all()
 899                        .add(room_participant::Column::RoomId.eq(room_id))
 900                        .add(
 901                            room_participant::Column::AnsweringConnectionId
 902                                .eq(connection.id as i32),
 903                        )
 904                        .add(
 905                            room_participant::Column::AnsweringConnectionServerId
 906                                .eq(connection.owner_id as i32),
 907                        ),
 908                )
 909                .set(room_participant::ActiveModel {
 910                    location_kind: ActiveValue::set(Some(location_kind)),
 911                    location_project_id: ActiveValue::set(location_project_id),
 912                    ..Default::default()
 913                })
 914                .exec(&*tx)
 915                .await?;
 916
 917            if result.rows_affected == 1 {
 918                let room = self.get_room(room_id, &tx).await?;
 919                Ok(room)
 920            } else {
 921                Err(anyhow!("could not update room participant location"))?
 922            }
 923        })
 924        .await
 925    }
 926
 927    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
 928        self.transaction(|tx| async move {
 929            self.room_connection_lost(connection, &*tx).await?;
 930            self.channel_buffer_connection_lost(connection, &*tx)
 931                .await?;
 932            self.channel_chat_connection_lost(connection, &*tx).await?;
 933            Ok(())
 934        })
 935        .await
 936    }
 937
 938    pub async fn room_connection_lost(
 939        &self,
 940        connection: ConnectionId,
 941        tx: &DatabaseTransaction,
 942    ) -> Result<()> {
 943        let participant = room_participant::Entity::find()
 944            .filter(
 945                Condition::all()
 946                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
 947                    .add(
 948                        room_participant::Column::AnsweringConnectionServerId
 949                            .eq(connection.owner_id as i32),
 950                    ),
 951            )
 952            .one(&*tx)
 953            .await?;
 954
 955        if let Some(participant) = participant {
 956            room_participant::Entity::update(room_participant::ActiveModel {
 957                answering_connection_lost: ActiveValue::set(true),
 958                ..participant.into_active_model()
 959            })
 960            .exec(&*tx)
 961            .await?;
 962        }
 963        Ok(())
 964    }
 965
 966    fn build_incoming_call(
 967        room: &proto::Room,
 968        called_user_id: UserId,
 969    ) -> Option<proto::IncomingCall> {
 970        let pending_participant = room
 971            .pending_participants
 972            .iter()
 973            .find(|participant| participant.user_id == called_user_id.to_proto())?;
 974
 975        Some(proto::IncomingCall {
 976            room_id: room.id,
 977            calling_user_id: pending_participant.calling_user_id,
 978            participant_user_ids: room
 979                .participants
 980                .iter()
 981                .map(|participant| participant.user_id)
 982                .collect(),
 983            initial_project: room.participants.iter().find_map(|participant| {
 984                let initial_project_id = pending_participant.initial_project_id?;
 985                participant
 986                    .projects
 987                    .iter()
 988                    .find(|project| project.id == initial_project_id)
 989                    .cloned()
 990            }),
 991        })
 992    }
 993
 994    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
 995        let (_, room) = self.get_channel_room(room_id, tx).await?;
 996        Ok(room)
 997    }
 998
 999    pub async fn room_connection_ids(
1000        &self,
1001        room_id: RoomId,
1002        connection_id: ConnectionId,
1003    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1004        self.room_transaction(room_id, |tx| async move {
1005            let mut participants = room_participant::Entity::find()
1006                .filter(room_participant::Column::RoomId.eq(room_id))
1007                .stream(&*tx)
1008                .await?;
1009
1010            let mut is_participant = false;
1011            let mut connection_ids = HashSet::default();
1012            while let Some(participant) = participants.next().await {
1013                let participant = participant?;
1014                if let Some(answering_connection) = participant.answering_connection() {
1015                    if answering_connection == connection_id {
1016                        is_participant = true;
1017                    } else {
1018                        connection_ids.insert(answering_connection);
1019                    }
1020                }
1021            }
1022
1023            if !is_participant {
1024                Err(anyhow!("not a room participant"))?;
1025            }
1026
1027            Ok(connection_ids)
1028        })
1029        .await
1030    }
1031
1032    async fn get_channel_room(
1033        &self,
1034        room_id: RoomId,
1035        tx: &DatabaseTransaction,
1036    ) -> Result<(Option<ChannelId>, proto::Room)> {
1037        let db_room = room::Entity::find_by_id(room_id)
1038            .one(tx)
1039            .await?
1040            .ok_or_else(|| anyhow!("could not find room"))?;
1041
1042        let mut db_participants = db_room
1043            .find_related(room_participant::Entity)
1044            .stream(tx)
1045            .await?;
1046        let mut participants = HashMap::default();
1047        let mut pending_participants = Vec::new();
1048        while let Some(db_participant) = db_participants.next().await {
1049            let db_participant = db_participant?;
1050            if let (
1051                Some(answering_connection_id),
1052                Some(answering_connection_server_id),
1053                Some(participant_index),
1054            ) = (
1055                db_participant.answering_connection_id,
1056                db_participant.answering_connection_server_id,
1057                db_participant.participant_index,
1058            ) {
1059                let location = match (
1060                    db_participant.location_kind,
1061                    db_participant.location_project_id,
1062                ) {
1063                    (Some(0), Some(project_id)) => {
1064                        Some(proto::participant_location::Variant::SharedProject(
1065                            proto::participant_location::SharedProject {
1066                                id: project_id.to_proto(),
1067                            },
1068                        ))
1069                    }
1070                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1071                        Default::default(),
1072                    )),
1073                    _ => Some(proto::participant_location::Variant::External(
1074                        Default::default(),
1075                    )),
1076                };
1077
1078                let answering_connection = ConnectionId {
1079                    owner_id: answering_connection_server_id.0 as u32,
1080                    id: answering_connection_id as u32,
1081                };
1082                participants.insert(
1083                    answering_connection,
1084                    proto::Participant {
1085                        user_id: db_participant.user_id.to_proto(),
1086                        peer_id: Some(answering_connection.into()),
1087                        projects: Default::default(),
1088                        location: Some(proto::ParticipantLocation { variant: location }),
1089                        participant_index: participant_index as u32,
1090                    },
1091                );
1092            } else {
1093                pending_participants.push(proto::PendingParticipant {
1094                    user_id: db_participant.user_id.to_proto(),
1095                    calling_user_id: db_participant.calling_user_id.to_proto(),
1096                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1097                });
1098            }
1099        }
1100        drop(db_participants);
1101
1102        let mut db_projects = db_room
1103            .find_related(project::Entity)
1104            .find_with_related(worktree::Entity)
1105            .stream(tx)
1106            .await?;
1107
1108        while let Some(row) = db_projects.next().await {
1109            let (db_project, db_worktree) = row?;
1110            let host_connection = db_project.host_connection()?;
1111            if let Some(participant) = participants.get_mut(&host_connection) {
1112                let project = if let Some(project) = participant
1113                    .projects
1114                    .iter_mut()
1115                    .find(|project| project.id == db_project.id.to_proto())
1116                {
1117                    project
1118                } else {
1119                    participant.projects.push(proto::ParticipantProject {
1120                        id: db_project.id.to_proto(),
1121                        worktree_root_names: Default::default(),
1122                    });
1123                    participant.projects.last_mut().unwrap()
1124                };
1125
1126                if let Some(db_worktree) = db_worktree {
1127                    if db_worktree.visible {
1128                        project.worktree_root_names.push(db_worktree.root_name);
1129                    }
1130                }
1131            }
1132        }
1133        drop(db_projects);
1134
1135        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1136        let mut followers = Vec::new();
1137        while let Some(db_follower) = db_followers.next().await {
1138            let db_follower = db_follower?;
1139            followers.push(proto::Follower {
1140                leader_id: Some(db_follower.leader_connection().into()),
1141                follower_id: Some(db_follower.follower_connection().into()),
1142                project_id: db_follower.project_id.to_proto(),
1143            });
1144        }
1145
1146        Ok((
1147            db_room.channel_id,
1148            proto::Room {
1149                id: db_room.id.to_proto(),
1150                live_kit_room: db_room.live_kit_room,
1151                participants: participants.into_values().collect(),
1152                pending_participants,
1153                followers,
1154            },
1155        ))
1156    }
1157}