rooms.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn clear_stale_room_participants(
   5        &self,
   6        room_id: RoomId,
   7        new_server_id: ServerId,
   8    ) -> Result<RoomGuard<RefreshedRoom>> {
   9        self.room_transaction(room_id, |tx| async move {
  10            let stale_participant_filter = Condition::all()
  11                .add(room_participant::Column::RoomId.eq(room_id))
  12                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  13                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  14
  15            let stale_participant_user_ids = room_participant::Entity::find()
  16                .filter(stale_participant_filter.clone())
  17                .all(&*tx)
  18                .await?
  19                .into_iter()
  20                .map(|participant| participant.user_id)
  21                .collect::<Vec<_>>();
  22
  23            // Delete participants who failed to reconnect and cancel their calls.
  24            let mut canceled_calls_to_user_ids = Vec::new();
  25            room_participant::Entity::delete_many()
  26                .filter(stale_participant_filter)
  27                .exec(&*tx)
  28                .await?;
  29            let called_participants = room_participant::Entity::find()
  30                .filter(
  31                    Condition::all()
  32                        .add(
  33                            room_participant::Column::CallingUserId
  34                                .is_in(stale_participant_user_ids.iter().copied()),
  35                        )
  36                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  37                )
  38                .all(&*tx)
  39                .await?;
  40            room_participant::Entity::delete_many()
  41                .filter(
  42                    room_participant::Column::Id
  43                        .is_in(called_participants.iter().map(|participant| participant.id)),
  44                )
  45                .exec(&*tx)
  46                .await?;
  47            canceled_calls_to_user_ids.extend(
  48                called_participants
  49                    .into_iter()
  50                    .map(|participant| participant.user_id),
  51            );
  52
  53            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
  54            let channel_members;
  55            if let Some(channel_id) = channel_id {
  56                channel_members = self
  57                    .get_channel_participants_internal(channel_id, &tx)
  58                    .await?;
  59            } else {
  60                channel_members = Vec::new();
  61
  62                // Delete the room if it becomes empty.
  63                if room.participants.is_empty() {
  64                    project::Entity::delete_many()
  65                        .filter(project::Column::RoomId.eq(room_id))
  66                        .exec(&*tx)
  67                        .await?;
  68                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  69                }
  70            };
  71
  72            Ok(RefreshedRoom {
  73                room,
  74                channel_id,
  75                channel_members,
  76                stale_participant_user_ids,
  77                canceled_calls_to_user_ids,
  78            })
  79        })
  80        .await
  81    }
  82
  83    pub async fn incoming_call_for_user(
  84        &self,
  85        user_id: UserId,
  86    ) -> Result<Option<proto::IncomingCall>> {
  87        self.transaction(|tx| async move {
  88            let pending_participant = room_participant::Entity::find()
  89                .filter(
  90                    room_participant::Column::UserId
  91                        .eq(user_id)
  92                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  93                )
  94                .one(&*tx)
  95                .await?;
  96
  97            if let Some(pending_participant) = pending_participant {
  98                let room = self.get_room(pending_participant.room_id, &tx).await?;
  99                Ok(Self::build_incoming_call(&room, user_id))
 100            } else {
 101                Ok(None)
 102            }
 103        })
 104        .await
 105    }
 106
 107    pub async fn create_room(
 108        &self,
 109        user_id: UserId,
 110        connection: ConnectionId,
 111        live_kit_room: &str,
 112        release_channel: &str,
 113    ) -> Result<proto::Room> {
 114        self.transaction(|tx| async move {
 115            let room = room::ActiveModel {
 116                live_kit_room: ActiveValue::set(live_kit_room.into()),
 117                enviroment: ActiveValue::set(Some(release_channel.to_string())),
 118                ..Default::default()
 119            }
 120            .insert(&*tx)
 121            .await?;
 122            room_participant::ActiveModel {
 123                room_id: ActiveValue::set(room.id),
 124                user_id: ActiveValue::set(user_id),
 125                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 126                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 127                    connection.owner_id as i32,
 128                ))),
 129                answering_connection_lost: ActiveValue::set(false),
 130                calling_user_id: ActiveValue::set(user_id),
 131                calling_connection_id: ActiveValue::set(connection.id as i32),
 132                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 133                    connection.owner_id as i32,
 134                ))),
 135                participant_index: ActiveValue::set(Some(0)),
 136                ..Default::default()
 137            }
 138            .insert(&*tx)
 139            .await?;
 140
 141            let room = self.get_room(room.id, &tx).await?;
 142            Ok(room)
 143        })
 144        .await
 145    }
 146
 147    pub async fn call(
 148        &self,
 149        room_id: RoomId,
 150        calling_user_id: UserId,
 151        calling_connection: ConnectionId,
 152        called_user_id: UserId,
 153        initial_project_id: Option<ProjectId>,
 154    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 155        self.room_transaction(room_id, |tx| async move {
 156            room_participant::ActiveModel {
 157                room_id: ActiveValue::set(room_id),
 158                user_id: ActiveValue::set(called_user_id),
 159                answering_connection_lost: ActiveValue::set(false),
 160                participant_index: ActiveValue::NotSet,
 161                calling_user_id: ActiveValue::set(calling_user_id),
 162                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 163                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 164                    calling_connection.owner_id as i32,
 165                ))),
 166                initial_project_id: ActiveValue::set(initial_project_id),
 167                ..Default::default()
 168            }
 169            .insert(&*tx)
 170            .await?;
 171
 172            let room = self.get_room(room_id, &tx).await?;
 173            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 174                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 175            Ok((room, incoming_call))
 176        })
 177        .await
 178    }
 179
 180    pub async fn call_failed(
 181        &self,
 182        room_id: RoomId,
 183        called_user_id: UserId,
 184    ) -> Result<RoomGuard<proto::Room>> {
 185        self.room_transaction(room_id, |tx| async move {
 186            room_participant::Entity::delete_many()
 187                .filter(
 188                    room_participant::Column::RoomId
 189                        .eq(room_id)
 190                        .and(room_participant::Column::UserId.eq(called_user_id)),
 191                )
 192                .exec(&*tx)
 193                .await?;
 194            let room = self.get_room(room_id, &tx).await?;
 195            Ok(room)
 196        })
 197        .await
 198    }
 199
 200    pub async fn decline_call(
 201        &self,
 202        expected_room_id: Option<RoomId>,
 203        user_id: UserId,
 204    ) -> Result<Option<RoomGuard<proto::Room>>> {
 205        self.optional_room_transaction(|tx| async move {
 206            let mut filter = Condition::all()
 207                .add(room_participant::Column::UserId.eq(user_id))
 208                .add(room_participant::Column::AnsweringConnectionId.is_null());
 209            if let Some(room_id) = expected_room_id {
 210                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 211            }
 212            let participant = room_participant::Entity::find()
 213                .filter(filter)
 214                .one(&*tx)
 215                .await?;
 216
 217            let participant = if let Some(participant) = participant {
 218                participant
 219            } else if expected_room_id.is_some() {
 220                return Err(anyhow!("could not find call to decline"))?;
 221            } else {
 222                return Ok(None);
 223            };
 224
 225            let room_id = participant.room_id;
 226            room_participant::Entity::delete(participant.into_active_model())
 227                .exec(&*tx)
 228                .await?;
 229
 230            let room = self.get_room(room_id, &tx).await?;
 231            Ok(Some((room_id, room)))
 232        })
 233        .await
 234    }
 235
 236    pub async fn cancel_call(
 237        &self,
 238        room_id: RoomId,
 239        calling_connection: ConnectionId,
 240        called_user_id: UserId,
 241    ) -> Result<RoomGuard<proto::Room>> {
 242        self.room_transaction(room_id, |tx| async move {
 243            let participant = room_participant::Entity::find()
 244                .filter(
 245                    Condition::all()
 246                        .add(room_participant::Column::UserId.eq(called_user_id))
 247                        .add(room_participant::Column::RoomId.eq(room_id))
 248                        .add(
 249                            room_participant::Column::CallingConnectionId
 250                                .eq(calling_connection.id as i32),
 251                        )
 252                        .add(
 253                            room_participant::Column::CallingConnectionServerId
 254                                .eq(calling_connection.owner_id as i32),
 255                        )
 256                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 257                )
 258                .one(&*tx)
 259                .await?
 260                .ok_or_else(|| anyhow!("no call to cancel"))?;
 261
 262            room_participant::Entity::delete(participant.into_active_model())
 263                .exec(&*tx)
 264                .await?;
 265
 266            let room = self.get_room(room_id, &tx).await?;
 267            Ok(room)
 268        })
 269        .await
 270    }
 271
 272    pub async fn join_room(
 273        &self,
 274        room_id: RoomId,
 275        user_id: UserId,
 276        connection: ConnectionId,
 277        enviroment: &str,
 278    ) -> Result<RoomGuard<JoinRoom>> {
 279        self.room_transaction(room_id, |tx| async move {
 280            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 281            enum QueryChannelIdAndEnviroment {
 282                ChannelId,
 283                Enviroment,
 284            }
 285
 286            let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
 287                room::Entity::find()
 288                    .select_only()
 289                    .column(room::Column::ChannelId)
 290                    .column(room::Column::Enviroment)
 291                    .filter(room::Column::Id.eq(room_id))
 292                    .into_values::<_, QueryChannelIdAndEnviroment>()
 293                    .one(&*tx)
 294                    .await?
 295                    .ok_or_else(|| anyhow!("no such room"))?;
 296
 297            if let Some(release_channel) = release_channel {
 298                if &release_channel != enviroment {
 299                    Err(anyhow!("must join using the {} release", release_channel))?;
 300                }
 301            }
 302
 303            if channel_id.is_some() {
 304                Err(anyhow!("tried to join channel call directly"))?
 305            }
 306
 307            let participant_index = self
 308                .get_next_participant_index_internal(room_id, &*tx)
 309                .await?;
 310
 311            let result = room_participant::Entity::update_many()
 312                .filter(
 313                    Condition::all()
 314                        .add(room_participant::Column::RoomId.eq(room_id))
 315                        .add(room_participant::Column::UserId.eq(user_id))
 316                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 317                )
 318                .set(room_participant::ActiveModel {
 319                    participant_index: ActiveValue::Set(Some(participant_index)),
 320                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 321                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 322                        connection.owner_id as i32,
 323                    ))),
 324                    answering_connection_lost: ActiveValue::set(false),
 325                    ..Default::default()
 326                })
 327                .exec(&*tx)
 328                .await?;
 329            if result.rows_affected == 0 {
 330                Err(anyhow!("room does not exist or was already joined"))?;
 331            }
 332
 333            let room = self.get_room(room_id, &tx).await?;
 334            Ok(JoinRoom {
 335                room,
 336                channel_id: None,
 337                channel_members: vec![],
 338            })
 339        })
 340        .await
 341    }
 342
 343    async fn get_next_participant_index_internal(
 344        &self,
 345        room_id: RoomId,
 346        tx: &DatabaseTransaction,
 347    ) -> Result<i32> {
 348        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 349        enum QueryParticipantIndices {
 350            ParticipantIndex,
 351        }
 352        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 353            .filter(
 354                room_participant::Column::RoomId
 355                    .eq(room_id)
 356                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 357            )
 358            .select_only()
 359            .column(room_participant::Column::ParticipantIndex)
 360            .into_values::<_, QueryParticipantIndices>()
 361            .all(&*tx)
 362            .await?;
 363
 364        let mut participant_index = 0;
 365        while existing_participant_indices.contains(&participant_index) {
 366            participant_index += 1;
 367        }
 368
 369        Ok(participant_index)
 370    }
 371
 372    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 373        self.transaction(|tx| async move {
 374            let room: Option<room::Model> = room::Entity::find()
 375                .filter(room::Column::Id.eq(room_id))
 376                .one(&*tx)
 377                .await?;
 378
 379            Ok(room.and_then(|room| room.channel_id))
 380        })
 381        .await
 382    }
 383
 384    pub(crate) async fn join_channel_room_internal(
 385        &self,
 386        channel_id: ChannelId,
 387        room_id: RoomId,
 388        user_id: UserId,
 389        connection: ConnectionId,
 390        tx: &DatabaseTransaction,
 391    ) -> Result<JoinRoom> {
 392        let participant_index = self
 393            .get_next_participant_index_internal(room_id, &*tx)
 394            .await?;
 395
 396        room_participant::Entity::insert_many([room_participant::ActiveModel {
 397            room_id: ActiveValue::set(room_id),
 398            user_id: ActiveValue::set(user_id),
 399            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 400            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 401                connection.owner_id as i32,
 402            ))),
 403            answering_connection_lost: ActiveValue::set(false),
 404            calling_user_id: ActiveValue::set(user_id),
 405            calling_connection_id: ActiveValue::set(connection.id as i32),
 406            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 407                connection.owner_id as i32,
 408            ))),
 409            participant_index: ActiveValue::Set(Some(participant_index)),
 410            ..Default::default()
 411        }])
 412        .on_conflict(
 413            OnConflict::columns([room_participant::Column::UserId])
 414                .update_columns([
 415                    room_participant::Column::AnsweringConnectionId,
 416                    room_participant::Column::AnsweringConnectionServerId,
 417                    room_participant::Column::AnsweringConnectionLost,
 418                    room_participant::Column::ParticipantIndex,
 419                ])
 420                .to_owned(),
 421        )
 422        .exec(&*tx)
 423        .await?;
 424
 425        let room = self.get_room(room_id, &tx).await?;
 426        let channel_members = self
 427            .get_channel_participants_internal(channel_id, &tx)
 428            .await?;
 429        Ok(JoinRoom {
 430            room,
 431            channel_id: Some(channel_id),
 432            channel_members,
 433        })
 434    }
 435
        /// Re-attaches `user_id` to the room named by `rejoin_room.id` using `connection`.
        ///
        /// Inside a single room transaction this:
        /// 1. points the user's participant row at the new connection. Only a row that already
        ///    has an answering connection and is either flagged as lost or on a different
        ///    server is updated;
        /// 2. for each entry of `rejoin_room.reshared_projects`, moves the project's host
        ///    connection and the host's collaborator row to the new connection and calls
        ///    `update_project_worktrees` with the worktrees from the request;
        /// 3. deletes the projects in this room hosted by the user that were not reshared;
        /// 4. for each entry of `rejoin_room.rejoined_projects`, collects worktree, entry,
        ///    repository, language-server and settings-file data and moves the user's
        ///    collaborator row to the new connection. Projects that are not found, or in which
        ///    the user has no collaborator row, are skipped.
        ///
        /// Returns the room, its channel id and channel participants (empty without a channel),
        /// and the lists of rejoined and reshared projects.
        ///
        /// # Errors
        /// - "room does not exist or was already joined" when no participant row was updated;
        /// - "project does not exist" when a reshared project id is not found;
        /// - "no such project" when a reshared project is hosted by another user;
        /// - "host not found among collaborators" when the host's collaborator row is missing;
        /// - any database error from the queries below.
 436    pub async fn rejoin_room(
 437        &self,
 438        rejoin_room: proto::RejoinRoom,
 439        user_id: UserId,
 440        connection: ConnectionId,
 441    ) -> Result<RoomGuard<RejoinedRoom>> {
 442        let room_id = RoomId::from_proto(rejoin_room.id);
 443        self.room_transaction(room_id, |tx| async {
                // NOTE(review): this async block is not `move`; the rebinding below presumably
                // exists to take ownership of `tx` while `rejoin_room` is only borrowed.
 444            let tx = tx;
                // Re-point the participant row at the new connection, but only if the old
                // connection was lost or belonged to a different server.
 445            let participant_update = room_participant::Entity::update_many()
 446                .filter(
 447                    Condition::all()
 448                        .add(room_participant::Column::RoomId.eq(room_id))
 449                        .add(room_participant::Column::UserId.eq(user_id))
 450                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 451                        .add(
 452                            Condition::any()
 453                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 454                                .add(
 455                                    room_participant::Column::AnsweringConnectionServerId
 456                                        .ne(connection.owner_id as i32),
 457                                ),
 458                        ),
 459                )
 460                .set(room_participant::ActiveModel {
 461                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 462                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 463                        connection.owner_id as i32,
 464                    ))),
 465                    answering_connection_lost: ActiveValue::set(false),
 466                    ..Default::default()
 467                })
 468                .exec(&*tx)
 469                .await?;
 470            if participant_update.rows_affected == 0 {
 471                return Err(anyhow!("room does not exist or was already joined"))?;
 472            }
 473
                // Projects the user hosts and is sharing again over the new connection.
 474            let mut reshared_projects = Vec::new();
 475            for reshared_project in &rejoin_room.reshared_projects {
 476                let project_id = ProjectId::from_proto(reshared_project.project_id);
 477                let project = project::Entity::find_by_id(project_id)
 478                    .one(&*tx)
 479                    .await?
 480                    .ok_or_else(|| anyhow!("project does not exist"))?;
                    // Only the project's host may reshare it.
 481                if project.host_user_id != user_id {
 482                    return Err(anyhow!("no such project"))?;
 483                }
 484
 485                let mut collaborators = project
 486                    .find_related(project_collaborator::Entity)
 487                    .all(&*tx)
 488                    .await?;
 489                let host_ix = collaborators
 490                    .iter()
 491                    .position(|collaborator| {
 492                        collaborator.user_id == user_id && collaborator.is_host
 493                    })
 494                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                    // Take the host out so `collaborators` holds only the other collaborators;
                    // the host's previous connection is captured before it is overwritten.
 495                let host = collaborators.swap_remove(host_ix);
 496                let old_connection_id = host.connection();
 497
 498                project::Entity::update(project::ActiveModel {
 499                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 500                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 501                        connection.owner_id as i32,
 502                    ))),
 503                    ..project.into_active_model()
 504                })
 505                .exec(&*tx)
 506                .await?;
 507                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 508                    connection_id: ActiveValue::set(connection.id as i32),
 509                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 510                    ..host.into_active_model()
 511                })
 512                .exec(&*tx)
 513                .await?;
 514
 515                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 516                    .await?;
 517
 518                reshared_projects.push(ResharedProject {
 519                    id: project_id,
 520                    old_connection_id,
 521                    collaborators: collaborators
 522                        .iter()
 523                        .map(|collaborator| ProjectCollaborator {
 524                            connection_id: collaborator.connection(),
 525                            user_id: collaborator.user_id,
 526                            replica_id: collaborator.replica_id,
 527                            is_host: collaborator.is_host,
 528                        })
 529                        .collect(),
 530                    worktrees: reshared_project.worktrees.clone(),
 531                });
 532            }
 533
                // Drop the user's hosted projects in this room that were not reshared above.
 534            project::Entity::delete_many()
 535                .filter(
 536                    Condition::all()
 537                        .add(project::Column::RoomId.eq(room_id))
 538                        .add(project::Column::HostUserId.eq(user_id))
 539                        .add(
 540                            project::Column::Id
 541                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 542                        ),
 543                )
 544                .exec(&*tx)
 545                .await?;
 546
                // Projects the user is rejoining; build the state to send back for each one.
 547            let mut rejoined_projects = Vec::new();
 548            for rejoined_project in &rejoin_room.rejoined_projects {
 549                let project_id = ProjectId::from_proto(rejoined_project.id);
                    // A project that is no longer found is silently skipped.
 550                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 551                    continue;
 552                };
 553
 554                let mut worktrees = Vec::new();
 555                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 556                for db_worktree in db_worktrees {
 557                    let mut worktree = RejoinedWorktree {
 558                        id: db_worktree.id as u64,
 559                        abs_path: db_worktree.abs_path,
 560                        root_name: db_worktree.root_name,
 561                        visible: db_worktree.visible,
 562                        updated_entries: Default::default(),
 563                        removed_entries: Default::default(),
 564                        updated_repositories: Default::default(),
 565                        removed_repositories: Default::default(),
 566                        diagnostic_summaries: Default::default(),
 567                        settings_files: Default::default(),
 568                        scan_id: db_worktree.scan_id as u64,
 569                        completed_scan_id: db_worktree.completed_scan_id as u64,
 570                    };
 571
                        // The matching worktree from the request, if the request lists it.
 572                    let rejoined_worktree = rejoined_project
 573                        .worktrees
 574                        .iter()
 575                        .find(|worktree| worktree.id == db_worktree.id as u64);
 576
 577                    // File entries
 578                    {
                            // If the request lists this worktree, select rows with a scan id
                            // greater than the one it reported; otherwise select every
                            // non-deleted row.
 579                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 580                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 581                        } else {
 582                            worktree_entry::Column::IsDeleted.eq(false)
 583                        };
 584
 585                        let mut db_entries = worktree_entry::Entity::find()
 586                            .filter(
 587                                Condition::all()
 588                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 589                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 590                                    .add(entry_filter),
 591                            )
 592                            .stream(&*tx)
 593                            .await?;
 594
 595                        while let Some(db_entry) = db_entries.next().await {
 596                            let db_entry = db_entry?;
 597                            if db_entry.is_deleted {
 598                                worktree.removed_entries.push(db_entry.id as u64);
 599                            } else {
 600                                worktree.updated_entries.push(proto::Entry {
 601                                    id: db_entry.id as u64,
 602                                    is_dir: db_entry.is_dir,
 603                                    path: db_entry.path,
 604                                    inode: db_entry.inode as u64,
 605                                    mtime: Some(proto::Timestamp {
 606                                        seconds: db_entry.mtime_seconds as u64,
 607                                        nanos: db_entry.mtime_nanos as u32,
 608                                    }),
 609                                    is_symlink: db_entry.is_symlink,
 610                                    is_ignored: db_entry.is_ignored,
 611                                    is_external: db_entry.is_external,
 612                                    git_status: db_entry.git_status.map(|status| status as i32),
 613                                });
 614                            }
 615                        }
 616                    }
 617
 618                    // Repository Entries
 619                    {
                            // Same selection rule as for file entries above.
 620                        let repository_entry_filter =
 621                            if let Some(rejoined_worktree) = rejoined_worktree {
 622                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 623                            } else {
 624                                worktree_repository::Column::IsDeleted.eq(false)
 625                            };
 626
 627                        let mut db_repositories = worktree_repository::Entity::find()
 628                            .filter(
 629                                Condition::all()
 630                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 631                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 632                                    .add(repository_entry_filter),
 633                            )
 634                            .stream(&*tx)
 635                            .await?;
 636
 637                        while let Some(db_repository) = db_repositories.next().await {
 638                            let db_repository = db_repository?;
 639                            if db_repository.is_deleted {
 640                                worktree
 641                                    .removed_repositories
 642                                    .push(db_repository.work_directory_id as u64);
 643                            } else {
 644                                worktree.updated_repositories.push(proto::RepositoryEntry {
 645                                    work_directory_id: db_repository.work_directory_id as u64,
 646                                    branch: db_repository.branch,
 647                                });
 648                            }
 649                        }
 650                    }
 651
 652                    worktrees.push(worktree);
 653                }
 654
 655                let language_servers = project
 656                    .find_related(language_server::Entity)
 657                    .all(&*tx)
 658                    .await?
 659                    .into_iter()
 660                    .map(|language_server| proto::LanguageServer {
 661                        id: language_server.id as u64,
 662                        name: language_server.name,
 663                    })
 664                    .collect::<Vec<_>>();
 665
 666                {
                        // Attach each settings file of the project to its worktree; files whose
                        // worktree is not in `worktrees` are ignored.
 667                    let mut db_settings_files = worktree_settings_file::Entity::find()
 668                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 669                        .stream(&*tx)
 670                        .await?;
 671                    while let Some(db_settings_file) = db_settings_files.next().await {
 672                        let db_settings_file = db_settings_file?;
 673                        if let Some(worktree) = worktrees
 674                            .iter_mut()
 675                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 676                        {
 677                            worktree.settings_files.push(WorktreeSettingsFile {
 678                                path: db_settings_file.path,
 679                                content: db_settings_file.content,
 680                            });
 681                        }
 682                    }
 683                }
 684
 685                let mut collaborators = project
 686                    .find_related(project_collaborator::Entity)
 687                    .all(&*tx)
 688                    .await?;
                    // Skip the project when the user has no collaborator row in it; otherwise
                    // take that row out so `collaborators` lists only the others.
 689                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 690                    .iter()
 691                    .position(|collaborator| collaborator.user_id == user_id)
 692                {
 693                    collaborators.swap_remove(self_collaborator_ix)
 694                } else {
 695                    continue;
 696                };
 697                let old_connection_id = self_collaborator.connection();
 698                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 699                    connection_id: ActiveValue::set(connection.id as i32),
 700                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 701                    ..self_collaborator.into_active_model()
 702                })
 703                .exec(&*tx)
 704                .await?;
 705
 706                let collaborators = collaborators
 707                    .into_iter()
 708                    .map(|collaborator| ProjectCollaborator {
 709                        connection_id: collaborator.connection(),
 710                        user_id: collaborator.user_id,
 711                        replica_id: collaborator.replica_id,
 712                        is_host: collaborator.is_host,
 713                    })
 714                    .collect::<Vec<_>>();
 715
 716                rejoined_projects.push(RejoinedProject {
 717                    id: project_id,
 718                    old_connection_id,
 719                    collaborators,
 720                    worktrees,
 721                    language_servers,
 722                });
 723            }
 724
                // Read the room (and its channel participants, if any) after all the updates.
 725            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 726            let channel_members = if let Some(channel_id) = channel_id {
 727                self.get_channel_participants_internal(channel_id, &tx)
 728                    .await?
 729            } else {
 730                Vec::new()
 731            };
 732
 733            Ok(RejoinedRoom {
 734                room,
 735                channel_id,
 736                channel_members,
 737                rejoined_projects,
 738                reshared_projects,
 739            })
 740        })
 741        .await
 742    }
 743
 744    pub async fn leave_room(
 745        &self,
 746        connection: ConnectionId,
 747    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 748        self.optional_room_transaction(|tx| async move {
 749            let leaving_participant = room_participant::Entity::find()
 750                .filter(
 751                    Condition::all()
 752                        .add(
 753                            room_participant::Column::AnsweringConnectionId
 754                                .eq(connection.id as i32),
 755                        )
 756                        .add(
 757                            room_participant::Column::AnsweringConnectionServerId
 758                                .eq(connection.owner_id as i32),
 759                        ),
 760                )
 761                .one(&*tx)
 762                .await?;
 763
 764            if let Some(leaving_participant) = leaving_participant {
 765                // Leave room.
 766                let room_id = leaving_participant.room_id;
 767                room_participant::Entity::delete_by_id(leaving_participant.id)
 768                    .exec(&*tx)
 769                    .await?;
 770
 771                // Cancel pending calls initiated by the leaving user.
 772                let called_participants = room_participant::Entity::find()
 773                    .filter(
 774                        Condition::all()
 775                            .add(
 776                                room_participant::Column::CallingUserId
 777                                    .eq(leaving_participant.user_id),
 778                            )
 779                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 780                    )
 781                    .all(&*tx)
 782                    .await?;
 783                room_participant::Entity::delete_many()
 784                    .filter(
 785                        room_participant::Column::Id
 786                            .is_in(called_participants.iter().map(|participant| participant.id)),
 787                    )
 788                    .exec(&*tx)
 789                    .await?;
 790                let canceled_calls_to_user_ids = called_participants
 791                    .into_iter()
 792                    .map(|participant| participant.user_id)
 793                    .collect();
 794
 795                // Detect left projects.
 796                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 797                enum QueryProjectIds {
 798                    ProjectId,
 799                }
 800                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 801                    .select_only()
 802                    .column_as(
 803                        project_collaborator::Column::ProjectId,
 804                        QueryProjectIds::ProjectId,
 805                    )
 806                    .filter(
 807                        Condition::all()
 808                            .add(
 809                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 810                            )
 811                            .add(
 812                                project_collaborator::Column::ConnectionServerId
 813                                    .eq(connection.owner_id as i32),
 814                            ),
 815                    )
 816                    .into_values::<_, QueryProjectIds>()
 817                    .all(&*tx)
 818                    .await?;
 819                let mut left_projects = HashMap::default();
 820                let mut collaborators = project_collaborator::Entity::find()
 821                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 822                    .stream(&*tx)
 823                    .await?;
 824                while let Some(collaborator) = collaborators.next().await {
 825                    let collaborator = collaborator?;
 826                    let left_project =
 827                        left_projects
 828                            .entry(collaborator.project_id)
 829                            .or_insert(LeftProject {
 830                                id: collaborator.project_id,
 831                                host_user_id: Default::default(),
 832                                connection_ids: Default::default(),
 833                                host_connection_id: Default::default(),
 834                            });
 835
 836                    let collaborator_connection_id = collaborator.connection();
 837                    if collaborator_connection_id != connection {
 838                        left_project.connection_ids.push(collaborator_connection_id);
 839                    }
 840
 841                    if collaborator.is_host {
 842                        left_project.host_user_id = collaborator.user_id;
 843                        left_project.host_connection_id = collaborator_connection_id;
 844                    }
 845                }
 846                drop(collaborators);
 847
 848                // Leave projects.
 849                project_collaborator::Entity::delete_many()
 850                    .filter(
 851                        Condition::all()
 852                            .add(
 853                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 854                            )
 855                            .add(
 856                                project_collaborator::Column::ConnectionServerId
 857                                    .eq(connection.owner_id as i32),
 858                            ),
 859                    )
 860                    .exec(&*tx)
 861                    .await?;
 862
 863                // Unshare projects.
 864                project::Entity::delete_many()
 865                    .filter(
 866                        Condition::all()
 867                            .add(project::Column::RoomId.eq(room_id))
 868                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 869                            .add(
 870                                project::Column::HostConnectionServerId
 871                                    .eq(connection.owner_id as i32),
 872                            ),
 873                    )
 874                    .exec(&*tx)
 875                    .await?;
 876
 877                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 878                let deleted = if room.participants.is_empty() {
 879                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 880                    result.rows_affected > 0
 881                } else {
 882                    false
 883                };
 884
 885                let channel_members = if let Some(channel_id) = channel_id {
 886                    self.get_channel_participants_internal(channel_id, &tx)
 887                        .await?
 888                } else {
 889                    Vec::new()
 890                };
 891                let left_room = LeftRoom {
 892                    room,
 893                    channel_id,
 894                    channel_members,
 895                    left_projects,
 896                    canceled_calls_to_user_ids,
 897                    deleted,
 898                };
 899
 900                if left_room.room.participants.is_empty() {
 901                    self.rooms.remove(&room_id);
 902                }
 903
 904                Ok(Some((room_id, left_room)))
 905            } else {
 906                Ok(None)
 907            }
 908        })
 909        .await
 910    }
 911
 912    pub async fn update_room_participant_location(
 913        &self,
 914        room_id: RoomId,
 915        connection: ConnectionId,
 916        location: proto::ParticipantLocation,
 917    ) -> Result<RoomGuard<proto::Room>> {
 918        self.room_transaction(room_id, |tx| async {
 919            let tx = tx;
 920            let location_kind;
 921            let location_project_id;
 922            match location
 923                .variant
 924                .as_ref()
 925                .ok_or_else(|| anyhow!("invalid location"))?
 926            {
 927                proto::participant_location::Variant::SharedProject(project) => {
 928                    location_kind = 0;
 929                    location_project_id = Some(ProjectId::from_proto(project.id));
 930                }
 931                proto::participant_location::Variant::UnsharedProject(_) => {
 932                    location_kind = 1;
 933                    location_project_id = None;
 934                }
 935                proto::participant_location::Variant::External(_) => {
 936                    location_kind = 2;
 937                    location_project_id = None;
 938                }
 939            }
 940
 941            let result = room_participant::Entity::update_many()
 942                .filter(
 943                    Condition::all()
 944                        .add(room_participant::Column::RoomId.eq(room_id))
 945                        .add(
 946                            room_participant::Column::AnsweringConnectionId
 947                                .eq(connection.id as i32),
 948                        )
 949                        .add(
 950                            room_participant::Column::AnsweringConnectionServerId
 951                                .eq(connection.owner_id as i32),
 952                        ),
 953                )
 954                .set(room_participant::ActiveModel {
 955                    location_kind: ActiveValue::set(Some(location_kind)),
 956                    location_project_id: ActiveValue::set(location_project_id),
 957                    ..Default::default()
 958                })
 959                .exec(&*tx)
 960                .await?;
 961
 962            if result.rows_affected == 1 {
 963                let room = self.get_room(room_id, &tx).await?;
 964                Ok(room)
 965            } else {
 966                Err(anyhow!("could not update room participant location"))?
 967            }
 968        })
 969        .await
 970    }
 971
 972    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
 973        self.transaction(|tx| async move {
 974            self.room_connection_lost(connection, &*tx).await?;
 975            self.channel_buffer_connection_lost(connection, &*tx)
 976                .await?;
 977            self.channel_chat_connection_lost(connection, &*tx).await?;
 978            Ok(())
 979        })
 980        .await
 981    }
 982
 983    pub async fn room_connection_lost(
 984        &self,
 985        connection: ConnectionId,
 986        tx: &DatabaseTransaction,
 987    ) -> Result<()> {
 988        let participant = room_participant::Entity::find()
 989            .filter(
 990                Condition::all()
 991                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
 992                    .add(
 993                        room_participant::Column::AnsweringConnectionServerId
 994                            .eq(connection.owner_id as i32),
 995                    ),
 996            )
 997            .one(&*tx)
 998            .await?;
 999
1000        if let Some(participant) = participant {
1001            room_participant::Entity::update(room_participant::ActiveModel {
1002                answering_connection_lost: ActiveValue::set(true),
1003                ..participant.into_active_model()
1004            })
1005            .exec(&*tx)
1006            .await?;
1007        }
1008        Ok(())
1009    }
1010
1011    fn build_incoming_call(
1012        room: &proto::Room,
1013        called_user_id: UserId,
1014    ) -> Option<proto::IncomingCall> {
1015        let pending_participant = room
1016            .pending_participants
1017            .iter()
1018            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1019
1020        Some(proto::IncomingCall {
1021            room_id: room.id,
1022            calling_user_id: pending_participant.calling_user_id,
1023            participant_user_ids: room
1024                .participants
1025                .iter()
1026                .map(|participant| participant.user_id)
1027                .collect(),
1028            initial_project: room.participants.iter().find_map(|participant| {
1029                let initial_project_id = pending_participant.initial_project_id?;
1030                participant
1031                    .projects
1032                    .iter()
1033                    .find(|project| project.id == initial_project_id)
1034                    .cloned()
1035            }),
1036        })
1037    }
1038
1039    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1040        let (_, room) = self.get_channel_room(room_id, tx).await?;
1041        Ok(room)
1042    }
1043
1044    pub async fn room_connection_ids(
1045        &self,
1046        room_id: RoomId,
1047        connection_id: ConnectionId,
1048    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1049        self.room_transaction(room_id, |tx| async move {
1050            let mut participants = room_participant::Entity::find()
1051                .filter(room_participant::Column::RoomId.eq(room_id))
1052                .stream(&*tx)
1053                .await?;
1054
1055            let mut is_participant = false;
1056            let mut connection_ids = HashSet::default();
1057            while let Some(participant) = participants.next().await {
1058                let participant = participant?;
1059                if let Some(answering_connection) = participant.answering_connection() {
1060                    if answering_connection == connection_id {
1061                        is_participant = true;
1062                    } else {
1063                        connection_ids.insert(answering_connection);
1064                    }
1065                }
1066            }
1067
1068            if !is_participant {
1069                Err(anyhow!("not a room participant"))?;
1070            }
1071
1072            Ok(connection_ids)
1073        })
1074        .await
1075    }
1076
1077    async fn get_channel_room(
1078        &self,
1079        room_id: RoomId,
1080        tx: &DatabaseTransaction,
1081    ) -> Result<(Option<ChannelId>, proto::Room)> {
1082        let db_room = room::Entity::find_by_id(room_id)
1083            .one(tx)
1084            .await?
1085            .ok_or_else(|| anyhow!("could not find room"))?;
1086
1087        let mut db_participants = db_room
1088            .find_related(room_participant::Entity)
1089            .stream(tx)
1090            .await?;
1091        let mut participants = HashMap::default();
1092        let mut pending_participants = Vec::new();
1093        while let Some(db_participant) = db_participants.next().await {
1094            let db_participant = db_participant?;
1095            if let (
1096                Some(answering_connection_id),
1097                Some(answering_connection_server_id),
1098                Some(participant_index),
1099            ) = (
1100                db_participant.answering_connection_id,
1101                db_participant.answering_connection_server_id,
1102                db_participant.participant_index,
1103            ) {
1104                let location = match (
1105                    db_participant.location_kind,
1106                    db_participant.location_project_id,
1107                ) {
1108                    (Some(0), Some(project_id)) => {
1109                        Some(proto::participant_location::Variant::SharedProject(
1110                            proto::participant_location::SharedProject {
1111                                id: project_id.to_proto(),
1112                            },
1113                        ))
1114                    }
1115                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1116                        Default::default(),
1117                    )),
1118                    _ => Some(proto::participant_location::Variant::External(
1119                        Default::default(),
1120                    )),
1121                };
1122
1123                let answering_connection = ConnectionId {
1124                    owner_id: answering_connection_server_id.0 as u32,
1125                    id: answering_connection_id as u32,
1126                };
1127                participants.insert(
1128                    answering_connection,
1129                    proto::Participant {
1130                        user_id: db_participant.user_id.to_proto(),
1131                        peer_id: Some(answering_connection.into()),
1132                        projects: Default::default(),
1133                        location: Some(proto::ParticipantLocation { variant: location }),
1134                        participant_index: participant_index as u32,
1135                    },
1136                );
1137            } else {
1138                pending_participants.push(proto::PendingParticipant {
1139                    user_id: db_participant.user_id.to_proto(),
1140                    calling_user_id: db_participant.calling_user_id.to_proto(),
1141                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1142                });
1143            }
1144        }
1145        drop(db_participants);
1146
1147        let mut db_projects = db_room
1148            .find_related(project::Entity)
1149            .find_with_related(worktree::Entity)
1150            .stream(tx)
1151            .await?;
1152
1153        while let Some(row) = db_projects.next().await {
1154            let (db_project, db_worktree) = row?;
1155            let host_connection = db_project.host_connection()?;
1156            if let Some(participant) = participants.get_mut(&host_connection) {
1157                let project = if let Some(project) = participant
1158                    .projects
1159                    .iter_mut()
1160                    .find(|project| project.id == db_project.id.to_proto())
1161                {
1162                    project
1163                } else {
1164                    participant.projects.push(proto::ParticipantProject {
1165                        id: db_project.id.to_proto(),
1166                        worktree_root_names: Default::default(),
1167                    });
1168                    participant.projects.last_mut().unwrap()
1169                };
1170
1171                if let Some(db_worktree) = db_worktree {
1172                    if db_worktree.visible {
1173                        project.worktree_root_names.push(db_worktree.root_name);
1174                    }
1175                }
1176            }
1177        }
1178        drop(db_projects);
1179
1180        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1181        let mut followers = Vec::new();
1182        while let Some(db_follower) = db_followers.next().await {
1183            let db_follower = db_follower?;
1184            followers.push(proto::Follower {
1185                leader_id: Some(db_follower.leader_connection().into()),
1186                follower_id: Some(db_follower.follower_connection().into()),
1187                project_id: db_follower.project_id.to_proto(),
1188            });
1189        }
1190
1191        Ok((
1192            db_room.channel_id,
1193            proto::Room {
1194                id: db_room.id.to_proto(),
1195                live_kit_room: db_room.live_kit_room,
1196                participants: participants.into_values().collect(),
1197                pending_participants,
1198                followers,
1199            },
1200        ))
1201    }
1202}