rooms.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn clear_stale_room_participants(
   5        &self,
   6        room_id: RoomId,
   7        new_server_id: ServerId,
   8    ) -> Result<RoomGuard<RefreshedRoom>> {
   9        self.room_transaction(room_id, |tx| async move {
  10            let stale_participant_filter = Condition::all()
  11                .add(room_participant::Column::RoomId.eq(room_id))
  12                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  13                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  14
  15            let stale_participant_user_ids = room_participant::Entity::find()
  16                .filter(stale_participant_filter.clone())
  17                .all(&*tx)
  18                .await?
  19                .into_iter()
  20                .map(|participant| participant.user_id)
  21                .collect::<Vec<_>>();
  22
  23            // Delete participants who failed to reconnect and cancel their calls.
  24            let mut canceled_calls_to_user_ids = Vec::new();
  25            room_participant::Entity::delete_many()
  26                .filter(stale_participant_filter)
  27                .exec(&*tx)
  28                .await?;
  29            let called_participants = room_participant::Entity::find()
  30                .filter(
  31                    Condition::all()
  32                        .add(
  33                            room_participant::Column::CallingUserId
  34                                .is_in(stale_participant_user_ids.iter().copied()),
  35                        )
  36                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  37                )
  38                .all(&*tx)
  39                .await?;
  40            room_participant::Entity::delete_many()
  41                .filter(
  42                    room_participant::Column::Id
  43                        .is_in(called_participants.iter().map(|participant| participant.id)),
  44                )
  45                .exec(&*tx)
  46                .await?;
  47            canceled_calls_to_user_ids.extend(
  48                called_participants
  49                    .into_iter()
  50                    .map(|participant| participant.user_id),
  51            );
  52
  53            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  54            let channel_members;
  55            if let Some(channel) = &channel {
  56                channel_members = self.get_channel_participants(channel, &tx).await?;
  57            } else {
  58                channel_members = Vec::new();
  59
  60                // Delete the room if it becomes empty.
  61                if room.participants.is_empty() {
  62                    project::Entity::delete_many()
  63                        .filter(project::Column::RoomId.eq(room_id))
  64                        .exec(&*tx)
  65                        .await?;
  66                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  67                }
  68            };
  69
  70            Ok(RefreshedRoom {
  71                room,
  72                channel_id: channel.map(|channel| channel.id),
  73                channel_members,
  74                stale_participant_user_ids,
  75                canceled_calls_to_user_ids,
  76            })
  77        })
  78        .await
  79    }
  80
  81    pub async fn incoming_call_for_user(
  82        &self,
  83        user_id: UserId,
  84    ) -> Result<Option<proto::IncomingCall>> {
  85        self.transaction(|tx| async move {
  86            let pending_participant = room_participant::Entity::find()
  87                .filter(
  88                    room_participant::Column::UserId
  89                        .eq(user_id)
  90                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  91                )
  92                .one(&*tx)
  93                .await?;
  94
  95            if let Some(pending_participant) = pending_participant {
  96                let room = self.get_room(pending_participant.room_id, &tx).await?;
  97                Ok(Self::build_incoming_call(&room, user_id))
  98            } else {
  99                Ok(None)
 100            }
 101        })
 102        .await
 103    }
 104
 105    pub async fn create_room(
 106        &self,
 107        user_id: UserId,
 108        connection: ConnectionId,
 109        live_kit_room: &str,
 110        release_channel: &str,
 111    ) -> Result<proto::Room> {
 112        self.transaction(|tx| async move {
 113            let room = room::ActiveModel {
 114                live_kit_room: ActiveValue::set(live_kit_room.into()),
 115                enviroment: ActiveValue::set(Some(release_channel.to_string())),
 116                ..Default::default()
 117            }
 118            .insert(&*tx)
 119            .await?;
 120            room_participant::ActiveModel {
 121                room_id: ActiveValue::set(room.id),
 122                user_id: ActiveValue::set(user_id),
 123                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 124                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 125                    connection.owner_id as i32,
 126                ))),
 127                answering_connection_lost: ActiveValue::set(false),
 128                calling_user_id: ActiveValue::set(user_id),
 129                calling_connection_id: ActiveValue::set(connection.id as i32),
 130                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 131                    connection.owner_id as i32,
 132                ))),
 133                participant_index: ActiveValue::set(Some(0)),
 134                ..Default::default()
 135            }
 136            .insert(&*tx)
 137            .await?;
 138
 139            let room = self.get_room(room.id, &tx).await?;
 140            Ok(room)
 141        })
 142        .await
 143    }
 144
 145    pub async fn call(
 146        &self,
 147        room_id: RoomId,
 148        calling_user_id: UserId,
 149        calling_connection: ConnectionId,
 150        called_user_id: UserId,
 151        initial_project_id: Option<ProjectId>,
 152    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 153        self.room_transaction(room_id, |tx| async move {
 154            room_participant::ActiveModel {
 155                room_id: ActiveValue::set(room_id),
 156                user_id: ActiveValue::set(called_user_id),
 157                answering_connection_lost: ActiveValue::set(false),
 158                participant_index: ActiveValue::NotSet,
 159                calling_user_id: ActiveValue::set(calling_user_id),
 160                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 161                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 162                    calling_connection.owner_id as i32,
 163                ))),
 164                initial_project_id: ActiveValue::set(initial_project_id),
 165                ..Default::default()
 166            }
 167            .insert(&*tx)
 168            .await?;
 169
 170            let room = self.get_room(room_id, &tx).await?;
 171            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 172                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 173            Ok((room, incoming_call))
 174        })
 175        .await
 176    }
 177
 178    pub async fn call_failed(
 179        &self,
 180        room_id: RoomId,
 181        called_user_id: UserId,
 182    ) -> Result<RoomGuard<proto::Room>> {
 183        self.room_transaction(room_id, |tx| async move {
 184            room_participant::Entity::delete_many()
 185                .filter(
 186                    room_participant::Column::RoomId
 187                        .eq(room_id)
 188                        .and(room_participant::Column::UserId.eq(called_user_id)),
 189                )
 190                .exec(&*tx)
 191                .await?;
 192            let room = self.get_room(room_id, &tx).await?;
 193            Ok(room)
 194        })
 195        .await
 196    }
 197
 198    pub async fn decline_call(
 199        &self,
 200        expected_room_id: Option<RoomId>,
 201        user_id: UserId,
 202    ) -> Result<Option<RoomGuard<proto::Room>>> {
 203        self.optional_room_transaction(|tx| async move {
 204            let mut filter = Condition::all()
 205                .add(room_participant::Column::UserId.eq(user_id))
 206                .add(room_participant::Column::AnsweringConnectionId.is_null());
 207            if let Some(room_id) = expected_room_id {
 208                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 209            }
 210            let participant = room_participant::Entity::find()
 211                .filter(filter)
 212                .one(&*tx)
 213                .await?;
 214
 215            let participant = if let Some(participant) = participant {
 216                participant
 217            } else if expected_room_id.is_some() {
 218                return Err(anyhow!("could not find call to decline"))?;
 219            } else {
 220                return Ok(None);
 221            };
 222
 223            let room_id = participant.room_id;
 224            room_participant::Entity::delete(participant.into_active_model())
 225                .exec(&*tx)
 226                .await?;
 227
 228            let room = self.get_room(room_id, &tx).await?;
 229            Ok(Some((room_id, room)))
 230        })
 231        .await
 232    }
 233
 234    pub async fn cancel_call(
 235        &self,
 236        room_id: RoomId,
 237        calling_connection: ConnectionId,
 238        called_user_id: UserId,
 239    ) -> Result<RoomGuard<proto::Room>> {
 240        self.room_transaction(room_id, |tx| async move {
 241            let participant = room_participant::Entity::find()
 242                .filter(
 243                    Condition::all()
 244                        .add(room_participant::Column::UserId.eq(called_user_id))
 245                        .add(room_participant::Column::RoomId.eq(room_id))
 246                        .add(
 247                            room_participant::Column::CallingConnectionId
 248                                .eq(calling_connection.id as i32),
 249                        )
 250                        .add(
 251                            room_participant::Column::CallingConnectionServerId
 252                                .eq(calling_connection.owner_id as i32),
 253                        )
 254                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 255                )
 256                .one(&*tx)
 257                .await?
 258                .ok_or_else(|| anyhow!("no call to cancel"))?;
 259
 260            room_participant::Entity::delete(participant.into_active_model())
 261                .exec(&*tx)
 262                .await?;
 263
 264            let room = self.get_room(room_id, &tx).await?;
 265            Ok(room)
 266        })
 267        .await
 268    }
 269
 270    pub async fn join_room(
 271        &self,
 272        room_id: RoomId,
 273        user_id: UserId,
 274        connection: ConnectionId,
 275        enviroment: &str,
 276    ) -> Result<RoomGuard<JoinRoom>> {
 277        self.room_transaction(room_id, |tx| async move {
 278            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 279            enum QueryChannelIdAndEnviroment {
 280                ChannelId,
 281                Enviroment,
 282            }
 283
 284            let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
 285                room::Entity::find()
 286                    .select_only()
 287                    .column(room::Column::ChannelId)
 288                    .column(room::Column::Enviroment)
 289                    .filter(room::Column::Id.eq(room_id))
 290                    .into_values::<_, QueryChannelIdAndEnviroment>()
 291                    .one(&*tx)
 292                    .await?
 293                    .ok_or_else(|| anyhow!("no such room"))?;
 294
 295            if let Some(release_channel) = release_channel {
 296                if &release_channel != enviroment {
 297                    Err(anyhow!("must join using the {} release", release_channel))?;
 298                }
 299            }
 300
 301            if channel_id.is_some() {
 302                Err(anyhow!("tried to join channel call directly"))?
 303            }
 304
 305            let participant_index = self
 306                .get_next_participant_index_internal(room_id, &*tx)
 307                .await?;
 308
 309            let result = room_participant::Entity::update_many()
 310                .filter(
 311                    Condition::all()
 312                        .add(room_participant::Column::RoomId.eq(room_id))
 313                        .add(room_participant::Column::UserId.eq(user_id))
 314                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 315                )
 316                .set(room_participant::ActiveModel {
 317                    participant_index: ActiveValue::Set(Some(participant_index)),
 318                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 319                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 320                        connection.owner_id as i32,
 321                    ))),
 322                    answering_connection_lost: ActiveValue::set(false),
 323                    ..Default::default()
 324                })
 325                .exec(&*tx)
 326                .await?;
 327            if result.rows_affected == 0 {
 328                Err(anyhow!("room does not exist or was already joined"))?;
 329            }
 330
 331            let room = self.get_room(room_id, &tx).await?;
 332            Ok(JoinRoom {
 333                room,
 334                channel_id: None,
 335                channel_members: vec![],
 336            })
 337        })
 338        .await
 339    }
 340
 341    async fn get_next_participant_index_internal(
 342        &self,
 343        room_id: RoomId,
 344        tx: &DatabaseTransaction,
 345    ) -> Result<i32> {
 346        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 347        enum QueryParticipantIndices {
 348            ParticipantIndex,
 349        }
 350        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 351            .filter(
 352                room_participant::Column::RoomId
 353                    .eq(room_id)
 354                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 355            )
 356            .select_only()
 357            .column(room_participant::Column::ParticipantIndex)
 358            .into_values::<_, QueryParticipantIndices>()
 359            .all(&*tx)
 360            .await?;
 361
 362        let mut participant_index = 0;
 363        while existing_participant_indices.contains(&participant_index) {
 364            participant_index += 1;
 365        }
 366
 367        Ok(participant_index)
 368    }
 369
 370    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 371        self.transaction(|tx| async move {
 372            let room: Option<room::Model> = room::Entity::find()
 373                .filter(room::Column::Id.eq(room_id))
 374                .one(&*tx)
 375                .await?;
 376
 377            Ok(room.and_then(|room| room.channel_id))
 378        })
 379        .await
 380    }
 381
 382    pub(crate) async fn join_channel_room_internal(
 383        &self,
 384        room_id: RoomId,
 385        user_id: UserId,
 386        connection: ConnectionId,
 387        tx: &DatabaseTransaction,
 388    ) -> Result<JoinRoom> {
 389        let participant_index = self
 390            .get_next_participant_index_internal(room_id, &*tx)
 391            .await?;
 392
 393        room_participant::Entity::insert_many([room_participant::ActiveModel {
 394            room_id: ActiveValue::set(room_id),
 395            user_id: ActiveValue::set(user_id),
 396            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 397            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 398                connection.owner_id as i32,
 399            ))),
 400            answering_connection_lost: ActiveValue::set(false),
 401            calling_user_id: ActiveValue::set(user_id),
 402            calling_connection_id: ActiveValue::set(connection.id as i32),
 403            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 404                connection.owner_id as i32,
 405            ))),
 406            participant_index: ActiveValue::Set(Some(participant_index)),
 407            ..Default::default()
 408        }])
 409        .on_conflict(
 410            OnConflict::columns([room_participant::Column::UserId])
 411                .update_columns([
 412                    room_participant::Column::AnsweringConnectionId,
 413                    room_participant::Column::AnsweringConnectionServerId,
 414                    room_participant::Column::AnsweringConnectionLost,
 415                    room_participant::Column::ParticipantIndex,
 416                ])
 417                .to_owned(),
 418        )
 419        .exec(&*tx)
 420        .await?;
 421
 422        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 423        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 424        let channel_members = self.get_channel_participants(&channel, &*tx).await?;
 425        Ok(JoinRoom {
 426            room,
 427            channel_id: Some(channel.id),
 428            channel_members,
 429        })
 430    }
 431
 432    pub async fn rejoin_room(
 433        &self,
 434        rejoin_room: proto::RejoinRoom,
 435        user_id: UserId,
 436        connection: ConnectionId,
 437    ) -> Result<RoomGuard<RejoinedRoom>> {
 438        let room_id = RoomId::from_proto(rejoin_room.id);
 439        self.room_transaction(room_id, |tx| async {
 440            let tx = tx;
 441            let participant_update = room_participant::Entity::update_many()
 442                .filter(
 443                    Condition::all()
 444                        .add(room_participant::Column::RoomId.eq(room_id))
 445                        .add(room_participant::Column::UserId.eq(user_id))
 446                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 447                        .add(
 448                            Condition::any()
 449                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 450                                .add(
 451                                    room_participant::Column::AnsweringConnectionServerId
 452                                        .ne(connection.owner_id as i32),
 453                                ),
 454                        ),
 455                )
 456                .set(room_participant::ActiveModel {
 457                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 458                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 459                        connection.owner_id as i32,
 460                    ))),
 461                    answering_connection_lost: ActiveValue::set(false),
 462                    ..Default::default()
 463                })
 464                .exec(&*tx)
 465                .await?;
 466            if participant_update.rows_affected == 0 {
 467                return Err(anyhow!("room does not exist or was already joined"))?;
 468            }
 469
 470            let mut reshared_projects = Vec::new();
 471            for reshared_project in &rejoin_room.reshared_projects {
 472                let project_id = ProjectId::from_proto(reshared_project.project_id);
 473                let project = project::Entity::find_by_id(project_id)
 474                    .one(&*tx)
 475                    .await?
 476                    .ok_or_else(|| anyhow!("project does not exist"))?;
 477                if project.host_user_id != user_id {
 478                    return Err(anyhow!("no such project"))?;
 479                }
 480
 481                let mut collaborators = project
 482                    .find_related(project_collaborator::Entity)
 483                    .all(&*tx)
 484                    .await?;
 485                let host_ix = collaborators
 486                    .iter()
 487                    .position(|collaborator| {
 488                        collaborator.user_id == user_id && collaborator.is_host
 489                    })
 490                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 491                let host = collaborators.swap_remove(host_ix);
 492                let old_connection_id = host.connection();
 493
 494                project::Entity::update(project::ActiveModel {
 495                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 496                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 497                        connection.owner_id as i32,
 498                    ))),
 499                    ..project.into_active_model()
 500                })
 501                .exec(&*tx)
 502                .await?;
 503                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 504                    connection_id: ActiveValue::set(connection.id as i32),
 505                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 506                    ..host.into_active_model()
 507                })
 508                .exec(&*tx)
 509                .await?;
 510
 511                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 512                    .await?;
 513
 514                reshared_projects.push(ResharedProject {
 515                    id: project_id,
 516                    old_connection_id,
 517                    collaborators: collaborators
 518                        .iter()
 519                        .map(|collaborator| ProjectCollaborator {
 520                            connection_id: collaborator.connection(),
 521                            user_id: collaborator.user_id,
 522                            replica_id: collaborator.replica_id,
 523                            is_host: collaborator.is_host,
 524                        })
 525                        .collect(),
 526                    worktrees: reshared_project.worktrees.clone(),
 527                });
 528            }
 529
 530            project::Entity::delete_many()
 531                .filter(
 532                    Condition::all()
 533                        .add(project::Column::RoomId.eq(room_id))
 534                        .add(project::Column::HostUserId.eq(user_id))
 535                        .add(
 536                            project::Column::Id
 537                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 538                        ),
 539                )
 540                .exec(&*tx)
 541                .await?;
 542
 543            let mut rejoined_projects = Vec::new();
 544            for rejoined_project in &rejoin_room.rejoined_projects {
 545                let project_id = ProjectId::from_proto(rejoined_project.id);
 546                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 547                    continue;
 548                };
 549
 550                let mut worktrees = Vec::new();
 551                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 552                for db_worktree in db_worktrees {
 553                    let mut worktree = RejoinedWorktree {
 554                        id: db_worktree.id as u64,
 555                        abs_path: db_worktree.abs_path,
 556                        root_name: db_worktree.root_name,
 557                        visible: db_worktree.visible,
 558                        updated_entries: Default::default(),
 559                        removed_entries: Default::default(),
 560                        updated_repositories: Default::default(),
 561                        removed_repositories: Default::default(),
 562                        diagnostic_summaries: Default::default(),
 563                        settings_files: Default::default(),
 564                        scan_id: db_worktree.scan_id as u64,
 565                        completed_scan_id: db_worktree.completed_scan_id as u64,
 566                    };
 567
 568                    let rejoined_worktree = rejoined_project
 569                        .worktrees
 570                        .iter()
 571                        .find(|worktree| worktree.id == db_worktree.id as u64);
 572
 573                    // File entries
 574                    {
 575                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 576                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 577                        } else {
 578                            worktree_entry::Column::IsDeleted.eq(false)
 579                        };
 580
 581                        let mut db_entries = worktree_entry::Entity::find()
 582                            .filter(
 583                                Condition::all()
 584                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 585                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 586                                    .add(entry_filter),
 587                            )
 588                            .stream(&*tx)
 589                            .await?;
 590
 591                        while let Some(db_entry) = db_entries.next().await {
 592                            let db_entry = db_entry?;
 593                            if db_entry.is_deleted {
 594                                worktree.removed_entries.push(db_entry.id as u64);
 595                            } else {
 596                                worktree.updated_entries.push(proto::Entry {
 597                                    id: db_entry.id as u64,
 598                                    is_dir: db_entry.is_dir,
 599                                    path: db_entry.path,
 600                                    inode: db_entry.inode as u64,
 601                                    mtime: Some(proto::Timestamp {
 602                                        seconds: db_entry.mtime_seconds as u64,
 603                                        nanos: db_entry.mtime_nanos as u32,
 604                                    }),
 605                                    is_symlink: db_entry.is_symlink,
 606                                    is_ignored: db_entry.is_ignored,
 607                                    is_external: db_entry.is_external,
 608                                    git_status: db_entry.git_status.map(|status| status as i32),
 609                                });
 610                            }
 611                        }
 612                    }
 613
 614                    // Repository Entries
 615                    {
 616                        let repository_entry_filter =
 617                            if let Some(rejoined_worktree) = rejoined_worktree {
 618                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 619                            } else {
 620                                worktree_repository::Column::IsDeleted.eq(false)
 621                            };
 622
 623                        let mut db_repositories = worktree_repository::Entity::find()
 624                            .filter(
 625                                Condition::all()
 626                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 627                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 628                                    .add(repository_entry_filter),
 629                            )
 630                            .stream(&*tx)
 631                            .await?;
 632
 633                        while let Some(db_repository) = db_repositories.next().await {
 634                            let db_repository = db_repository?;
 635                            if db_repository.is_deleted {
 636                                worktree
 637                                    .removed_repositories
 638                                    .push(db_repository.work_directory_id as u64);
 639                            } else {
 640                                worktree.updated_repositories.push(proto::RepositoryEntry {
 641                                    work_directory_id: db_repository.work_directory_id as u64,
 642                                    branch: db_repository.branch,
 643                                });
 644                            }
 645                        }
 646                    }
 647
 648                    worktrees.push(worktree);
 649                }
 650
 651                let language_servers = project
 652                    .find_related(language_server::Entity)
 653                    .all(&*tx)
 654                    .await?
 655                    .into_iter()
 656                    .map(|language_server| proto::LanguageServer {
 657                        id: language_server.id as u64,
 658                        name: language_server.name,
 659                    })
 660                    .collect::<Vec<_>>();
 661
 662                {
 663                    let mut db_settings_files = worktree_settings_file::Entity::find()
 664                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 665                        .stream(&*tx)
 666                        .await?;
 667                    while let Some(db_settings_file) = db_settings_files.next().await {
 668                        let db_settings_file = db_settings_file?;
 669                        if let Some(worktree) = worktrees
 670                            .iter_mut()
 671                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 672                        {
 673                            worktree.settings_files.push(WorktreeSettingsFile {
 674                                path: db_settings_file.path,
 675                                content: db_settings_file.content,
 676                            });
 677                        }
 678                    }
 679                }
 680
 681                let mut collaborators = project
 682                    .find_related(project_collaborator::Entity)
 683                    .all(&*tx)
 684                    .await?;
 685                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 686                    .iter()
 687                    .position(|collaborator| collaborator.user_id == user_id)
 688                {
 689                    collaborators.swap_remove(self_collaborator_ix)
 690                } else {
 691                    continue;
 692                };
 693                let old_connection_id = self_collaborator.connection();
 694                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 695                    connection_id: ActiveValue::set(connection.id as i32),
 696                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 697                    ..self_collaborator.into_active_model()
 698                })
 699                .exec(&*tx)
 700                .await?;
 701
 702                let collaborators = collaborators
 703                    .into_iter()
 704                    .map(|collaborator| ProjectCollaborator {
 705                        connection_id: collaborator.connection(),
 706                        user_id: collaborator.user_id,
 707                        replica_id: collaborator.replica_id,
 708                        is_host: collaborator.is_host,
 709                    })
 710                    .collect::<Vec<_>>();
 711
 712                rejoined_projects.push(RejoinedProject {
 713                    id: project_id,
 714                    old_connection_id,
 715                    collaborators,
 716                    worktrees,
 717                    language_servers,
 718                });
 719            }
 720
 721            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 722            let channel_members = if let Some(channel) = &channel {
 723                self.get_channel_participants(&channel, &tx).await?
 724            } else {
 725                Vec::new()
 726            };
 727
 728            Ok(RejoinedRoom {
 729                room,
 730                channel_id: channel.map(|channel| channel.id),
 731                channel_members,
 732                rejoined_projects,
 733                reshared_projects,
 734            })
 735        })
 736        .await
 737    }
 738
 739    pub async fn leave_room(
 740        &self,
 741        connection: ConnectionId,
 742    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 743        self.optional_room_transaction(|tx| async move {
 744            let leaving_participant = room_participant::Entity::find()
 745                .filter(
 746                    Condition::all()
 747                        .add(
 748                            room_participant::Column::AnsweringConnectionId
 749                                .eq(connection.id as i32),
 750                        )
 751                        .add(
 752                            room_participant::Column::AnsweringConnectionServerId
 753                                .eq(connection.owner_id as i32),
 754                        ),
 755                )
 756                .one(&*tx)
 757                .await?;
 758
 759            if let Some(leaving_participant) = leaving_participant {
 760                // Leave room.
 761                let room_id = leaving_participant.room_id;
 762                room_participant::Entity::delete_by_id(leaving_participant.id)
 763                    .exec(&*tx)
 764                    .await?;
 765
 766                // Cancel pending calls initiated by the leaving user.
 767                let called_participants = room_participant::Entity::find()
 768                    .filter(
 769                        Condition::all()
 770                            .add(
 771                                room_participant::Column::CallingUserId
 772                                    .eq(leaving_participant.user_id),
 773                            )
 774                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 775                    )
 776                    .all(&*tx)
 777                    .await?;
 778                room_participant::Entity::delete_many()
 779                    .filter(
 780                        room_participant::Column::Id
 781                            .is_in(called_participants.iter().map(|participant| participant.id)),
 782                    )
 783                    .exec(&*tx)
 784                    .await?;
 785                let canceled_calls_to_user_ids = called_participants
 786                    .into_iter()
 787                    .map(|participant| participant.user_id)
 788                    .collect();
 789
 790                // Detect left projects.
 791                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 792                enum QueryProjectIds {
 793                    ProjectId,
 794                }
 795                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 796                    .select_only()
 797                    .column_as(
 798                        project_collaborator::Column::ProjectId,
 799                        QueryProjectIds::ProjectId,
 800                    )
 801                    .filter(
 802                        Condition::all()
 803                            .add(
 804                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 805                            )
 806                            .add(
 807                                project_collaborator::Column::ConnectionServerId
 808                                    .eq(connection.owner_id as i32),
 809                            ),
 810                    )
 811                    .into_values::<_, QueryProjectIds>()
 812                    .all(&*tx)
 813                    .await?;
 814                let mut left_projects = HashMap::default();
 815                let mut collaborators = project_collaborator::Entity::find()
 816                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 817                    .stream(&*tx)
 818                    .await?;
 819                while let Some(collaborator) = collaborators.next().await {
 820                    let collaborator = collaborator?;
 821                    let left_project =
 822                        left_projects
 823                            .entry(collaborator.project_id)
 824                            .or_insert(LeftProject {
 825                                id: collaborator.project_id,
 826                                host_user_id: Default::default(),
 827                                connection_ids: Default::default(),
 828                                host_connection_id: Default::default(),
 829                            });
 830
 831                    let collaborator_connection_id = collaborator.connection();
 832                    if collaborator_connection_id != connection {
 833                        left_project.connection_ids.push(collaborator_connection_id);
 834                    }
 835
 836                    if collaborator.is_host {
 837                        left_project.host_user_id = collaborator.user_id;
 838                        left_project.host_connection_id = collaborator_connection_id;
 839                    }
 840                }
 841                drop(collaborators);
 842
 843                // Leave projects.
 844                project_collaborator::Entity::delete_many()
 845                    .filter(
 846                        Condition::all()
 847                            .add(
 848                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 849                            )
 850                            .add(
 851                                project_collaborator::Column::ConnectionServerId
 852                                    .eq(connection.owner_id as i32),
 853                            ),
 854                    )
 855                    .exec(&*tx)
 856                    .await?;
 857
 858                // Unshare projects.
 859                project::Entity::delete_many()
 860                    .filter(
 861                        Condition::all()
 862                            .add(project::Column::RoomId.eq(room_id))
 863                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 864                            .add(
 865                                project::Column::HostConnectionServerId
 866                                    .eq(connection.owner_id as i32),
 867                            ),
 868                    )
 869                    .exec(&*tx)
 870                    .await?;
 871
 872                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 873                let deleted = if room.participants.is_empty() {
 874                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 875                    result.rows_affected > 0
 876                } else {
 877                    false
 878                };
 879
 880                let channel_members = if let Some(channel) = &channel {
 881                    self.get_channel_participants(channel, &tx).await?
 882                } else {
 883                    Vec::new()
 884                };
 885                let left_room = LeftRoom {
 886                    room,
 887                    channel_id: channel.map(|channel| channel.id),
 888                    channel_members,
 889                    left_projects,
 890                    canceled_calls_to_user_ids,
 891                    deleted,
 892                };
 893
 894                if left_room.room.participants.is_empty() {
 895                    self.rooms.remove(&room_id);
 896                }
 897
 898                Ok(Some((room_id, left_room)))
 899            } else {
 900                Ok(None)
 901            }
 902        })
 903        .await
 904    }
 905
 906    pub async fn update_room_participant_location(
 907        &self,
 908        room_id: RoomId,
 909        connection: ConnectionId,
 910        location: proto::ParticipantLocation,
 911    ) -> Result<RoomGuard<proto::Room>> {
 912        self.room_transaction(room_id, |tx| async {
 913            let tx = tx;
 914            let location_kind;
 915            let location_project_id;
 916            match location
 917                .variant
 918                .as_ref()
 919                .ok_or_else(|| anyhow!("invalid location"))?
 920            {
 921                proto::participant_location::Variant::SharedProject(project) => {
 922                    location_kind = 0;
 923                    location_project_id = Some(ProjectId::from_proto(project.id));
 924                }
 925                proto::participant_location::Variant::UnsharedProject(_) => {
 926                    location_kind = 1;
 927                    location_project_id = None;
 928                }
 929                proto::participant_location::Variant::External(_) => {
 930                    location_kind = 2;
 931                    location_project_id = None;
 932                }
 933            }
 934
 935            let result = room_participant::Entity::update_many()
 936                .filter(
 937                    Condition::all()
 938                        .add(room_participant::Column::RoomId.eq(room_id))
 939                        .add(
 940                            room_participant::Column::AnsweringConnectionId
 941                                .eq(connection.id as i32),
 942                        )
 943                        .add(
 944                            room_participant::Column::AnsweringConnectionServerId
 945                                .eq(connection.owner_id as i32),
 946                        ),
 947                )
 948                .set(room_participant::ActiveModel {
 949                    location_kind: ActiveValue::set(Some(location_kind)),
 950                    location_project_id: ActiveValue::set(location_project_id),
 951                    ..Default::default()
 952                })
 953                .exec(&*tx)
 954                .await?;
 955
 956            if result.rows_affected == 1 {
 957                let room = self.get_room(room_id, &tx).await?;
 958                Ok(room)
 959            } else {
 960                Err(anyhow!("could not update room participant location"))?
 961            }
 962        })
 963        .await
 964    }
 965
 966    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
 967        self.transaction(|tx| async move {
 968            self.room_connection_lost(connection, &*tx).await?;
 969            self.channel_buffer_connection_lost(connection, &*tx)
 970                .await?;
 971            self.channel_chat_connection_lost(connection, &*tx).await?;
 972            Ok(())
 973        })
 974        .await
 975    }
 976
 977    pub async fn room_connection_lost(
 978        &self,
 979        connection: ConnectionId,
 980        tx: &DatabaseTransaction,
 981    ) -> Result<()> {
 982        let participant = room_participant::Entity::find()
 983            .filter(
 984                Condition::all()
 985                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
 986                    .add(
 987                        room_participant::Column::AnsweringConnectionServerId
 988                            .eq(connection.owner_id as i32),
 989                    ),
 990            )
 991            .one(&*tx)
 992            .await?;
 993
 994        if let Some(participant) = participant {
 995            room_participant::Entity::update(room_participant::ActiveModel {
 996                answering_connection_lost: ActiveValue::set(true),
 997                ..participant.into_active_model()
 998            })
 999            .exec(&*tx)
1000            .await?;
1001        }
1002        Ok(())
1003    }
1004
1005    fn build_incoming_call(
1006        room: &proto::Room,
1007        called_user_id: UserId,
1008    ) -> Option<proto::IncomingCall> {
1009        let pending_participant = room
1010            .pending_participants
1011            .iter()
1012            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1013
1014        Some(proto::IncomingCall {
1015            room_id: room.id,
1016            calling_user_id: pending_participant.calling_user_id,
1017            participant_user_ids: room
1018                .participants
1019                .iter()
1020                .map(|participant| participant.user_id)
1021                .collect(),
1022            initial_project: room.participants.iter().find_map(|participant| {
1023                let initial_project_id = pending_participant.initial_project_id?;
1024                participant
1025                    .projects
1026                    .iter()
1027                    .find(|project| project.id == initial_project_id)
1028                    .cloned()
1029            }),
1030        })
1031    }
1032
1033    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1034        let (_, room) = self.get_channel_room(room_id, tx).await?;
1035        Ok(room)
1036    }
1037
1038    pub async fn room_connection_ids(
1039        &self,
1040        room_id: RoomId,
1041        connection_id: ConnectionId,
1042    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1043        self.room_transaction(room_id, |tx| async move {
1044            let mut participants = room_participant::Entity::find()
1045                .filter(room_participant::Column::RoomId.eq(room_id))
1046                .stream(&*tx)
1047                .await?;
1048
1049            let mut is_participant = false;
1050            let mut connection_ids = HashSet::default();
1051            while let Some(participant) = participants.next().await {
1052                let participant = participant?;
1053                if let Some(answering_connection) = participant.answering_connection() {
1054                    if answering_connection == connection_id {
1055                        is_participant = true;
1056                    } else {
1057                        connection_ids.insert(answering_connection);
1058                    }
1059                }
1060            }
1061
1062            if !is_participant {
1063                Err(anyhow!("not a room participant"))?;
1064            }
1065
1066            Ok(connection_ids)
1067        })
1068        .await
1069    }
1070
1071    async fn get_channel_room(
1072        &self,
1073        room_id: RoomId,
1074        tx: &DatabaseTransaction,
1075    ) -> Result<(Option<channel::Model>, proto::Room)> {
1076        let db_room = room::Entity::find_by_id(room_id)
1077            .one(tx)
1078            .await?
1079            .ok_or_else(|| anyhow!("could not find room"))?;
1080
1081        let mut db_participants = db_room
1082            .find_related(room_participant::Entity)
1083            .stream(tx)
1084            .await?;
1085        let mut participants = HashMap::default();
1086        let mut pending_participants = Vec::new();
1087        while let Some(db_participant) = db_participants.next().await {
1088            let db_participant = db_participant?;
1089            if let (
1090                Some(answering_connection_id),
1091                Some(answering_connection_server_id),
1092                Some(participant_index),
1093            ) = (
1094                db_participant.answering_connection_id,
1095                db_participant.answering_connection_server_id,
1096                db_participant.participant_index,
1097            ) {
1098                let location = match (
1099                    db_participant.location_kind,
1100                    db_participant.location_project_id,
1101                ) {
1102                    (Some(0), Some(project_id)) => {
1103                        Some(proto::participant_location::Variant::SharedProject(
1104                            proto::participant_location::SharedProject {
1105                                id: project_id.to_proto(),
1106                            },
1107                        ))
1108                    }
1109                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1110                        Default::default(),
1111                    )),
1112                    _ => Some(proto::participant_location::Variant::External(
1113                        Default::default(),
1114                    )),
1115                };
1116
1117                let answering_connection = ConnectionId {
1118                    owner_id: answering_connection_server_id.0 as u32,
1119                    id: answering_connection_id as u32,
1120                };
1121                participants.insert(
1122                    answering_connection,
1123                    proto::Participant {
1124                        user_id: db_participant.user_id.to_proto(),
1125                        peer_id: Some(answering_connection.into()),
1126                        projects: Default::default(),
1127                        location: Some(proto::ParticipantLocation { variant: location }),
1128                        participant_index: participant_index as u32,
1129                    },
1130                );
1131            } else {
1132                pending_participants.push(proto::PendingParticipant {
1133                    user_id: db_participant.user_id.to_proto(),
1134                    calling_user_id: db_participant.calling_user_id.to_proto(),
1135                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1136                });
1137            }
1138        }
1139        drop(db_participants);
1140
1141        let mut db_projects = db_room
1142            .find_related(project::Entity)
1143            .find_with_related(worktree::Entity)
1144            .stream(tx)
1145            .await?;
1146
1147        while let Some(row) = db_projects.next().await {
1148            let (db_project, db_worktree) = row?;
1149            let host_connection = db_project.host_connection()?;
1150            if let Some(participant) = participants.get_mut(&host_connection) {
1151                let project = if let Some(project) = participant
1152                    .projects
1153                    .iter_mut()
1154                    .find(|project| project.id == db_project.id.to_proto())
1155                {
1156                    project
1157                } else {
1158                    participant.projects.push(proto::ParticipantProject {
1159                        id: db_project.id.to_proto(),
1160                        worktree_root_names: Default::default(),
1161                    });
1162                    participant.projects.last_mut().unwrap()
1163                };
1164
1165                if let Some(db_worktree) = db_worktree {
1166                    if db_worktree.visible {
1167                        project.worktree_root_names.push(db_worktree.root_name);
1168                    }
1169                }
1170            }
1171        }
1172        drop(db_projects);
1173
1174        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1175        let mut followers = Vec::new();
1176        while let Some(db_follower) = db_followers.next().await {
1177            let db_follower = db_follower?;
1178            followers.push(proto::Follower {
1179                leader_id: Some(db_follower.leader_connection().into()),
1180                follower_id: Some(db_follower.follower_connection().into()),
1181                project_id: db_follower.project_id.to_proto(),
1182            });
1183        }
1184        drop(db_followers);
1185
1186        let channel = if let Some(channel_id) = db_room.channel_id {
1187            Some(self.get_channel_internal(channel_id, &*tx).await?)
1188        } else {
1189            None
1190        };
1191
1192        Ok((
1193            channel,
1194            proto::Room {
1195                id: db_room.id.to_proto(),
1196                live_kit_room: db_room.live_kit_room,
1197                participants: participants.into_values().collect(),
1198                pending_participants,
1199                followers,
1200            },
1201        ))
1202    }
1203}