rooms.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn clear_stale_room_participants(
   5        &self,
   6        room_id: RoomId,
   7        new_server_id: ServerId,
   8    ) -> Result<RoomGuard<RefreshedRoom>> {
   9        self.room_transaction(room_id, |tx| async move {
  10            let stale_participant_filter = Condition::all()
  11                .add(room_participant::Column::RoomId.eq(room_id))
  12                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  13                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  14
  15            let stale_participant_user_ids = room_participant::Entity::find()
  16                .filter(stale_participant_filter.clone())
  17                .all(&*tx)
  18                .await?
  19                .into_iter()
  20                .map(|participant| participant.user_id)
  21                .collect::<Vec<_>>();
  22
  23            // Delete participants who failed to reconnect and cancel their calls.
  24            let mut canceled_calls_to_user_ids = Vec::new();
  25            room_participant::Entity::delete_many()
  26                .filter(stale_participant_filter)
  27                .exec(&*tx)
  28                .await?;
  29            let called_participants = room_participant::Entity::find()
  30                .filter(
  31                    Condition::all()
  32                        .add(
  33                            room_participant::Column::CallingUserId
  34                                .is_in(stale_participant_user_ids.iter().copied()),
  35                        )
  36                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  37                )
  38                .all(&*tx)
  39                .await?;
  40            room_participant::Entity::delete_many()
  41                .filter(
  42                    room_participant::Column::Id
  43                        .is_in(called_participants.iter().map(|participant| participant.id)),
  44                )
  45                .exec(&*tx)
  46                .await?;
  47            canceled_calls_to_user_ids.extend(
  48                called_participants
  49                    .into_iter()
  50                    .map(|participant| participant.user_id),
  51            );
  52
  53            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
  54            let channel_members;
  55            if let Some(channel_id) = channel_id {
  56                channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
  57            } else {
  58                channel_members = Vec::new();
  59
  60                // Delete the room if it becomes empty.
  61                if room.participants.is_empty() {
  62                    project::Entity::delete_many()
  63                        .filter(project::Column::RoomId.eq(room_id))
  64                        .exec(&*tx)
  65                        .await?;
  66                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  67                }
  68            };
  69
  70            Ok(RefreshedRoom {
  71                room,
  72                channel_id,
  73                channel_members,
  74                stale_participant_user_ids,
  75                canceled_calls_to_user_ids,
  76            })
  77        })
  78        .await
  79    }
  80
  81    pub async fn incoming_call_for_user(
  82        &self,
  83        user_id: UserId,
  84    ) -> Result<Option<proto::IncomingCall>> {
  85        self.transaction(|tx| async move {
  86            let pending_participant = room_participant::Entity::find()
  87                .filter(
  88                    room_participant::Column::UserId
  89                        .eq(user_id)
  90                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  91                )
  92                .one(&*tx)
  93                .await?;
  94
  95            if let Some(pending_participant) = pending_participant {
  96                let room = self.get_room(pending_participant.room_id, &tx).await?;
  97                Ok(Self::build_incoming_call(&room, user_id))
  98            } else {
  99                Ok(None)
 100            }
 101        })
 102        .await
 103    }
 104
 105    pub async fn create_room(
 106        &self,
 107        user_id: UserId,
 108        connection: ConnectionId,
 109        live_kit_room: &str,
 110    ) -> Result<proto::Room> {
 111        self.transaction(|tx| async move {
 112            let room = room::ActiveModel {
 113                live_kit_room: ActiveValue::set(live_kit_room.into()),
 114                ..Default::default()
 115            }
 116            .insert(&*tx)
 117            .await?;
 118            room_participant::ActiveModel {
 119                room_id: ActiveValue::set(room.id),
 120                user_id: ActiveValue::set(user_id),
 121                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 122                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 123                    connection.owner_id as i32,
 124                ))),
 125                answering_connection_lost: ActiveValue::set(false),
 126                calling_user_id: ActiveValue::set(user_id),
 127                calling_connection_id: ActiveValue::set(connection.id as i32),
 128                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 129                    connection.owner_id as i32,
 130                ))),
 131                ..Default::default()
 132            }
 133            .insert(&*tx)
 134            .await?;
 135
 136            let room = self.get_room(room.id, &tx).await?;
 137            Ok(room)
 138        })
 139        .await
 140    }
 141
 142    pub async fn call(
 143        &self,
 144        room_id: RoomId,
 145        calling_user_id: UserId,
 146        calling_connection: ConnectionId,
 147        called_user_id: UserId,
 148        initial_project_id: Option<ProjectId>,
 149    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 150        self.room_transaction(room_id, |tx| async move {
 151            room_participant::ActiveModel {
 152                room_id: ActiveValue::set(room_id),
 153                user_id: ActiveValue::set(called_user_id),
 154                answering_connection_lost: ActiveValue::set(false),
 155                calling_user_id: ActiveValue::set(calling_user_id),
 156                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 157                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 158                    calling_connection.owner_id as i32,
 159                ))),
 160                initial_project_id: ActiveValue::set(initial_project_id),
 161                ..Default::default()
 162            }
 163            .insert(&*tx)
 164            .await?;
 165
 166            let room = self.get_room(room_id, &tx).await?;
 167            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 168                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 169            Ok((room, incoming_call))
 170        })
 171        .await
 172    }
 173
 174    pub async fn call_failed(
 175        &self,
 176        room_id: RoomId,
 177        called_user_id: UserId,
 178    ) -> Result<RoomGuard<proto::Room>> {
 179        self.room_transaction(room_id, |tx| async move {
 180            room_participant::Entity::delete_many()
 181                .filter(
 182                    room_participant::Column::RoomId
 183                        .eq(room_id)
 184                        .and(room_participant::Column::UserId.eq(called_user_id)),
 185                )
 186                .exec(&*tx)
 187                .await?;
 188            let room = self.get_room(room_id, &tx).await?;
 189            Ok(room)
 190        })
 191        .await
 192    }
 193
 194    pub async fn decline_call(
 195        &self,
 196        expected_room_id: Option<RoomId>,
 197        user_id: UserId,
 198    ) -> Result<Option<RoomGuard<proto::Room>>> {
 199        self.optional_room_transaction(|tx| async move {
 200            let mut filter = Condition::all()
 201                .add(room_participant::Column::UserId.eq(user_id))
 202                .add(room_participant::Column::AnsweringConnectionId.is_null());
 203            if let Some(room_id) = expected_room_id {
 204                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 205            }
 206            let participant = room_participant::Entity::find()
 207                .filter(filter)
 208                .one(&*tx)
 209                .await?;
 210
 211            let participant = if let Some(participant) = participant {
 212                participant
 213            } else if expected_room_id.is_some() {
 214                return Err(anyhow!("could not find call to decline"))?;
 215            } else {
 216                return Ok(None);
 217            };
 218
 219            let room_id = participant.room_id;
 220            room_participant::Entity::delete(participant.into_active_model())
 221                .exec(&*tx)
 222                .await?;
 223
 224            let room = self.get_room(room_id, &tx).await?;
 225            Ok(Some((room_id, room)))
 226        })
 227        .await
 228    }
 229
 230    pub async fn cancel_call(
 231        &self,
 232        room_id: RoomId,
 233        calling_connection: ConnectionId,
 234        called_user_id: UserId,
 235    ) -> Result<RoomGuard<proto::Room>> {
 236        self.room_transaction(room_id, |tx| async move {
 237            let participant = room_participant::Entity::find()
 238                .filter(
 239                    Condition::all()
 240                        .add(room_participant::Column::UserId.eq(called_user_id))
 241                        .add(room_participant::Column::RoomId.eq(room_id))
 242                        .add(
 243                            room_participant::Column::CallingConnectionId
 244                                .eq(calling_connection.id as i32),
 245                        )
 246                        .add(
 247                            room_participant::Column::CallingConnectionServerId
 248                                .eq(calling_connection.owner_id as i32),
 249                        )
 250                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 251                )
 252                .one(&*tx)
 253                .await?
 254                .ok_or_else(|| anyhow!("no call to cancel"))?;
 255
 256            room_participant::Entity::delete(participant.into_active_model())
 257                .exec(&*tx)
 258                .await?;
 259
 260            let room = self.get_room(room_id, &tx).await?;
 261            Ok(room)
 262        })
 263        .await
 264    }
 265
 266    pub async fn join_room(
 267        &self,
 268        room_id: RoomId,
 269        user_id: UserId,
 270        connection: ConnectionId,
 271    ) -> Result<RoomGuard<JoinRoom>> {
 272        self.room_transaction(room_id, |tx| async move {
 273            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 274            enum QueryChannelId {
 275                ChannelId,
 276            }
 277            let channel_id: Option<ChannelId> = room::Entity::find()
 278                .select_only()
 279                .column(room::Column::ChannelId)
 280                .filter(room::Column::Id.eq(room_id))
 281                .into_values::<_, QueryChannelId>()
 282                .one(&*tx)
 283                .await?
 284                .ok_or_else(|| anyhow!("no such room"))?;
 285
 286            if let Some(channel_id) = channel_id {
 287                self.check_user_is_channel_member(channel_id, user_id, &*tx)
 288                    .await?;
 289
 290                room_participant::Entity::insert_many([room_participant::ActiveModel {
 291                    room_id: ActiveValue::set(room_id),
 292                    user_id: ActiveValue::set(user_id),
 293                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 294                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 295                        connection.owner_id as i32,
 296                    ))),
 297                    answering_connection_lost: ActiveValue::set(false),
 298                    calling_user_id: ActiveValue::set(user_id),
 299                    calling_connection_id: ActiveValue::set(connection.id as i32),
 300                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
 301                        connection.owner_id as i32,
 302                    ))),
 303                    ..Default::default()
 304                }])
 305                .on_conflict(
 306                    OnConflict::columns([room_participant::Column::UserId])
 307                        .update_columns([
 308                            room_participant::Column::AnsweringConnectionId,
 309                            room_participant::Column::AnsweringConnectionServerId,
 310                            room_participant::Column::AnsweringConnectionLost,
 311                        ])
 312                        .to_owned(),
 313                )
 314                .exec(&*tx)
 315                .await?;
 316            } else {
 317                let result = room_participant::Entity::update_many()
 318                    .filter(
 319                        Condition::all()
 320                            .add(room_participant::Column::RoomId.eq(room_id))
 321                            .add(room_participant::Column::UserId.eq(user_id))
 322                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 323                    )
 324                    .set(room_participant::ActiveModel {
 325                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 326                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
 327                            connection.owner_id as i32,
 328                        ))),
 329                        answering_connection_lost: ActiveValue::set(false),
 330                        ..Default::default()
 331                    })
 332                    .exec(&*tx)
 333                    .await?;
 334                if result.rows_affected == 0 {
 335                    Err(anyhow!("room does not exist or was already joined"))?;
 336                }
 337            }
 338
 339            let room = self.get_room(room_id, &tx).await?;
 340            let channel_members = if let Some(channel_id) = channel_id {
 341                self.get_channel_members_internal(channel_id, &tx).await?
 342            } else {
 343                Vec::new()
 344            };
 345            Ok(JoinRoom {
 346                room,
 347                channel_id,
 348                channel_members,
 349            })
 350        })
 351        .await
 352    }
 353
 354    pub async fn rejoin_room(
 355        &self,
 356        rejoin_room: proto::RejoinRoom,
 357        user_id: UserId,
 358        connection: ConnectionId,
 359    ) -> Result<RoomGuard<RejoinedRoom>> {
 360        let room_id = RoomId::from_proto(rejoin_room.id);
 361        self.room_transaction(room_id, |tx| async {
 362            let tx = tx;
 363            let participant_update = room_participant::Entity::update_many()
 364                .filter(
 365                    Condition::all()
 366                        .add(room_participant::Column::RoomId.eq(room_id))
 367                        .add(room_participant::Column::UserId.eq(user_id))
 368                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 369                        .add(
 370                            Condition::any()
 371                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 372                                .add(
 373                                    room_participant::Column::AnsweringConnectionServerId
 374                                        .ne(connection.owner_id as i32),
 375                                ),
 376                        ),
 377                )
 378                .set(room_participant::ActiveModel {
 379                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 380                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 381                        connection.owner_id as i32,
 382                    ))),
 383                    answering_connection_lost: ActiveValue::set(false),
 384                    ..Default::default()
 385                })
 386                .exec(&*tx)
 387                .await?;
 388            if participant_update.rows_affected == 0 {
 389                return Err(anyhow!("room does not exist or was already joined"))?;
 390            }
 391
 392            let mut reshared_projects = Vec::new();
 393            for reshared_project in &rejoin_room.reshared_projects {
 394                let project_id = ProjectId::from_proto(reshared_project.project_id);
 395                let project = project::Entity::find_by_id(project_id)
 396                    .one(&*tx)
 397                    .await?
 398                    .ok_or_else(|| anyhow!("project does not exist"))?;
 399                if project.host_user_id != user_id {
 400                    return Err(anyhow!("no such project"))?;
 401                }
 402
 403                let mut collaborators = project
 404                    .find_related(project_collaborator::Entity)
 405                    .all(&*tx)
 406                    .await?;
 407                let host_ix = collaborators
 408                    .iter()
 409                    .position(|collaborator| {
 410                        collaborator.user_id == user_id && collaborator.is_host
 411                    })
 412                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 413                let host = collaborators.swap_remove(host_ix);
 414                let old_connection_id = host.connection();
 415
 416                project::Entity::update(project::ActiveModel {
 417                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 418                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 419                        connection.owner_id as i32,
 420                    ))),
 421                    ..project.into_active_model()
 422                })
 423                .exec(&*tx)
 424                .await?;
 425                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 426                    connection_id: ActiveValue::set(connection.id as i32),
 427                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 428                    ..host.into_active_model()
 429                })
 430                .exec(&*tx)
 431                .await?;
 432
 433                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 434                    .await?;
 435
 436                reshared_projects.push(ResharedProject {
 437                    id: project_id,
 438                    old_connection_id,
 439                    collaborators: collaborators
 440                        .iter()
 441                        .map(|collaborator| ProjectCollaborator {
 442                            connection_id: collaborator.connection(),
 443                            user_id: collaborator.user_id,
 444                            replica_id: collaborator.replica_id,
 445                            is_host: collaborator.is_host,
 446                        })
 447                        .collect(),
 448                    worktrees: reshared_project.worktrees.clone(),
 449                });
 450            }
 451
 452            project::Entity::delete_many()
 453                .filter(
 454                    Condition::all()
 455                        .add(project::Column::RoomId.eq(room_id))
 456                        .add(project::Column::HostUserId.eq(user_id))
 457                        .add(
 458                            project::Column::Id
 459                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 460                        ),
 461                )
 462                .exec(&*tx)
 463                .await?;
 464
 465            let mut rejoined_projects = Vec::new();
 466            for rejoined_project in &rejoin_room.rejoined_projects {
 467                let project_id = ProjectId::from_proto(rejoined_project.id);
 468                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 469                    continue;
 470                };
 471
 472                let mut worktrees = Vec::new();
 473                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 474                for db_worktree in db_worktrees {
 475                    let mut worktree = RejoinedWorktree {
 476                        id: db_worktree.id as u64,
 477                        abs_path: db_worktree.abs_path,
 478                        root_name: db_worktree.root_name,
 479                        visible: db_worktree.visible,
 480                        updated_entries: Default::default(),
 481                        removed_entries: Default::default(),
 482                        updated_repositories: Default::default(),
 483                        removed_repositories: Default::default(),
 484                        diagnostic_summaries: Default::default(),
 485                        settings_files: Default::default(),
 486                        scan_id: db_worktree.scan_id as u64,
 487                        completed_scan_id: db_worktree.completed_scan_id as u64,
 488                    };
 489
 490                    let rejoined_worktree = rejoined_project
 491                        .worktrees
 492                        .iter()
 493                        .find(|worktree| worktree.id == db_worktree.id as u64);
 494
 495                    // File entries
 496                    {
 497                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 498                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 499                        } else {
 500                            worktree_entry::Column::IsDeleted.eq(false)
 501                        };
 502
 503                        let mut db_entries = worktree_entry::Entity::find()
 504                            .filter(
 505                                Condition::all()
 506                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 507                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 508                                    .add(entry_filter),
 509                            )
 510                            .stream(&*tx)
 511                            .await?;
 512
 513                        while let Some(db_entry) = db_entries.next().await {
 514                            let db_entry = db_entry?;
 515                            if db_entry.is_deleted {
 516                                worktree.removed_entries.push(db_entry.id as u64);
 517                            } else {
 518                                worktree.updated_entries.push(proto::Entry {
 519                                    id: db_entry.id as u64,
 520                                    is_dir: db_entry.is_dir,
 521                                    path: db_entry.path,
 522                                    inode: db_entry.inode as u64,
 523                                    mtime: Some(proto::Timestamp {
 524                                        seconds: db_entry.mtime_seconds as u64,
 525                                        nanos: db_entry.mtime_nanos as u32,
 526                                    }),
 527                                    is_symlink: db_entry.is_symlink,
 528                                    is_ignored: db_entry.is_ignored,
 529                                    is_external: db_entry.is_external,
 530                                    git_status: db_entry.git_status.map(|status| status as i32),
 531                                });
 532                            }
 533                        }
 534                    }
 535
 536                    // Repository Entries
 537                    {
 538                        let repository_entry_filter =
 539                            if let Some(rejoined_worktree) = rejoined_worktree {
 540                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 541                            } else {
 542                                worktree_repository::Column::IsDeleted.eq(false)
 543                            };
 544
 545                        let mut db_repositories = worktree_repository::Entity::find()
 546                            .filter(
 547                                Condition::all()
 548                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 549                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 550                                    .add(repository_entry_filter),
 551                            )
 552                            .stream(&*tx)
 553                            .await?;
 554
 555                        while let Some(db_repository) = db_repositories.next().await {
 556                            let db_repository = db_repository?;
 557                            if db_repository.is_deleted {
 558                                worktree
 559                                    .removed_repositories
 560                                    .push(db_repository.work_directory_id as u64);
 561                            } else {
 562                                worktree.updated_repositories.push(proto::RepositoryEntry {
 563                                    work_directory_id: db_repository.work_directory_id as u64,
 564                                    branch: db_repository.branch,
 565                                });
 566                            }
 567                        }
 568                    }
 569
 570                    worktrees.push(worktree);
 571                }
 572
 573                let language_servers = project
 574                    .find_related(language_server::Entity)
 575                    .all(&*tx)
 576                    .await?
 577                    .into_iter()
 578                    .map(|language_server| proto::LanguageServer {
 579                        id: language_server.id as u64,
 580                        name: language_server.name,
 581                    })
 582                    .collect::<Vec<_>>();
 583
 584                {
 585                    let mut db_settings_files = worktree_settings_file::Entity::find()
 586                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 587                        .stream(&*tx)
 588                        .await?;
 589                    while let Some(db_settings_file) = db_settings_files.next().await {
 590                        let db_settings_file = db_settings_file?;
 591                        if let Some(worktree) = worktrees
 592                            .iter_mut()
 593                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 594                        {
 595                            worktree.settings_files.push(WorktreeSettingsFile {
 596                                path: db_settings_file.path,
 597                                content: db_settings_file.content,
 598                            });
 599                        }
 600                    }
 601                }
 602
 603                let mut collaborators = project
 604                    .find_related(project_collaborator::Entity)
 605                    .all(&*tx)
 606                    .await?;
 607                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 608                    .iter()
 609                    .position(|collaborator| collaborator.user_id == user_id)
 610                {
 611                    collaborators.swap_remove(self_collaborator_ix)
 612                } else {
 613                    continue;
 614                };
 615                let old_connection_id = self_collaborator.connection();
 616                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 617                    connection_id: ActiveValue::set(connection.id as i32),
 618                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 619                    ..self_collaborator.into_active_model()
 620                })
 621                .exec(&*tx)
 622                .await?;
 623
 624                let collaborators = collaborators
 625                    .into_iter()
 626                    .map(|collaborator| ProjectCollaborator {
 627                        connection_id: collaborator.connection(),
 628                        user_id: collaborator.user_id,
 629                        replica_id: collaborator.replica_id,
 630                        is_host: collaborator.is_host,
 631                    })
 632                    .collect::<Vec<_>>();
 633
 634                rejoined_projects.push(RejoinedProject {
 635                    id: project_id,
 636                    old_connection_id,
 637                    collaborators,
 638                    worktrees,
 639                    language_servers,
 640                });
 641            }
 642
 643            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 644            let channel_members = if let Some(channel_id) = channel_id {
 645                self.get_channel_members_internal(channel_id, &tx).await?
 646            } else {
 647                Vec::new()
 648            };
 649
 650            Ok(RejoinedRoom {
 651                room,
 652                channel_id,
 653                channel_members,
 654                rejoined_projects,
 655                reshared_projects,
 656            })
 657        })
 658        .await
 659    }
 660
 661    pub async fn leave_room(
 662        &self,
 663        connection: ConnectionId,
 664    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 665        self.optional_room_transaction(|tx| async move {
 666            let leaving_participant = room_participant::Entity::find()
 667                .filter(
 668                    Condition::all()
 669                        .add(
 670                            room_participant::Column::AnsweringConnectionId
 671                                .eq(connection.id as i32),
 672                        )
 673                        .add(
 674                            room_participant::Column::AnsweringConnectionServerId
 675                                .eq(connection.owner_id as i32),
 676                        ),
 677                )
 678                .one(&*tx)
 679                .await?;
 680
 681            if let Some(leaving_participant) = leaving_participant {
 682                // Leave room.
 683                let room_id = leaving_participant.room_id;
 684                room_participant::Entity::delete_by_id(leaving_participant.id)
 685                    .exec(&*tx)
 686                    .await?;
 687
 688                // Cancel pending calls initiated by the leaving user.
 689                let called_participants = room_participant::Entity::find()
 690                    .filter(
 691                        Condition::all()
 692                            .add(
 693                                room_participant::Column::CallingUserId
 694                                    .eq(leaving_participant.user_id),
 695                            )
 696                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 697                    )
 698                    .all(&*tx)
 699                    .await?;
 700                room_participant::Entity::delete_many()
 701                    .filter(
 702                        room_participant::Column::Id
 703                            .is_in(called_participants.iter().map(|participant| participant.id)),
 704                    )
 705                    .exec(&*tx)
 706                    .await?;
 707                let canceled_calls_to_user_ids = called_participants
 708                    .into_iter()
 709                    .map(|participant| participant.user_id)
 710                    .collect();
 711
 712                // Detect left projects.
 713                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 714                enum QueryProjectIds {
 715                    ProjectId,
 716                }
 717                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 718                    .select_only()
 719                    .column_as(
 720                        project_collaborator::Column::ProjectId,
 721                        QueryProjectIds::ProjectId,
 722                    )
 723                    .filter(
 724                        Condition::all()
 725                            .add(
 726                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 727                            )
 728                            .add(
 729                                project_collaborator::Column::ConnectionServerId
 730                                    .eq(connection.owner_id as i32),
 731                            ),
 732                    )
 733                    .into_values::<_, QueryProjectIds>()
 734                    .all(&*tx)
 735                    .await?;
 736                let mut left_projects = HashMap::default();
 737                let mut collaborators = project_collaborator::Entity::find()
 738                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 739                    .stream(&*tx)
 740                    .await?;
 741                while let Some(collaborator) = collaborators.next().await {
 742                    let collaborator = collaborator?;
 743                    let left_project =
 744                        left_projects
 745                            .entry(collaborator.project_id)
 746                            .or_insert(LeftProject {
 747                                id: collaborator.project_id,
 748                                host_user_id: Default::default(),
 749                                connection_ids: Default::default(),
 750                                host_connection_id: Default::default(),
 751                            });
 752
 753                    let collaborator_connection_id = collaborator.connection();
 754                    if collaborator_connection_id != connection {
 755                        left_project.connection_ids.push(collaborator_connection_id);
 756                    }
 757
 758                    if collaborator.is_host {
 759                        left_project.host_user_id = collaborator.user_id;
 760                        left_project.host_connection_id = collaborator_connection_id;
 761                    }
 762                }
 763                drop(collaborators);
 764
 765                // Leave projects.
 766                project_collaborator::Entity::delete_many()
 767                    .filter(
 768                        Condition::all()
 769                            .add(
 770                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 771                            )
 772                            .add(
 773                                project_collaborator::Column::ConnectionServerId
 774                                    .eq(connection.owner_id as i32),
 775                            ),
 776                    )
 777                    .exec(&*tx)
 778                    .await?;
 779
 780                // Unshare projects.
 781                project::Entity::delete_many()
 782                    .filter(
 783                        Condition::all()
 784                            .add(project::Column::RoomId.eq(room_id))
 785                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 786                            .add(
 787                                project::Column::HostConnectionServerId
 788                                    .eq(connection.owner_id as i32),
 789                            ),
 790                    )
 791                    .exec(&*tx)
 792                    .await?;
 793
 794                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 795                let deleted = if room.participants.is_empty() {
 796                    let result = room::Entity::delete_by_id(room_id)
 797                        .filter(room::Column::ChannelId.is_null())
 798                        .exec(&*tx)
 799                        .await?;
 800                    result.rows_affected > 0
 801                } else {
 802                    false
 803                };
 804
 805                let channel_members = if let Some(channel_id) = channel_id {
 806                    self.get_channel_members_internal(channel_id, &tx).await?
 807                } else {
 808                    Vec::new()
 809                };
 810                let left_room = LeftRoom {
 811                    room,
 812                    channel_id,
 813                    channel_members,
 814                    left_projects,
 815                    canceled_calls_to_user_ids,
 816                    deleted,
 817                };
 818
 819                if left_room.room.participants.is_empty() {
 820                    self.rooms.remove(&room_id);
 821                }
 822
 823                Ok(Some((room_id, left_room)))
 824            } else {
 825                Ok(None)
 826            }
 827        })
 828        .await
 829    }
 830
 831    pub async fn update_room_participant_location(
 832        &self,
 833        room_id: RoomId,
 834        connection: ConnectionId,
 835        location: proto::ParticipantLocation,
 836    ) -> Result<RoomGuard<proto::Room>> {
 837        self.room_transaction(room_id, |tx| async {
 838            let tx = tx;
 839            let location_kind;
 840            let location_project_id;
 841            match location
 842                .variant
 843                .as_ref()
 844                .ok_or_else(|| anyhow!("invalid location"))?
 845            {
 846                proto::participant_location::Variant::SharedProject(project) => {
 847                    location_kind = 0;
 848                    location_project_id = Some(ProjectId::from_proto(project.id));
 849                }
 850                proto::participant_location::Variant::UnsharedProject(_) => {
 851                    location_kind = 1;
 852                    location_project_id = None;
 853                }
 854                proto::participant_location::Variant::External(_) => {
 855                    location_kind = 2;
 856                    location_project_id = None;
 857                }
 858            }
 859
 860            let result = room_participant::Entity::update_many()
 861                .filter(
 862                    Condition::all()
 863                        .add(room_participant::Column::RoomId.eq(room_id))
 864                        .add(
 865                            room_participant::Column::AnsweringConnectionId
 866                                .eq(connection.id as i32),
 867                        )
 868                        .add(
 869                            room_participant::Column::AnsweringConnectionServerId
 870                                .eq(connection.owner_id as i32),
 871                        ),
 872                )
 873                .set(room_participant::ActiveModel {
 874                    location_kind: ActiveValue::set(Some(location_kind)),
 875                    location_project_id: ActiveValue::set(location_project_id),
 876                    ..Default::default()
 877                })
 878                .exec(&*tx)
 879                .await?;
 880
 881            if result.rows_affected == 1 {
 882                let room = self.get_room(room_id, &tx).await?;
 883                Ok(room)
 884            } else {
 885                Err(anyhow!("could not update room participant location"))?
 886            }
 887        })
 888        .await
 889    }
 890
 891    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
 892        self.transaction(|tx| async move {
 893            self.room_connection_lost(connection, &*tx).await?;
 894            self.channel_buffer_connection_lost(connection, &*tx)
 895                .await?;
 896            self.channel_chat_connection_lost(connection, &*tx).await?;
 897            Ok(())
 898        })
 899        .await
 900    }
 901
 902    pub async fn room_connection_lost(
 903        &self,
 904        connection: ConnectionId,
 905        tx: &DatabaseTransaction,
 906    ) -> Result<()> {
 907        let participant = room_participant::Entity::find()
 908            .filter(
 909                Condition::all()
 910                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
 911                    .add(
 912                        room_participant::Column::AnsweringConnectionServerId
 913                            .eq(connection.owner_id as i32),
 914                    ),
 915            )
 916            .one(&*tx)
 917            .await?;
 918
 919        if let Some(participant) = participant {
 920            room_participant::Entity::update(room_participant::ActiveModel {
 921                answering_connection_lost: ActiveValue::set(true),
 922                ..participant.into_active_model()
 923            })
 924            .exec(&*tx)
 925            .await?;
 926        }
 927        Ok(())
 928    }
 929
 930    fn build_incoming_call(
 931        room: &proto::Room,
 932        called_user_id: UserId,
 933    ) -> Option<proto::IncomingCall> {
 934        let pending_participant = room
 935            .pending_participants
 936            .iter()
 937            .find(|participant| participant.user_id == called_user_id.to_proto())?;
 938
 939        Some(proto::IncomingCall {
 940            room_id: room.id,
 941            calling_user_id: pending_participant.calling_user_id,
 942            participant_user_ids: room
 943                .participants
 944                .iter()
 945                .map(|participant| participant.user_id)
 946                .collect(),
 947            initial_project: room.participants.iter().find_map(|participant| {
 948                let initial_project_id = pending_participant.initial_project_id?;
 949                participant
 950                    .projects
 951                    .iter()
 952                    .find(|project| project.id == initial_project_id)
 953                    .cloned()
 954            }),
 955        })
 956    }
 957
 958    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
 959        let (_, room) = self.get_channel_room(room_id, tx).await?;
 960        Ok(room)
 961    }
 962
 963    async fn get_channel_room(
 964        &self,
 965        room_id: RoomId,
 966        tx: &DatabaseTransaction,
 967    ) -> Result<(Option<ChannelId>, proto::Room)> {
 968        let db_room = room::Entity::find_by_id(room_id)
 969            .one(tx)
 970            .await?
 971            .ok_or_else(|| anyhow!("could not find room"))?;
 972
 973        let mut db_participants = db_room
 974            .find_related(room_participant::Entity)
 975            .stream(tx)
 976            .await?;
 977        let mut participants = HashMap::default();
 978        let mut pending_participants = Vec::new();
 979        while let Some(db_participant) = db_participants.next().await {
 980            let db_participant = db_participant?;
 981            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
 982                .answering_connection_id
 983                .zip(db_participant.answering_connection_server_id)
 984            {
 985                let location = match (
 986                    db_participant.location_kind,
 987                    db_participant.location_project_id,
 988                ) {
 989                    (Some(0), Some(project_id)) => {
 990                        Some(proto::participant_location::Variant::SharedProject(
 991                            proto::participant_location::SharedProject {
 992                                id: project_id.to_proto(),
 993                            },
 994                        ))
 995                    }
 996                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
 997                        Default::default(),
 998                    )),
 999                    _ => Some(proto::participant_location::Variant::External(
1000                        Default::default(),
1001                    )),
1002                };
1003
1004                let answering_connection = ConnectionId {
1005                    owner_id: answering_connection_server_id.0 as u32,
1006                    id: answering_connection_id as u32,
1007                };
1008                participants.insert(
1009                    answering_connection,
1010                    proto::Participant {
1011                        user_id: db_participant.user_id.to_proto(),
1012                        peer_id: Some(answering_connection.into()),
1013                        projects: Default::default(),
1014                        location: Some(proto::ParticipantLocation { variant: location }),
1015                    },
1016                );
1017            } else {
1018                pending_participants.push(proto::PendingParticipant {
1019                    user_id: db_participant.user_id.to_proto(),
1020                    calling_user_id: db_participant.calling_user_id.to_proto(),
1021                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1022                });
1023            }
1024        }
1025        drop(db_participants);
1026
1027        let mut db_projects = db_room
1028            .find_related(project::Entity)
1029            .find_with_related(worktree::Entity)
1030            .stream(tx)
1031            .await?;
1032
1033        while let Some(row) = db_projects.next().await {
1034            let (db_project, db_worktree) = row?;
1035            let host_connection = db_project.host_connection()?;
1036            if let Some(participant) = participants.get_mut(&host_connection) {
1037                let project = if let Some(project) = participant
1038                    .projects
1039                    .iter_mut()
1040                    .find(|project| project.id == db_project.id.to_proto())
1041                {
1042                    project
1043                } else {
1044                    participant.projects.push(proto::ParticipantProject {
1045                        id: db_project.id.to_proto(),
1046                        worktree_root_names: Default::default(),
1047                    });
1048                    participant.projects.last_mut().unwrap()
1049                };
1050
1051                if let Some(db_worktree) = db_worktree {
1052                    if db_worktree.visible {
1053                        project.worktree_root_names.push(db_worktree.root_name);
1054                    }
1055                }
1056            }
1057        }
1058        drop(db_projects);
1059
1060        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1061        let mut followers = Vec::new();
1062        while let Some(db_follower) = db_followers.next().await {
1063            let db_follower = db_follower?;
1064            followers.push(proto::Follower {
1065                leader_id: Some(db_follower.leader_connection().into()),
1066                follower_id: Some(db_follower.follower_connection().into()),
1067                project_id: db_follower.project_id.to_proto(),
1068            });
1069        }
1070
1071        Ok((
1072            db_room.channel_id,
1073            proto::Room {
1074                id: db_room.id.to_proto(),
1075                live_kit_room: db_room.live_kit_room,
1076                participants: participants.into_values().collect(),
1077                pending_participants,
1078                followers,
1079            },
1080        ))
1081    }
1082}