rooms.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn clear_stale_room_participants(
   5        &self,
   6        room_id: RoomId,
   7        new_server_id: ServerId,
   8    ) -> Result<RoomGuard<RefreshedRoom>> {
   9        self.room_transaction(room_id, |tx| async move {
  10            let stale_participant_filter = Condition::all()
  11                .add(room_participant::Column::RoomId.eq(room_id))
  12                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  13                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  14
  15            let stale_participant_user_ids = room_participant::Entity::find()
  16                .filter(stale_participant_filter.clone())
  17                .all(&*tx)
  18                .await?
  19                .into_iter()
  20                .map(|participant| participant.user_id)
  21                .collect::<Vec<_>>();
  22
  23            // Delete participants who failed to reconnect and cancel their calls.
  24            let mut canceled_calls_to_user_ids = Vec::new();
  25            room_participant::Entity::delete_many()
  26                .filter(stale_participant_filter)
  27                .exec(&*tx)
  28                .await?;
  29            let called_participants = room_participant::Entity::find()
  30                .filter(
  31                    Condition::all()
  32                        .add(
  33                            room_participant::Column::CallingUserId
  34                                .is_in(stale_participant_user_ids.iter().copied()),
  35                        )
  36                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  37                )
  38                .all(&*tx)
  39                .await?;
  40            room_participant::Entity::delete_many()
  41                .filter(
  42                    room_participant::Column::Id
  43                        .is_in(called_participants.iter().map(|participant| participant.id)),
  44                )
  45                .exec(&*tx)
  46                .await?;
  47            canceled_calls_to_user_ids.extend(
  48                called_participants
  49                    .into_iter()
  50                    .map(|participant| participant.user_id),
  51            );
  52
  53            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
  54            let channel_members;
  55            if let Some(channel_id) = channel_id {
  56                channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
  57            } else {
  58                channel_members = Vec::new();
  59
  60                // Delete the room if it becomes empty.
  61                if room.participants.is_empty() {
  62                    project::Entity::delete_many()
  63                        .filter(project::Column::RoomId.eq(room_id))
  64                        .exec(&*tx)
  65                        .await?;
  66                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  67                }
  68            };
  69
  70            Ok(RefreshedRoom {
  71                room,
  72                channel_id,
  73                channel_members,
  74                stale_participant_user_ids,
  75                canceled_calls_to_user_ids,
  76            })
  77        })
  78        .await
  79    }
  80
  81    pub async fn incoming_call_for_user(
  82        &self,
  83        user_id: UserId,
  84    ) -> Result<Option<proto::IncomingCall>> {
  85        self.transaction(|tx| async move {
  86            let pending_participant = room_participant::Entity::find()
  87                .filter(
  88                    room_participant::Column::UserId
  89                        .eq(user_id)
  90                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  91                )
  92                .one(&*tx)
  93                .await?;
  94
  95            if let Some(pending_participant) = pending_participant {
  96                let room = self.get_room(pending_participant.room_id, &tx).await?;
  97                Ok(Self::build_incoming_call(&room, user_id))
  98            } else {
  99                Ok(None)
 100            }
 101        })
 102        .await
 103    }
 104
 105    pub async fn create_room(
 106        &self,
 107        user_id: UserId,
 108        connection: ConnectionId,
 109        live_kit_room: &str,
 110    ) -> Result<proto::Room> {
 111        self.transaction(|tx| async move {
 112            let room = room::ActiveModel {
 113                live_kit_room: ActiveValue::set(live_kit_room.into()),
 114                ..Default::default()
 115            }
 116            .insert(&*tx)
 117            .await?;
 118            room_participant::ActiveModel {
 119                room_id: ActiveValue::set(room.id),
 120                user_id: ActiveValue::set(user_id),
 121                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 122                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 123                    connection.owner_id as i32,
 124                ))),
 125                answering_connection_lost: ActiveValue::set(false),
 126                calling_user_id: ActiveValue::set(user_id),
 127                calling_connection_id: ActiveValue::set(connection.id as i32),
 128                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 129                    connection.owner_id as i32,
 130                ))),
 131                participant_index: ActiveValue::set(Some(0)),
 132                ..Default::default()
 133            }
 134            .insert(&*tx)
 135            .await?;
 136
 137            let room = self.get_room(room.id, &tx).await?;
 138            Ok(room)
 139        })
 140        .await
 141    }
 142
 143    pub async fn call(
 144        &self,
 145        room_id: RoomId,
 146        calling_user_id: UserId,
 147        calling_connection: ConnectionId,
 148        called_user_id: UserId,
 149        initial_project_id: Option<ProjectId>,
 150    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 151        self.room_transaction(room_id, |tx| async move {
 152            room_participant::ActiveModel {
 153                room_id: ActiveValue::set(room_id),
 154                user_id: ActiveValue::set(called_user_id),
 155                answering_connection_lost: ActiveValue::set(false),
 156                participant_index: ActiveValue::NotSet,
 157                calling_user_id: ActiveValue::set(calling_user_id),
 158                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 159                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 160                    calling_connection.owner_id as i32,
 161                ))),
 162                initial_project_id: ActiveValue::set(initial_project_id),
 163                ..Default::default()
 164            }
 165            .insert(&*tx)
 166            .await?;
 167
 168            let room = self.get_room(room_id, &tx).await?;
 169            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 170                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 171            Ok((room, incoming_call))
 172        })
 173        .await
 174    }
 175
 176    pub async fn call_failed(
 177        &self,
 178        room_id: RoomId,
 179        called_user_id: UserId,
 180    ) -> Result<RoomGuard<proto::Room>> {
 181        self.room_transaction(room_id, |tx| async move {
 182            room_participant::Entity::delete_many()
 183                .filter(
 184                    room_participant::Column::RoomId
 185                        .eq(room_id)
 186                        .and(room_participant::Column::UserId.eq(called_user_id)),
 187                )
 188                .exec(&*tx)
 189                .await?;
 190            let room = self.get_room(room_id, &tx).await?;
 191            Ok(room)
 192        })
 193        .await
 194    }
 195
 196    pub async fn decline_call(
 197        &self,
 198        expected_room_id: Option<RoomId>,
 199        user_id: UserId,
 200    ) -> Result<Option<RoomGuard<proto::Room>>> {
 201        self.optional_room_transaction(|tx| async move {
 202            let mut filter = Condition::all()
 203                .add(room_participant::Column::UserId.eq(user_id))
 204                .add(room_participant::Column::AnsweringConnectionId.is_null());
 205            if let Some(room_id) = expected_room_id {
 206                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 207            }
 208            let participant = room_participant::Entity::find()
 209                .filter(filter)
 210                .one(&*tx)
 211                .await?;
 212
 213            let participant = if let Some(participant) = participant {
 214                participant
 215            } else if expected_room_id.is_some() {
 216                return Err(anyhow!("could not find call to decline"))?;
 217            } else {
 218                return Ok(None);
 219            };
 220
 221            let room_id = participant.room_id;
 222            room_participant::Entity::delete(participant.into_active_model())
 223                .exec(&*tx)
 224                .await?;
 225
 226            let room = self.get_room(room_id, &tx).await?;
 227            Ok(Some((room_id, room)))
 228        })
 229        .await
 230    }
 231
 232    pub async fn cancel_call(
 233        &self,
 234        room_id: RoomId,
 235        calling_connection: ConnectionId,
 236        called_user_id: UserId,
 237    ) -> Result<RoomGuard<proto::Room>> {
 238        self.room_transaction(room_id, |tx| async move {
 239            let participant = room_participant::Entity::find()
 240                .filter(
 241                    Condition::all()
 242                        .add(room_participant::Column::UserId.eq(called_user_id))
 243                        .add(room_participant::Column::RoomId.eq(room_id))
 244                        .add(
 245                            room_participant::Column::CallingConnectionId
 246                                .eq(calling_connection.id as i32),
 247                        )
 248                        .add(
 249                            room_participant::Column::CallingConnectionServerId
 250                                .eq(calling_connection.owner_id as i32),
 251                        )
 252                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 253                )
 254                .one(&*tx)
 255                .await?
 256                .ok_or_else(|| anyhow!("no call to cancel"))?;
 257
 258            room_participant::Entity::delete(participant.into_active_model())
 259                .exec(&*tx)
 260                .await?;
 261
 262            let room = self.get_room(room_id, &tx).await?;
 263            Ok(room)
 264        })
 265        .await
 266    }
 267
 268    pub async fn join_room(
 269        &self,
 270        room_id: RoomId,
 271        user_id: UserId,
 272        connection: ConnectionId,
 273    ) -> Result<RoomGuard<JoinRoom>> {
 274        self.room_transaction(room_id, |tx| async move {
 275            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 276            enum QueryChannelId {
 277                ChannelId,
 278            }
 279            let channel_id: Option<ChannelId> = room::Entity::find()
 280                .select_only()
 281                .column(room::Column::ChannelId)
 282                .filter(room::Column::Id.eq(room_id))
 283                .into_values::<_, QueryChannelId>()
 284                .one(&*tx)
 285                .await?
 286                .ok_or_else(|| anyhow!("no such room"))?;
 287
 288            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 289            enum QueryParticipantIndices {
 290                ParticipantIndex,
 291            }
 292            let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 293                .filter(
 294                    room_participant::Column::RoomId
 295                        .eq(room_id)
 296                        .and(room_participant::Column::ParticipantIndex.is_not_null()),
 297                )
 298                .select_only()
 299                .column(room_participant::Column::ParticipantIndex)
 300                .into_values::<_, QueryParticipantIndices>()
 301                .all(&*tx)
 302                .await?;
 303            let mut participant_index = 0;
 304            while existing_participant_indices.contains(&participant_index) {
 305                participant_index += 1;
 306            }
 307
 308            if let Some(channel_id) = channel_id {
 309                self.check_user_is_channel_member(channel_id, user_id, &*tx)
 310                    .await?;
 311
 312                room_participant::Entity::insert_many([room_participant::ActiveModel {
 313                    room_id: ActiveValue::set(room_id),
 314                    user_id: ActiveValue::set(user_id),
 315                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 316                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 317                        connection.owner_id as i32,
 318                    ))),
 319                    answering_connection_lost: ActiveValue::set(false),
 320                    calling_user_id: ActiveValue::set(user_id),
 321                    calling_connection_id: ActiveValue::set(connection.id as i32),
 322                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
 323                        connection.owner_id as i32,
 324                    ))),
 325                    participant_index: ActiveValue::Set(Some(participant_index)),
 326                    ..Default::default()
 327                }])
 328                .on_conflict(
 329                    OnConflict::columns([room_participant::Column::UserId])
 330                        .update_columns([
 331                            room_participant::Column::AnsweringConnectionId,
 332                            room_participant::Column::AnsweringConnectionServerId,
 333                            room_participant::Column::AnsweringConnectionLost,
 334                            room_participant::Column::ParticipantIndex,
 335                        ])
 336                        .to_owned(),
 337                )
 338                .exec(&*tx)
 339                .await?;
 340            } else {
 341                let result = room_participant::Entity::update_many()
 342                    .filter(
 343                        Condition::all()
 344                            .add(room_participant::Column::RoomId.eq(room_id))
 345                            .add(room_participant::Column::UserId.eq(user_id))
 346                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 347                    )
 348                    .set(room_participant::ActiveModel {
 349                        participant_index: ActiveValue::Set(Some(participant_index)),
 350                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 351                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
 352                            connection.owner_id as i32,
 353                        ))),
 354                        answering_connection_lost: ActiveValue::set(false),
 355                        ..Default::default()
 356                    })
 357                    .exec(&*tx)
 358                    .await?;
 359                if result.rows_affected == 0 {
 360                    Err(anyhow!("room does not exist or was already joined"))?;
 361                }
 362            }
 363
 364            let room = self.get_room(room_id, &tx).await?;
 365            let channel_members = if let Some(channel_id) = channel_id {
 366                self.get_channel_members_internal(channel_id, &tx).await?
 367            } else {
 368                Vec::new()
 369            };
 370            Ok(JoinRoom {
 371                room,
 372                channel_id,
 373                channel_members,
 374            })
 375        })
 376        .await
 377    }
 378
 379    pub async fn rejoin_room(
 380        &self,
 381        rejoin_room: proto::RejoinRoom,
 382        user_id: UserId,
 383        connection: ConnectionId,
 384    ) -> Result<RoomGuard<RejoinedRoom>> {
 385        let room_id = RoomId::from_proto(rejoin_room.id);
 386        self.room_transaction(room_id, |tx| async {
 387            let tx = tx;
 388            let participant_update = room_participant::Entity::update_many()
 389                .filter(
 390                    Condition::all()
 391                        .add(room_participant::Column::RoomId.eq(room_id))
 392                        .add(room_participant::Column::UserId.eq(user_id))
 393                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 394                        .add(
 395                            Condition::any()
 396                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 397                                .add(
 398                                    room_participant::Column::AnsweringConnectionServerId
 399                                        .ne(connection.owner_id as i32),
 400                                ),
 401                        ),
 402                )
 403                .set(room_participant::ActiveModel {
 404                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 405                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 406                        connection.owner_id as i32,
 407                    ))),
 408                    answering_connection_lost: ActiveValue::set(false),
 409                    ..Default::default()
 410                })
 411                .exec(&*tx)
 412                .await?;
 413            if participant_update.rows_affected == 0 {
 414                return Err(anyhow!("room does not exist or was already joined"))?;
 415            }
 416
 417            let mut reshared_projects = Vec::new();
 418            for reshared_project in &rejoin_room.reshared_projects {
 419                let project_id = ProjectId::from_proto(reshared_project.project_id);
 420                let project = project::Entity::find_by_id(project_id)
 421                    .one(&*tx)
 422                    .await?
 423                    .ok_or_else(|| anyhow!("project does not exist"))?;
 424                if project.host_user_id != user_id {
 425                    return Err(anyhow!("no such project"))?;
 426                }
 427
 428                let mut collaborators = project
 429                    .find_related(project_collaborator::Entity)
 430                    .all(&*tx)
 431                    .await?;
 432                let host_ix = collaborators
 433                    .iter()
 434                    .position(|collaborator| {
 435                        collaborator.user_id == user_id && collaborator.is_host
 436                    })
 437                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 438                let host = collaborators.swap_remove(host_ix);
 439                let old_connection_id = host.connection();
 440
 441                project::Entity::update(project::ActiveModel {
 442                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 443                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 444                        connection.owner_id as i32,
 445                    ))),
 446                    ..project.into_active_model()
 447                })
 448                .exec(&*tx)
 449                .await?;
 450                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 451                    connection_id: ActiveValue::set(connection.id as i32),
 452                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 453                    ..host.into_active_model()
 454                })
 455                .exec(&*tx)
 456                .await?;
 457
 458                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 459                    .await?;
 460
 461                reshared_projects.push(ResharedProject {
 462                    id: project_id,
 463                    old_connection_id,
 464                    collaborators: collaborators
 465                        .iter()
 466                        .map(|collaborator| ProjectCollaborator {
 467                            connection_id: collaborator.connection(),
 468                            user_id: collaborator.user_id,
 469                            replica_id: collaborator.replica_id,
 470                            is_host: collaborator.is_host,
 471                        })
 472                        .collect(),
 473                    worktrees: reshared_project.worktrees.clone(),
 474                });
 475            }
 476
 477            project::Entity::delete_many()
 478                .filter(
 479                    Condition::all()
 480                        .add(project::Column::RoomId.eq(room_id))
 481                        .add(project::Column::HostUserId.eq(user_id))
 482                        .add(
 483                            project::Column::Id
 484                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 485                        ),
 486                )
 487                .exec(&*tx)
 488                .await?;
 489
 490            let mut rejoined_projects = Vec::new();
 491            for rejoined_project in &rejoin_room.rejoined_projects {
 492                let project_id = ProjectId::from_proto(rejoined_project.id);
 493                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 494                    continue;
 495                };
 496
 497                let mut worktrees = Vec::new();
 498                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 499                for db_worktree in db_worktrees {
 500                    let mut worktree = RejoinedWorktree {
 501                        id: db_worktree.id as u64,
 502                        abs_path: db_worktree.abs_path,
 503                        root_name: db_worktree.root_name,
 504                        visible: db_worktree.visible,
 505                        updated_entries: Default::default(),
 506                        removed_entries: Default::default(),
 507                        updated_repositories: Default::default(),
 508                        removed_repositories: Default::default(),
 509                        diagnostic_summaries: Default::default(),
 510                        settings_files: Default::default(),
 511                        scan_id: db_worktree.scan_id as u64,
 512                        completed_scan_id: db_worktree.completed_scan_id as u64,
 513                    };
 514
 515                    let rejoined_worktree = rejoined_project
 516                        .worktrees
 517                        .iter()
 518                        .find(|worktree| worktree.id == db_worktree.id as u64);
 519
 520                    // File entries
 521                    {
 522                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 523                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 524                        } else {
 525                            worktree_entry::Column::IsDeleted.eq(false)
 526                        };
 527
 528                        let mut db_entries = worktree_entry::Entity::find()
 529                            .filter(
 530                                Condition::all()
 531                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 532                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 533                                    .add(entry_filter),
 534                            )
 535                            .stream(&*tx)
 536                            .await?;
 537
 538                        while let Some(db_entry) = db_entries.next().await {
 539                            let db_entry = db_entry?;
 540                            if db_entry.is_deleted {
 541                                worktree.removed_entries.push(db_entry.id as u64);
 542                            } else {
 543                                worktree.updated_entries.push(proto::Entry {
 544                                    id: db_entry.id as u64,
 545                                    is_dir: db_entry.is_dir,
 546                                    path: db_entry.path,
 547                                    inode: db_entry.inode as u64,
 548                                    mtime: Some(proto::Timestamp {
 549                                        seconds: db_entry.mtime_seconds as u64,
 550                                        nanos: db_entry.mtime_nanos as u32,
 551                                    }),
 552                                    is_symlink: db_entry.is_symlink,
 553                                    is_ignored: db_entry.is_ignored,
 554                                    is_external: db_entry.is_external,
 555                                    git_status: db_entry.git_status.map(|status| status as i32),
 556                                });
 557                            }
 558                        }
 559                    }
 560
 561                    // Repository Entries
 562                    {
 563                        let repository_entry_filter =
 564                            if let Some(rejoined_worktree) = rejoined_worktree {
 565                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 566                            } else {
 567                                worktree_repository::Column::IsDeleted.eq(false)
 568                            };
 569
 570                        let mut db_repositories = worktree_repository::Entity::find()
 571                            .filter(
 572                                Condition::all()
 573                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 574                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 575                                    .add(repository_entry_filter),
 576                            )
 577                            .stream(&*tx)
 578                            .await?;
 579
 580                        while let Some(db_repository) = db_repositories.next().await {
 581                            let db_repository = db_repository?;
 582                            if db_repository.is_deleted {
 583                                worktree
 584                                    .removed_repositories
 585                                    .push(db_repository.work_directory_id as u64);
 586                            } else {
 587                                worktree.updated_repositories.push(proto::RepositoryEntry {
 588                                    work_directory_id: db_repository.work_directory_id as u64,
 589                                    branch: db_repository.branch,
 590                                });
 591                            }
 592                        }
 593                    }
 594
 595                    worktrees.push(worktree);
 596                }
 597
 598                let language_servers = project
 599                    .find_related(language_server::Entity)
 600                    .all(&*tx)
 601                    .await?
 602                    .into_iter()
 603                    .map(|language_server| proto::LanguageServer {
 604                        id: language_server.id as u64,
 605                        name: language_server.name,
 606                    })
 607                    .collect::<Vec<_>>();
 608
 609                {
 610                    let mut db_settings_files = worktree_settings_file::Entity::find()
 611                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 612                        .stream(&*tx)
 613                        .await?;
 614                    while let Some(db_settings_file) = db_settings_files.next().await {
 615                        let db_settings_file = db_settings_file?;
 616                        if let Some(worktree) = worktrees
 617                            .iter_mut()
 618                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 619                        {
 620                            worktree.settings_files.push(WorktreeSettingsFile {
 621                                path: db_settings_file.path,
 622                                content: db_settings_file.content,
 623                            });
 624                        }
 625                    }
 626                }
 627
 628                let mut collaborators = project
 629                    .find_related(project_collaborator::Entity)
 630                    .all(&*tx)
 631                    .await?;
 632                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 633                    .iter()
 634                    .position(|collaborator| collaborator.user_id == user_id)
 635                {
 636                    collaborators.swap_remove(self_collaborator_ix)
 637                } else {
 638                    continue;
 639                };
 640                let old_connection_id = self_collaborator.connection();
 641                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 642                    connection_id: ActiveValue::set(connection.id as i32),
 643                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 644                    ..self_collaborator.into_active_model()
 645                })
 646                .exec(&*tx)
 647                .await?;
 648
 649                let collaborators = collaborators
 650                    .into_iter()
 651                    .map(|collaborator| ProjectCollaborator {
 652                        connection_id: collaborator.connection(),
 653                        user_id: collaborator.user_id,
 654                        replica_id: collaborator.replica_id,
 655                        is_host: collaborator.is_host,
 656                    })
 657                    .collect::<Vec<_>>();
 658
 659                rejoined_projects.push(RejoinedProject {
 660                    id: project_id,
 661                    old_connection_id,
 662                    collaborators,
 663                    worktrees,
 664                    language_servers,
 665                });
 666            }
 667
 668            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 669            let channel_members = if let Some(channel_id) = channel_id {
 670                self.get_channel_members_internal(channel_id, &tx).await?
 671            } else {
 672                Vec::new()
 673            };
 674
 675            Ok(RejoinedRoom {
 676                room,
 677                channel_id,
 678                channel_members,
 679                rejoined_projects,
 680                reshared_projects,
 681            })
 682        })
 683        .await
 684    }
 685
 686    pub async fn leave_room(
 687        &self,
 688        connection: ConnectionId,
 689    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 690        self.optional_room_transaction(|tx| async move {
 691            let leaving_participant = room_participant::Entity::find()
 692                .filter(
 693                    Condition::all()
 694                        .add(
 695                            room_participant::Column::AnsweringConnectionId
 696                                .eq(connection.id as i32),
 697                        )
 698                        .add(
 699                            room_participant::Column::AnsweringConnectionServerId
 700                                .eq(connection.owner_id as i32),
 701                        ),
 702                )
 703                .one(&*tx)
 704                .await?;
 705
 706            if let Some(leaving_participant) = leaving_participant {
 707                // Leave room.
 708                let room_id = leaving_participant.room_id;
 709                room_participant::Entity::delete_by_id(leaving_participant.id)
 710                    .exec(&*tx)
 711                    .await?;
 712
 713                // Cancel pending calls initiated by the leaving user.
 714                let called_participants = room_participant::Entity::find()
 715                    .filter(
 716                        Condition::all()
 717                            .add(
 718                                room_participant::Column::CallingUserId
 719                                    .eq(leaving_participant.user_id),
 720                            )
 721                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 722                    )
 723                    .all(&*tx)
 724                    .await?;
 725                room_participant::Entity::delete_many()
 726                    .filter(
 727                        room_participant::Column::Id
 728                            .is_in(called_participants.iter().map(|participant| participant.id)),
 729                    )
 730                    .exec(&*tx)
 731                    .await?;
 732                let canceled_calls_to_user_ids = called_participants
 733                    .into_iter()
 734                    .map(|participant| participant.user_id)
 735                    .collect();
 736
 737                // Detect left projects.
 738                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 739                enum QueryProjectIds {
 740                    ProjectId,
 741                }
 742                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 743                    .select_only()
 744                    .column_as(
 745                        project_collaborator::Column::ProjectId,
 746                        QueryProjectIds::ProjectId,
 747                    )
 748                    .filter(
 749                        Condition::all()
 750                            .add(
 751                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 752                            )
 753                            .add(
 754                                project_collaborator::Column::ConnectionServerId
 755                                    .eq(connection.owner_id as i32),
 756                            ),
 757                    )
 758                    .into_values::<_, QueryProjectIds>()
 759                    .all(&*tx)
 760                    .await?;
 761                let mut left_projects = HashMap::default();
 762                let mut collaborators = project_collaborator::Entity::find()
 763                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 764                    .stream(&*tx)
 765                    .await?;
 766                while let Some(collaborator) = collaborators.next().await {
 767                    let collaborator = collaborator?;
 768                    let left_project =
 769                        left_projects
 770                            .entry(collaborator.project_id)
 771                            .or_insert(LeftProject {
 772                                id: collaborator.project_id,
 773                                host_user_id: Default::default(),
 774                                connection_ids: Default::default(),
 775                                host_connection_id: Default::default(),
 776                            });
 777
 778                    let collaborator_connection_id = collaborator.connection();
 779                    if collaborator_connection_id != connection {
 780                        left_project.connection_ids.push(collaborator_connection_id);
 781                    }
 782
 783                    if collaborator.is_host {
 784                        left_project.host_user_id = collaborator.user_id;
 785                        left_project.host_connection_id = collaborator_connection_id;
 786                    }
 787                }
 788                drop(collaborators);
 789
 790                // Leave projects.
 791                project_collaborator::Entity::delete_many()
 792                    .filter(
 793                        Condition::all()
 794                            .add(
 795                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 796                            )
 797                            .add(
 798                                project_collaborator::Column::ConnectionServerId
 799                                    .eq(connection.owner_id as i32),
 800                            ),
 801                    )
 802                    .exec(&*tx)
 803                    .await?;
 804
 805                // Unshare projects.
 806                project::Entity::delete_many()
 807                    .filter(
 808                        Condition::all()
 809                            .add(project::Column::RoomId.eq(room_id))
 810                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 811                            .add(
 812                                project::Column::HostConnectionServerId
 813                                    .eq(connection.owner_id as i32),
 814                            ),
 815                    )
 816                    .exec(&*tx)
 817                    .await?;
 818
 819                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 820                let deleted = if room.participants.is_empty() {
 821                    let result = room::Entity::delete_by_id(room_id)
 822                        .filter(room::Column::ChannelId.is_null())
 823                        .exec(&*tx)
 824                        .await?;
 825                    result.rows_affected > 0
 826                } else {
 827                    false
 828                };
 829
 830                let channel_members = if let Some(channel_id) = channel_id {
 831                    self.get_channel_members_internal(channel_id, &tx).await?
 832                } else {
 833                    Vec::new()
 834                };
 835                let left_room = LeftRoom {
 836                    room,
 837                    channel_id,
 838                    channel_members,
 839                    left_projects,
 840                    canceled_calls_to_user_ids,
 841                    deleted,
 842                };
 843
 844                if left_room.room.participants.is_empty() {
 845                    self.rooms.remove(&room_id);
 846                }
 847
 848                Ok(Some((room_id, left_room)))
 849            } else {
 850                Ok(None)
 851            }
 852        })
 853        .await
 854    }
 855
 856    pub async fn update_room_participant_location(
 857        &self,
 858        room_id: RoomId,
 859        connection: ConnectionId,
 860        location: proto::ParticipantLocation,
 861    ) -> Result<RoomGuard<proto::Room>> {
 862        self.room_transaction(room_id, |tx| async {
 863            let tx = tx;
 864            let location_kind;
 865            let location_project_id;
 866            match location
 867                .variant
 868                .as_ref()
 869                .ok_or_else(|| anyhow!("invalid location"))?
 870            {
 871                proto::participant_location::Variant::SharedProject(project) => {
 872                    location_kind = 0;
 873                    location_project_id = Some(ProjectId::from_proto(project.id));
 874                }
 875                proto::participant_location::Variant::UnsharedProject(_) => {
 876                    location_kind = 1;
 877                    location_project_id = None;
 878                }
 879                proto::participant_location::Variant::External(_) => {
 880                    location_kind = 2;
 881                    location_project_id = None;
 882                }
 883            }
 884
 885            let result = room_participant::Entity::update_many()
 886                .filter(
 887                    Condition::all()
 888                        .add(room_participant::Column::RoomId.eq(room_id))
 889                        .add(
 890                            room_participant::Column::AnsweringConnectionId
 891                                .eq(connection.id as i32),
 892                        )
 893                        .add(
 894                            room_participant::Column::AnsweringConnectionServerId
 895                                .eq(connection.owner_id as i32),
 896                        ),
 897                )
 898                .set(room_participant::ActiveModel {
 899                    location_kind: ActiveValue::set(Some(location_kind)),
 900                    location_project_id: ActiveValue::set(location_project_id),
 901                    ..Default::default()
 902                })
 903                .exec(&*tx)
 904                .await?;
 905
 906            if result.rows_affected == 1 {
 907                let room = self.get_room(room_id, &tx).await?;
 908                Ok(room)
 909            } else {
 910                Err(anyhow!("could not update room participant location"))?
 911            }
 912        })
 913        .await
 914    }
 915
 916    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
 917        self.transaction(|tx| async move {
 918            self.room_connection_lost(connection, &*tx).await?;
 919            self.channel_buffer_connection_lost(connection, &*tx)
 920                .await?;
 921            self.channel_chat_connection_lost(connection, &*tx).await?;
 922            Ok(())
 923        })
 924        .await
 925    }
 926
 927    pub async fn room_connection_lost(
 928        &self,
 929        connection: ConnectionId,
 930        tx: &DatabaseTransaction,
 931    ) -> Result<()> {
 932        let participant = room_participant::Entity::find()
 933            .filter(
 934                Condition::all()
 935                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
 936                    .add(
 937                        room_participant::Column::AnsweringConnectionServerId
 938                            .eq(connection.owner_id as i32),
 939                    ),
 940            )
 941            .one(&*tx)
 942            .await?;
 943
 944        if let Some(participant) = participant {
 945            room_participant::Entity::update(room_participant::ActiveModel {
 946                answering_connection_lost: ActiveValue::set(true),
 947                ..participant.into_active_model()
 948            })
 949            .exec(&*tx)
 950            .await?;
 951        }
 952        Ok(())
 953    }
 954
 955    fn build_incoming_call(
 956        room: &proto::Room,
 957        called_user_id: UserId,
 958    ) -> Option<proto::IncomingCall> {
 959        let pending_participant = room
 960            .pending_participants
 961            .iter()
 962            .find(|participant| participant.user_id == called_user_id.to_proto())?;
 963
 964        Some(proto::IncomingCall {
 965            room_id: room.id,
 966            calling_user_id: pending_participant.calling_user_id,
 967            participant_user_ids: room
 968                .participants
 969                .iter()
 970                .map(|participant| participant.user_id)
 971                .collect(),
 972            initial_project: room.participants.iter().find_map(|participant| {
 973                let initial_project_id = pending_participant.initial_project_id?;
 974                participant
 975                    .projects
 976                    .iter()
 977                    .find(|project| project.id == initial_project_id)
 978                    .cloned()
 979            }),
 980        })
 981    }
 982
 983    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
 984        let (_, room) = self.get_channel_room(room_id, tx).await?;
 985        Ok(room)
 986    }
 987
 988    pub async fn room_connection_ids(
 989        &self,
 990        room_id: RoomId,
 991        connection_id: ConnectionId,
 992    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
 993        self.room_transaction(room_id, |tx| async move {
 994            let mut participants = room_participant::Entity::find()
 995                .filter(room_participant::Column::RoomId.eq(room_id))
 996                .stream(&*tx)
 997                .await?;
 998
 999            let mut is_participant = false;
1000            let mut connection_ids = HashSet::default();
1001            while let Some(participant) = participants.next().await {
1002                let participant = participant?;
1003                if let Some(answering_connection) = participant.answering_connection() {
1004                    if answering_connection == connection_id {
1005                        is_participant = true;
1006                    } else {
1007                        connection_ids.insert(answering_connection);
1008                    }
1009                }
1010            }
1011
1012            if !is_participant {
1013                Err(anyhow!("not a room participant"))?;
1014            }
1015
1016            Ok(connection_ids)
1017        })
1018        .await
1019    }
1020
1021    async fn get_channel_room(
1022        &self,
1023        room_id: RoomId,
1024        tx: &DatabaseTransaction,
1025    ) -> Result<(Option<ChannelId>, proto::Room)> {
1026        let db_room = room::Entity::find_by_id(room_id)
1027            .one(tx)
1028            .await?
1029            .ok_or_else(|| anyhow!("could not find room"))?;
1030
1031        let mut db_participants = db_room
1032            .find_related(room_participant::Entity)
1033            .stream(tx)
1034            .await?;
1035        let mut participants = HashMap::default();
1036        let mut pending_participants = Vec::new();
1037        while let Some(db_participant) = db_participants.next().await {
1038            let db_participant = db_participant?;
1039            if let (
1040                Some(answering_connection_id),
1041                Some(answering_connection_server_id),
1042                Some(participant_index),
1043            ) = (
1044                db_participant.answering_connection_id,
1045                db_participant.answering_connection_server_id,
1046                db_participant.participant_index,
1047            ) {
1048                let location = match (
1049                    db_participant.location_kind,
1050                    db_participant.location_project_id,
1051                ) {
1052                    (Some(0), Some(project_id)) => {
1053                        Some(proto::participant_location::Variant::SharedProject(
1054                            proto::participant_location::SharedProject {
1055                                id: project_id.to_proto(),
1056                            },
1057                        ))
1058                    }
1059                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1060                        Default::default(),
1061                    )),
1062                    _ => Some(proto::participant_location::Variant::External(
1063                        Default::default(),
1064                    )),
1065                };
1066
1067                let answering_connection = ConnectionId {
1068                    owner_id: answering_connection_server_id.0 as u32,
1069                    id: answering_connection_id as u32,
1070                };
1071                participants.insert(
1072                    answering_connection,
1073                    proto::Participant {
1074                        user_id: db_participant.user_id.to_proto(),
1075                        peer_id: Some(answering_connection.into()),
1076                        projects: Default::default(),
1077                        location: Some(proto::ParticipantLocation { variant: location }),
1078                        participant_index: participant_index as u32,
1079                    },
1080                );
1081            } else {
1082                pending_participants.push(proto::PendingParticipant {
1083                    user_id: db_participant.user_id.to_proto(),
1084                    calling_user_id: db_participant.calling_user_id.to_proto(),
1085                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1086                });
1087            }
1088        }
1089        drop(db_participants);
1090
1091        let mut db_projects = db_room
1092            .find_related(project::Entity)
1093            .find_with_related(worktree::Entity)
1094            .stream(tx)
1095            .await?;
1096
1097        while let Some(row) = db_projects.next().await {
1098            let (db_project, db_worktree) = row?;
1099            let host_connection = db_project.host_connection()?;
1100            if let Some(participant) = participants.get_mut(&host_connection) {
1101                let project = if let Some(project) = participant
1102                    .projects
1103                    .iter_mut()
1104                    .find(|project| project.id == db_project.id.to_proto())
1105                {
1106                    project
1107                } else {
1108                    participant.projects.push(proto::ParticipantProject {
1109                        id: db_project.id.to_proto(),
1110                        worktree_root_names: Default::default(),
1111                    });
1112                    participant.projects.last_mut().unwrap()
1113                };
1114
1115                if let Some(db_worktree) = db_worktree {
1116                    if db_worktree.visible {
1117                        project.worktree_root_names.push(db_worktree.root_name);
1118                    }
1119                }
1120            }
1121        }
1122        drop(db_projects);
1123
1124        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1125        let mut followers = Vec::new();
1126        while let Some(db_follower) = db_followers.next().await {
1127            let db_follower = db_follower?;
1128            followers.push(proto::Follower {
1129                leader_id: Some(db_follower.leader_connection().into()),
1130                follower_id: Some(db_follower.follower_connection().into()),
1131                project_id: db_follower.project_id.to_proto(),
1132            });
1133        }
1134
1135        Ok((
1136            db_room.channel_id,
1137            proto::Room {
1138                id: db_room.id.to_proto(),
1139                live_kit_room: db_room.live_kit_room,
1140                participants: participants.into_values().collect(),
1141                pending_participants,
1142                followers,
1143            },
1144        ))
1145    }
1146}