rooms.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn clear_stale_room_participants(
   5        &self,
   6        room_id: RoomId,
   7        new_server_id: ServerId,
   8    ) -> Result<RoomGuard<RefreshedRoom>> {
   9        self.room_transaction(room_id, |tx| async move {
  10            let stale_participant_filter = Condition::all()
  11                .add(room_participant::Column::RoomId.eq(room_id))
  12                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  13                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  14
  15            let stale_participant_user_ids = room_participant::Entity::find()
  16                .filter(stale_participant_filter.clone())
  17                .all(&*tx)
  18                .await?
  19                .into_iter()
  20                .map(|participant| participant.user_id)
  21                .collect::<Vec<_>>();
  22
  23            // Delete participants who failed to reconnect and cancel their calls.
  24            let mut canceled_calls_to_user_ids = Vec::new();
  25            room_participant::Entity::delete_many()
  26                .filter(stale_participant_filter)
  27                .exec(&*tx)
  28                .await?;
  29            let called_participants = room_participant::Entity::find()
  30                .filter(
  31                    Condition::all()
  32                        .add(
  33                            room_participant::Column::CallingUserId
  34                                .is_in(stale_participant_user_ids.iter().copied()),
  35                        )
  36                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  37                )
  38                .all(&*tx)
  39                .await?;
  40            room_participant::Entity::delete_many()
  41                .filter(
  42                    room_participant::Column::Id
  43                        .is_in(called_participants.iter().map(|participant| participant.id)),
  44                )
  45                .exec(&*tx)
  46                .await?;
  47            canceled_calls_to_user_ids.extend(
  48                called_participants
  49                    .into_iter()
  50                    .map(|participant| participant.user_id),
  51            );
  52
  53            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  54            let channel_members;
  55            if let Some(channel) = &channel {
  56                channel_members = self.get_channel_participants(channel, &tx).await?;
  57            } else {
  58                channel_members = Vec::new();
  59
  60                // Delete the room if it becomes empty.
  61                if room.participants.is_empty() {
  62                    project::Entity::delete_many()
  63                        .filter(project::Column::RoomId.eq(room_id))
  64                        .exec(&*tx)
  65                        .await?;
  66                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  67                }
  68            };
  69
  70            Ok(RefreshedRoom {
  71                room,
  72                channel_id: channel.map(|channel| channel.id),
  73                channel_members,
  74                stale_participant_user_ids,
  75                canceled_calls_to_user_ids,
  76            })
  77        })
  78        .await
  79    }
  80
  81    pub async fn incoming_call_for_user(
  82        &self,
  83        user_id: UserId,
  84    ) -> Result<Option<proto::IncomingCall>> {
  85        self.transaction(|tx| async move {
  86            let pending_participant = room_participant::Entity::find()
  87                .filter(
  88                    room_participant::Column::UserId
  89                        .eq(user_id)
  90                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  91                )
  92                .one(&*tx)
  93                .await?;
  94
  95            if let Some(pending_participant) = pending_participant {
  96                let room = self.get_room(pending_participant.room_id, &tx).await?;
  97                Ok(Self::build_incoming_call(&room, user_id))
  98            } else {
  99                Ok(None)
 100            }
 101        })
 102        .await
 103    }
 104
 105    pub async fn create_room(
 106        &self,
 107        user_id: UserId,
 108        connection: ConnectionId,
 109        live_kit_room: &str,
 110        release_channel: &str,
 111    ) -> Result<proto::Room> {
 112        self.transaction(|tx| async move {
 113            let room = room::ActiveModel {
 114                live_kit_room: ActiveValue::set(live_kit_room.into()),
 115                enviroment: ActiveValue::set(Some(release_channel.to_string())),
 116                ..Default::default()
 117            }
 118            .insert(&*tx)
 119            .await?;
 120            room_participant::ActiveModel {
 121                room_id: ActiveValue::set(room.id),
 122                user_id: ActiveValue::set(user_id),
 123                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 124                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 125                    connection.owner_id as i32,
 126                ))),
 127                answering_connection_lost: ActiveValue::set(false),
 128                calling_user_id: ActiveValue::set(user_id),
 129                calling_connection_id: ActiveValue::set(connection.id as i32),
 130                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 131                    connection.owner_id as i32,
 132                ))),
 133                participant_index: ActiveValue::set(Some(0)),
 134                role: ActiveValue::set(Some(ChannelRole::Admin)),
 135
 136                id: ActiveValue::NotSet,
 137                location_kind: ActiveValue::NotSet,
 138                location_project_id: ActiveValue::NotSet,
 139                initial_project_id: ActiveValue::NotSet,
 140            }
 141            .insert(&*tx)
 142            .await?;
 143
 144            let room = self.get_room(room.id, &tx).await?;
 145            Ok(room)
 146        })
 147        .await
 148    }
 149
 150    pub async fn call(
 151        &self,
 152        room_id: RoomId,
 153        calling_user_id: UserId,
 154        calling_connection: ConnectionId,
 155        called_user_id: UserId,
 156        initial_project_id: Option<ProjectId>,
 157    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 158        self.room_transaction(room_id, |tx| async move {
 159            let caller = room_participant::Entity::find()
 160                .filter(
 161                    room_participant::Column::UserId
 162                        .eq(calling_user_id)
 163                        .and(room_participant::Column::RoomId.eq(room_id)),
 164                )
 165                .one(&*tx)
 166                .await?
 167                .ok_or_else(|| anyhow!("user is not in the room"))?;
 168
 169            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 170                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 171                ChannelRole::Guest => ChannelRole::Guest,
 172                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 173            };
 174
 175            room_participant::ActiveModel {
 176                room_id: ActiveValue::set(room_id),
 177                user_id: ActiveValue::set(called_user_id),
 178                answering_connection_lost: ActiveValue::set(false),
 179                participant_index: ActiveValue::NotSet,
 180                calling_user_id: ActiveValue::set(calling_user_id),
 181                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 182                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 183                    calling_connection.owner_id as i32,
 184                ))),
 185                initial_project_id: ActiveValue::set(initial_project_id),
 186                role: ActiveValue::set(Some(called_user_role)),
 187
 188                id: ActiveValue::NotSet,
 189                answering_connection_id: ActiveValue::NotSet,
 190                answering_connection_server_id: ActiveValue::NotSet,
 191                location_kind: ActiveValue::NotSet,
 192                location_project_id: ActiveValue::NotSet,
 193            }
 194            .insert(&*tx)
 195            .await?;
 196
 197            let room = self.get_room(room_id, &tx).await?;
 198            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 199                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 200            Ok((room, incoming_call))
 201        })
 202        .await
 203    }
 204
 205    pub async fn call_failed(
 206        &self,
 207        room_id: RoomId,
 208        called_user_id: UserId,
 209    ) -> Result<RoomGuard<proto::Room>> {
 210        self.room_transaction(room_id, |tx| async move {
 211            room_participant::Entity::delete_many()
 212                .filter(
 213                    room_participant::Column::RoomId
 214                        .eq(room_id)
 215                        .and(room_participant::Column::UserId.eq(called_user_id)),
 216                )
 217                .exec(&*tx)
 218                .await?;
 219            let room = self.get_room(room_id, &tx).await?;
 220            Ok(room)
 221        })
 222        .await
 223    }
 224
 225    pub async fn decline_call(
 226        &self,
 227        expected_room_id: Option<RoomId>,
 228        user_id: UserId,
 229    ) -> Result<Option<RoomGuard<proto::Room>>> {
 230        self.optional_room_transaction(|tx| async move {
 231            let mut filter = Condition::all()
 232                .add(room_participant::Column::UserId.eq(user_id))
 233                .add(room_participant::Column::AnsweringConnectionId.is_null());
 234            if let Some(room_id) = expected_room_id {
 235                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 236            }
 237            let participant = room_participant::Entity::find()
 238                .filter(filter)
 239                .one(&*tx)
 240                .await?;
 241
 242            let participant = if let Some(participant) = participant {
 243                participant
 244            } else if expected_room_id.is_some() {
 245                return Err(anyhow!("could not find call to decline"))?;
 246            } else {
 247                return Ok(None);
 248            };
 249
 250            let room_id = participant.room_id;
 251            room_participant::Entity::delete(participant.into_active_model())
 252                .exec(&*tx)
 253                .await?;
 254
 255            let room = self.get_room(room_id, &tx).await?;
 256            Ok(Some((room_id, room)))
 257        })
 258        .await
 259    }
 260
 261    pub async fn cancel_call(
 262        &self,
 263        room_id: RoomId,
 264        calling_connection: ConnectionId,
 265        called_user_id: UserId,
 266    ) -> Result<RoomGuard<proto::Room>> {
 267        self.room_transaction(room_id, |tx| async move {
 268            let participant = room_participant::Entity::find()
 269                .filter(
 270                    Condition::all()
 271                        .add(room_participant::Column::UserId.eq(called_user_id))
 272                        .add(room_participant::Column::RoomId.eq(room_id))
 273                        .add(
 274                            room_participant::Column::CallingConnectionId
 275                                .eq(calling_connection.id as i32),
 276                        )
 277                        .add(
 278                            room_participant::Column::CallingConnectionServerId
 279                                .eq(calling_connection.owner_id as i32),
 280                        )
 281                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 282                )
 283                .one(&*tx)
 284                .await?
 285                .ok_or_else(|| anyhow!("no call to cancel"))?;
 286
 287            room_participant::Entity::delete(participant.into_active_model())
 288                .exec(&*tx)
 289                .await?;
 290
 291            let room = self.get_room(room_id, &tx).await?;
 292            Ok(room)
 293        })
 294        .await
 295    }
 296
 297    pub async fn join_room(
 298        &self,
 299        room_id: RoomId,
 300        user_id: UserId,
 301        connection: ConnectionId,
 302        enviroment: &str,
 303    ) -> Result<RoomGuard<JoinRoom>> {
 304        self.room_transaction(room_id, |tx| async move {
 305            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 306            enum QueryChannelIdAndEnviroment {
 307                ChannelId,
 308                Enviroment,
 309            }
 310
 311            let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
 312                room::Entity::find()
 313                    .select_only()
 314                    .column(room::Column::ChannelId)
 315                    .column(room::Column::Enviroment)
 316                    .filter(room::Column::Id.eq(room_id))
 317                    .into_values::<_, QueryChannelIdAndEnviroment>()
 318                    .one(&*tx)
 319                    .await?
 320                    .ok_or_else(|| anyhow!("no such room"))?;
 321
 322            if let Some(release_channel) = release_channel {
 323                if &release_channel != enviroment {
 324                    Err(anyhow!("must join using the {} release", release_channel))?;
 325                }
 326            }
 327
 328            if channel_id.is_some() {
 329                Err(anyhow!("tried to join channel call directly"))?
 330            }
 331
 332            let participant_index = self
 333                .get_next_participant_index_internal(room_id, &*tx)
 334                .await?;
 335
 336            let result = room_participant::Entity::update_many()
 337                .filter(
 338                    Condition::all()
 339                        .add(room_participant::Column::RoomId.eq(room_id))
 340                        .add(room_participant::Column::UserId.eq(user_id))
 341                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 342                )
 343                .set(room_participant::ActiveModel {
 344                    participant_index: ActiveValue::Set(Some(participant_index)),
 345                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 346                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 347                        connection.owner_id as i32,
 348                    ))),
 349                    answering_connection_lost: ActiveValue::set(false),
 350                    ..Default::default()
 351                })
 352                .exec(&*tx)
 353                .await?;
 354            if result.rows_affected == 0 {
 355                Err(anyhow!("room does not exist or was already joined"))?;
 356            }
 357
 358            let room = self.get_room(room_id, &tx).await?;
 359            Ok(JoinRoom {
 360                room,
 361                channel_id: None,
 362                channel_members: vec![],
 363            })
 364        })
 365        .await
 366    }
 367
 368    async fn get_next_participant_index_internal(
 369        &self,
 370        room_id: RoomId,
 371        tx: &DatabaseTransaction,
 372    ) -> Result<i32> {
 373        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 374        enum QueryParticipantIndices {
 375            ParticipantIndex,
 376        }
 377        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 378            .filter(
 379                room_participant::Column::RoomId
 380                    .eq(room_id)
 381                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 382            )
 383            .select_only()
 384            .column(room_participant::Column::ParticipantIndex)
 385            .into_values::<_, QueryParticipantIndices>()
 386            .all(&*tx)
 387            .await?;
 388
 389        let mut participant_index = 0;
 390        while existing_participant_indices.contains(&participant_index) {
 391            participant_index += 1;
 392        }
 393
 394        Ok(participant_index)
 395    }
 396
 397    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 398        self.transaction(|tx| async move {
 399            let room: Option<room::Model> = room::Entity::find()
 400                .filter(room::Column::Id.eq(room_id))
 401                .one(&*tx)
 402                .await?;
 403
 404            Ok(room.and_then(|room| room.channel_id))
 405        })
 406        .await
 407    }
 408
 409    pub(crate) async fn join_channel_room_internal(
 410        &self,
 411        room_id: RoomId,
 412        user_id: UserId,
 413        connection: ConnectionId,
 414        role: ChannelRole,
 415        tx: &DatabaseTransaction,
 416    ) -> Result<JoinRoom> {
 417        let participant_index = self
 418            .get_next_participant_index_internal(room_id, &*tx)
 419            .await?;
 420
 421        room_participant::Entity::insert_many([room_participant::ActiveModel {
 422            room_id: ActiveValue::set(room_id),
 423            user_id: ActiveValue::set(user_id),
 424            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 425            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 426                connection.owner_id as i32,
 427            ))),
 428            answering_connection_lost: ActiveValue::set(false),
 429            calling_user_id: ActiveValue::set(user_id),
 430            calling_connection_id: ActiveValue::set(connection.id as i32),
 431            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 432                connection.owner_id as i32,
 433            ))),
 434            participant_index: ActiveValue::Set(Some(participant_index)),
 435            role: ActiveValue::set(Some(role)),
 436            id: ActiveValue::NotSet,
 437            location_kind: ActiveValue::NotSet,
 438            location_project_id: ActiveValue::NotSet,
 439            initial_project_id: ActiveValue::NotSet,
 440        }])
 441        .on_conflict(
 442            OnConflict::columns([room_participant::Column::UserId])
 443                .update_columns([
 444                    room_participant::Column::AnsweringConnectionId,
 445                    room_participant::Column::AnsweringConnectionServerId,
 446                    room_participant::Column::AnsweringConnectionLost,
 447                    room_participant::Column::ParticipantIndex,
 448                    room_participant::Column::Role,
 449                ])
 450                .to_owned(),
 451        )
 452        .exec(&*tx)
 453        .await?;
 454
 455        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 456        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 457        let channel_members = self.get_channel_participants(&channel, &*tx).await?;
 458        Ok(JoinRoom {
 459            room,
 460            channel_id: Some(channel.id),
 461            channel_members,
 462        })
 463    }
 464
 465    pub async fn rejoin_room(
 466        &self,
 467        rejoin_room: proto::RejoinRoom,
 468        user_id: UserId,
 469        connection: ConnectionId,
 470    ) -> Result<RoomGuard<RejoinedRoom>> {
 471        let room_id = RoomId::from_proto(rejoin_room.id);
 472        self.room_transaction(room_id, |tx| async {
 473            let tx = tx;
 474            let participant_update = room_participant::Entity::update_many()
 475                .filter(
 476                    Condition::all()
 477                        .add(room_participant::Column::RoomId.eq(room_id))
 478                        .add(room_participant::Column::UserId.eq(user_id))
 479                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 480                        .add(
 481                            Condition::any()
 482                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 483                                .add(
 484                                    room_participant::Column::AnsweringConnectionServerId
 485                                        .ne(connection.owner_id as i32),
 486                                ),
 487                        ),
 488                )
 489                .set(room_participant::ActiveModel {
 490                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 491                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 492                        connection.owner_id as i32,
 493                    ))),
 494                    answering_connection_lost: ActiveValue::set(false),
 495                    ..Default::default()
 496                })
 497                .exec(&*tx)
 498                .await?;
 499            if participant_update.rows_affected == 0 {
 500                return Err(anyhow!("room does not exist or was already joined"))?;
 501            }
 502
 503            let mut reshared_projects = Vec::new();
 504            for reshared_project in &rejoin_room.reshared_projects {
 505                let project_id = ProjectId::from_proto(reshared_project.project_id);
 506                let project = project::Entity::find_by_id(project_id)
 507                    .one(&*tx)
 508                    .await?
 509                    .ok_or_else(|| anyhow!("project does not exist"))?;
 510                if project.host_user_id != user_id {
 511                    return Err(anyhow!("no such project"))?;
 512                }
 513
 514                let mut collaborators = project
 515                    .find_related(project_collaborator::Entity)
 516                    .all(&*tx)
 517                    .await?;
 518                let host_ix = collaborators
 519                    .iter()
 520                    .position(|collaborator| {
 521                        collaborator.user_id == user_id && collaborator.is_host
 522                    })
 523                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 524                let host = collaborators.swap_remove(host_ix);
 525                let old_connection_id = host.connection();
 526
 527                project::Entity::update(project::ActiveModel {
 528                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 529                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 530                        connection.owner_id as i32,
 531                    ))),
 532                    ..project.into_active_model()
 533                })
 534                .exec(&*tx)
 535                .await?;
 536                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 537                    connection_id: ActiveValue::set(connection.id as i32),
 538                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 539                    ..host.into_active_model()
 540                })
 541                .exec(&*tx)
 542                .await?;
 543
 544                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 545                    .await?;
 546
 547                reshared_projects.push(ResharedProject {
 548                    id: project_id,
 549                    old_connection_id,
 550                    collaborators: collaborators
 551                        .iter()
 552                        .map(|collaborator| ProjectCollaborator {
 553                            connection_id: collaborator.connection(),
 554                            user_id: collaborator.user_id,
 555                            replica_id: collaborator.replica_id,
 556                            is_host: collaborator.is_host,
 557                        })
 558                        .collect(),
 559                    worktrees: reshared_project.worktrees.clone(),
 560                });
 561            }
 562
 563            project::Entity::delete_many()
 564                .filter(
 565                    Condition::all()
 566                        .add(project::Column::RoomId.eq(room_id))
 567                        .add(project::Column::HostUserId.eq(user_id))
 568                        .add(
 569                            project::Column::Id
 570                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 571                        ),
 572                )
 573                .exec(&*tx)
 574                .await?;
 575
 576            let mut rejoined_projects = Vec::new();
 577            for rejoined_project in &rejoin_room.rejoined_projects {
 578                let project_id = ProjectId::from_proto(rejoined_project.id);
 579                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 580                    continue;
 581                };
 582
 583                let mut worktrees = Vec::new();
 584                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 585                for db_worktree in db_worktrees {
 586                    let mut worktree = RejoinedWorktree {
 587                        id: db_worktree.id as u64,
 588                        abs_path: db_worktree.abs_path,
 589                        root_name: db_worktree.root_name,
 590                        visible: db_worktree.visible,
 591                        updated_entries: Default::default(),
 592                        removed_entries: Default::default(),
 593                        updated_repositories: Default::default(),
 594                        removed_repositories: Default::default(),
 595                        diagnostic_summaries: Default::default(),
 596                        settings_files: Default::default(),
 597                        scan_id: db_worktree.scan_id as u64,
 598                        completed_scan_id: db_worktree.completed_scan_id as u64,
 599                    };
 600
 601                    let rejoined_worktree = rejoined_project
 602                        .worktrees
 603                        .iter()
 604                        .find(|worktree| worktree.id == db_worktree.id as u64);
 605
 606                    // File entries
 607                    {
 608                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 609                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 610                        } else {
 611                            worktree_entry::Column::IsDeleted.eq(false)
 612                        };
 613
 614                        let mut db_entries = worktree_entry::Entity::find()
 615                            .filter(
 616                                Condition::all()
 617                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 618                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 619                                    .add(entry_filter),
 620                            )
 621                            .stream(&*tx)
 622                            .await?;
 623
 624                        while let Some(db_entry) = db_entries.next().await {
 625                            let db_entry = db_entry?;
 626                            if db_entry.is_deleted {
 627                                worktree.removed_entries.push(db_entry.id as u64);
 628                            } else {
 629                                worktree.updated_entries.push(proto::Entry {
 630                                    id: db_entry.id as u64,
 631                                    is_dir: db_entry.is_dir,
 632                                    path: db_entry.path,
 633                                    inode: db_entry.inode as u64,
 634                                    mtime: Some(proto::Timestamp {
 635                                        seconds: db_entry.mtime_seconds as u64,
 636                                        nanos: db_entry.mtime_nanos as u32,
 637                                    }),
 638                                    is_symlink: db_entry.is_symlink,
 639                                    is_ignored: db_entry.is_ignored,
 640                                    is_external: db_entry.is_external,
 641                                    git_status: db_entry.git_status.map(|status| status as i32),
 642                                });
 643                            }
 644                        }
 645                    }
 646
 647                    // Repository Entries
 648                    {
 649                        let repository_entry_filter =
 650                            if let Some(rejoined_worktree) = rejoined_worktree {
 651                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 652                            } else {
 653                                worktree_repository::Column::IsDeleted.eq(false)
 654                            };
 655
 656                        let mut db_repositories = worktree_repository::Entity::find()
 657                            .filter(
 658                                Condition::all()
 659                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 660                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 661                                    .add(repository_entry_filter),
 662                            )
 663                            .stream(&*tx)
 664                            .await?;
 665
 666                        while let Some(db_repository) = db_repositories.next().await {
 667                            let db_repository = db_repository?;
 668                            if db_repository.is_deleted {
 669                                worktree
 670                                    .removed_repositories
 671                                    .push(db_repository.work_directory_id as u64);
 672                            } else {
 673                                worktree.updated_repositories.push(proto::RepositoryEntry {
 674                                    work_directory_id: db_repository.work_directory_id as u64,
 675                                    branch: db_repository.branch,
 676                                });
 677                            }
 678                        }
 679                    }
 680
 681                    worktrees.push(worktree);
 682                }
 683
 684                let language_servers = project
 685                    .find_related(language_server::Entity)
 686                    .all(&*tx)
 687                    .await?
 688                    .into_iter()
 689                    .map(|language_server| proto::LanguageServer {
 690                        id: language_server.id as u64,
 691                        name: language_server.name,
 692                    })
 693                    .collect::<Vec<_>>();
 694
 695                {
 696                    let mut db_settings_files = worktree_settings_file::Entity::find()
 697                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 698                        .stream(&*tx)
 699                        .await?;
 700                    while let Some(db_settings_file) = db_settings_files.next().await {
 701                        let db_settings_file = db_settings_file?;
 702                        if let Some(worktree) = worktrees
 703                            .iter_mut()
 704                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 705                        {
 706                            worktree.settings_files.push(WorktreeSettingsFile {
 707                                path: db_settings_file.path,
 708                                content: db_settings_file.content,
 709                            });
 710                        }
 711                    }
 712                }
 713
 714                let mut collaborators = project
 715                    .find_related(project_collaborator::Entity)
 716                    .all(&*tx)
 717                    .await?;
 718                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 719                    .iter()
 720                    .position(|collaborator| collaborator.user_id == user_id)
 721                {
 722                    collaborators.swap_remove(self_collaborator_ix)
 723                } else {
 724                    continue;
 725                };
 726                let old_connection_id = self_collaborator.connection();
 727                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 728                    connection_id: ActiveValue::set(connection.id as i32),
 729                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 730                    ..self_collaborator.into_active_model()
 731                })
 732                .exec(&*tx)
 733                .await?;
 734
 735                let collaborators = collaborators
 736                    .into_iter()
 737                    .map(|collaborator| ProjectCollaborator {
 738                        connection_id: collaborator.connection(),
 739                        user_id: collaborator.user_id,
 740                        replica_id: collaborator.replica_id,
 741                        is_host: collaborator.is_host,
 742                    })
 743                    .collect::<Vec<_>>();
 744
 745                rejoined_projects.push(RejoinedProject {
 746                    id: project_id,
 747                    old_connection_id,
 748                    collaborators,
 749                    worktrees,
 750                    language_servers,
 751                });
 752            }
 753
 754            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 755            let channel_members = if let Some(channel) = &channel {
 756                self.get_channel_participants(&channel, &tx).await?
 757            } else {
 758                Vec::new()
 759            };
 760
 761            Ok(RejoinedRoom {
 762                room,
 763                channel_id: channel.map(|channel| channel.id),
 764                channel_members,
 765                rejoined_projects,
 766                reshared_projects,
 767            })
 768        })
 769        .await
 770    }
 771
 772    pub async fn leave_room(
 773        &self,
 774        connection: ConnectionId,
 775    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 776        self.optional_room_transaction(|tx| async move {
 777            let leaving_participant = room_participant::Entity::find()
 778                .filter(
 779                    Condition::all()
 780                        .add(
 781                            room_participant::Column::AnsweringConnectionId
 782                                .eq(connection.id as i32),
 783                        )
 784                        .add(
 785                            room_participant::Column::AnsweringConnectionServerId
 786                                .eq(connection.owner_id as i32),
 787                        ),
 788                )
 789                .one(&*tx)
 790                .await?;
 791
 792            if let Some(leaving_participant) = leaving_participant {
 793                // Leave room.
 794                let room_id = leaving_participant.room_id;
 795                room_participant::Entity::delete_by_id(leaving_participant.id)
 796                    .exec(&*tx)
 797                    .await?;
 798
 799                // Cancel pending calls initiated by the leaving user.
 800                let called_participants = room_participant::Entity::find()
 801                    .filter(
 802                        Condition::all()
 803                            .add(
 804                                room_participant::Column::CallingUserId
 805                                    .eq(leaving_participant.user_id),
 806                            )
 807                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 808                    )
 809                    .all(&*tx)
 810                    .await?;
 811                room_participant::Entity::delete_many()
 812                    .filter(
 813                        room_participant::Column::Id
 814                            .is_in(called_participants.iter().map(|participant| participant.id)),
 815                    )
 816                    .exec(&*tx)
 817                    .await?;
 818                let canceled_calls_to_user_ids = called_participants
 819                    .into_iter()
 820                    .map(|participant| participant.user_id)
 821                    .collect();
 822
 823                // Detect left projects.
 824                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 825                enum QueryProjectIds {
 826                    ProjectId,
 827                }
 828                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 829                    .select_only()
 830                    .column_as(
 831                        project_collaborator::Column::ProjectId,
 832                        QueryProjectIds::ProjectId,
 833                    )
 834                    .filter(
 835                        Condition::all()
 836                            .add(
 837                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 838                            )
 839                            .add(
 840                                project_collaborator::Column::ConnectionServerId
 841                                    .eq(connection.owner_id as i32),
 842                            ),
 843                    )
 844                    .into_values::<_, QueryProjectIds>()
 845                    .all(&*tx)
 846                    .await?;
 847                let mut left_projects = HashMap::default();
 848                let mut collaborators = project_collaborator::Entity::find()
 849                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 850                    .stream(&*tx)
 851                    .await?;
 852                while let Some(collaborator) = collaborators.next().await {
 853                    let collaborator = collaborator?;
 854                    let left_project =
 855                        left_projects
 856                            .entry(collaborator.project_id)
 857                            .or_insert(LeftProject {
 858                                id: collaborator.project_id,
 859                                host_user_id: Default::default(),
 860                                connection_ids: Default::default(),
 861                                host_connection_id: Default::default(),
 862                            });
 863
 864                    let collaborator_connection_id = collaborator.connection();
 865                    if collaborator_connection_id != connection {
 866                        left_project.connection_ids.push(collaborator_connection_id);
 867                    }
 868
 869                    if collaborator.is_host {
 870                        left_project.host_user_id = collaborator.user_id;
 871                        left_project.host_connection_id = collaborator_connection_id;
 872                    }
 873                }
 874                drop(collaborators);
 875
 876                // Leave projects.
 877                project_collaborator::Entity::delete_many()
 878                    .filter(
 879                        Condition::all()
 880                            .add(
 881                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 882                            )
 883                            .add(
 884                                project_collaborator::Column::ConnectionServerId
 885                                    .eq(connection.owner_id as i32),
 886                            ),
 887                    )
 888                    .exec(&*tx)
 889                    .await?;
 890
 891                // Unshare projects.
 892                project::Entity::delete_many()
 893                    .filter(
 894                        Condition::all()
 895                            .add(project::Column::RoomId.eq(room_id))
 896                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 897                            .add(
 898                                project::Column::HostConnectionServerId
 899                                    .eq(connection.owner_id as i32),
 900                            ),
 901                    )
 902                    .exec(&*tx)
 903                    .await?;
 904
 905                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 906                let deleted = if room.participants.is_empty() {
 907                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 908                    result.rows_affected > 0
 909                } else {
 910                    false
 911                };
 912
 913                let channel_members = if let Some(channel) = &channel {
 914                    self.get_channel_participants(channel, &tx).await?
 915                } else {
 916                    Vec::new()
 917                };
 918                let left_room = LeftRoom {
 919                    room,
 920                    channel_id: channel.map(|channel| channel.id),
 921                    channel_members,
 922                    left_projects,
 923                    canceled_calls_to_user_ids,
 924                    deleted,
 925                };
 926
 927                if left_room.room.participants.is_empty() {
 928                    self.rooms.remove(&room_id);
 929                }
 930
 931                Ok(Some((room_id, left_room)))
 932            } else {
 933                Ok(None)
 934            }
 935        })
 936        .await
 937    }
 938
 939    pub async fn update_room_participant_location(
 940        &self,
 941        room_id: RoomId,
 942        connection: ConnectionId,
 943        location: proto::ParticipantLocation,
 944    ) -> Result<RoomGuard<proto::Room>> {
 945        self.room_transaction(room_id, |tx| async {
 946            let tx = tx;
 947            let location_kind;
 948            let location_project_id;
 949            match location
 950                .variant
 951                .as_ref()
 952                .ok_or_else(|| anyhow!("invalid location"))?
 953            {
 954                proto::participant_location::Variant::SharedProject(project) => {
 955                    location_kind = 0;
 956                    location_project_id = Some(ProjectId::from_proto(project.id));
 957                }
 958                proto::participant_location::Variant::UnsharedProject(_) => {
 959                    location_kind = 1;
 960                    location_project_id = None;
 961                }
 962                proto::participant_location::Variant::External(_) => {
 963                    location_kind = 2;
 964                    location_project_id = None;
 965                }
 966            }
 967
 968            let result = room_participant::Entity::update_many()
 969                .filter(
 970                    Condition::all()
 971                        .add(room_participant::Column::RoomId.eq(room_id))
 972                        .add(
 973                            room_participant::Column::AnsweringConnectionId
 974                                .eq(connection.id as i32),
 975                        )
 976                        .add(
 977                            room_participant::Column::AnsweringConnectionServerId
 978                                .eq(connection.owner_id as i32),
 979                        ),
 980                )
 981                .set(room_participant::ActiveModel {
 982                    location_kind: ActiveValue::set(Some(location_kind)),
 983                    location_project_id: ActiveValue::set(location_project_id),
 984                    ..Default::default()
 985                })
 986                .exec(&*tx)
 987                .await?;
 988
 989            if result.rows_affected == 1 {
 990                let room = self.get_room(room_id, &tx).await?;
 991                Ok(room)
 992            } else {
 993                Err(anyhow!("could not update room participant location"))?
 994            }
 995        })
 996        .await
 997    }
 998
 999    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1000        self.transaction(|tx| async move {
1001            self.room_connection_lost(connection, &*tx).await?;
1002            self.channel_buffer_connection_lost(connection, &*tx)
1003                .await?;
1004            self.channel_chat_connection_lost(connection, &*tx).await?;
1005            Ok(())
1006        })
1007        .await
1008    }
1009
1010    pub async fn room_connection_lost(
1011        &self,
1012        connection: ConnectionId,
1013        tx: &DatabaseTransaction,
1014    ) -> Result<()> {
1015        let participant = room_participant::Entity::find()
1016            .filter(
1017                Condition::all()
1018                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1019                    .add(
1020                        room_participant::Column::AnsweringConnectionServerId
1021                            .eq(connection.owner_id as i32),
1022                    ),
1023            )
1024            .one(&*tx)
1025            .await?;
1026
1027        if let Some(participant) = participant {
1028            room_participant::Entity::update(room_participant::ActiveModel {
1029                answering_connection_lost: ActiveValue::set(true),
1030                ..participant.into_active_model()
1031            })
1032            .exec(&*tx)
1033            .await?;
1034        }
1035        Ok(())
1036    }
1037
1038    fn build_incoming_call(
1039        room: &proto::Room,
1040        called_user_id: UserId,
1041    ) -> Option<proto::IncomingCall> {
1042        let pending_participant = room
1043            .pending_participants
1044            .iter()
1045            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1046
1047        Some(proto::IncomingCall {
1048            room_id: room.id,
1049            calling_user_id: pending_participant.calling_user_id,
1050            participant_user_ids: room
1051                .participants
1052                .iter()
1053                .map(|participant| participant.user_id)
1054                .collect(),
1055            initial_project: room.participants.iter().find_map(|participant| {
1056                let initial_project_id = pending_participant.initial_project_id?;
1057                participant
1058                    .projects
1059                    .iter()
1060                    .find(|project| project.id == initial_project_id)
1061                    .cloned()
1062            }),
1063        })
1064    }
1065
1066    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1067        let (_, room) = self.get_channel_room(room_id, tx).await?;
1068        Ok(room)
1069    }
1070
1071    pub async fn room_connection_ids(
1072        &self,
1073        room_id: RoomId,
1074        connection_id: ConnectionId,
1075    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1076        self.room_transaction(room_id, |tx| async move {
1077            let mut participants = room_participant::Entity::find()
1078                .filter(room_participant::Column::RoomId.eq(room_id))
1079                .stream(&*tx)
1080                .await?;
1081
1082            let mut is_participant = false;
1083            let mut connection_ids = HashSet::default();
1084            while let Some(participant) = participants.next().await {
1085                let participant = participant?;
1086                if let Some(answering_connection) = participant.answering_connection() {
1087                    if answering_connection == connection_id {
1088                        is_participant = true;
1089                    } else {
1090                        connection_ids.insert(answering_connection);
1091                    }
1092                }
1093            }
1094
1095            if !is_participant {
1096                Err(anyhow!("not a room participant"))?;
1097            }
1098
1099            Ok(connection_ids)
1100        })
1101        .await
1102    }
1103
1104    async fn get_channel_room(
1105        &self,
1106        room_id: RoomId,
1107        tx: &DatabaseTransaction,
1108    ) -> Result<(Option<channel::Model>, proto::Room)> {
1109        let db_room = room::Entity::find_by_id(room_id)
1110            .one(tx)
1111            .await?
1112            .ok_or_else(|| anyhow!("could not find room"))?;
1113
1114        let mut db_participants = db_room
1115            .find_related(room_participant::Entity)
1116            .stream(tx)
1117            .await?;
1118        let mut participants = HashMap::default();
1119        let mut pending_participants = Vec::new();
1120        while let Some(db_participant) = db_participants.next().await {
1121            let db_participant = db_participant?;
1122            if let (
1123                Some(answering_connection_id),
1124                Some(answering_connection_server_id),
1125                Some(participant_index),
1126            ) = (
1127                db_participant.answering_connection_id,
1128                db_participant.answering_connection_server_id,
1129                db_participant.participant_index,
1130            ) {
1131                let location = match (
1132                    db_participant.location_kind,
1133                    db_participant.location_project_id,
1134                ) {
1135                    (Some(0), Some(project_id)) => {
1136                        Some(proto::participant_location::Variant::SharedProject(
1137                            proto::participant_location::SharedProject {
1138                                id: project_id.to_proto(),
1139                            },
1140                        ))
1141                    }
1142                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1143                        Default::default(),
1144                    )),
1145                    _ => Some(proto::participant_location::Variant::External(
1146                        Default::default(),
1147                    )),
1148                };
1149
1150                let answering_connection = ConnectionId {
1151                    owner_id: answering_connection_server_id.0 as u32,
1152                    id: answering_connection_id as u32,
1153                };
1154                participants.insert(
1155                    answering_connection,
1156                    proto::Participant {
1157                        user_id: db_participant.user_id.to_proto(),
1158                        peer_id: Some(answering_connection.into()),
1159                        projects: Default::default(),
1160                        location: Some(proto::ParticipantLocation { variant: location }),
1161                        participant_index: participant_index as u32,
1162                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1163                    },
1164                );
1165            } else {
1166                pending_participants.push(proto::PendingParticipant {
1167                    user_id: db_participant.user_id.to_proto(),
1168                    calling_user_id: db_participant.calling_user_id.to_proto(),
1169                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1170                });
1171            }
1172        }
1173        drop(db_participants);
1174
1175        let mut db_projects = db_room
1176            .find_related(project::Entity)
1177            .find_with_related(worktree::Entity)
1178            .stream(tx)
1179            .await?;
1180
1181        while let Some(row) = db_projects.next().await {
1182            let (db_project, db_worktree) = row?;
1183            let host_connection = db_project.host_connection()?;
1184            if let Some(participant) = participants.get_mut(&host_connection) {
1185                let project = if let Some(project) = participant
1186                    .projects
1187                    .iter_mut()
1188                    .find(|project| project.id == db_project.id.to_proto())
1189                {
1190                    project
1191                } else {
1192                    participant.projects.push(proto::ParticipantProject {
1193                        id: db_project.id.to_proto(),
1194                        worktree_root_names: Default::default(),
1195                    });
1196                    participant.projects.last_mut().unwrap()
1197                };
1198
1199                if let Some(db_worktree) = db_worktree {
1200                    if db_worktree.visible {
1201                        project.worktree_root_names.push(db_worktree.root_name);
1202                    }
1203                }
1204            }
1205        }
1206        drop(db_projects);
1207
1208        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1209        let mut followers = Vec::new();
1210        while let Some(db_follower) = db_followers.next().await {
1211            let db_follower = db_follower?;
1212            followers.push(proto::Follower {
1213                leader_id: Some(db_follower.leader_connection().into()),
1214                follower_id: Some(db_follower.follower_connection().into()),
1215                project_id: db_follower.project_id.to_proto(),
1216            });
1217        }
1218        drop(db_followers);
1219
1220        let channel = if let Some(channel_id) = db_room.channel_id {
1221            Some(self.get_channel_internal(channel_id, &*tx).await?)
1222        } else {
1223            None
1224        };
1225
1226        Ok((
1227            channel,
1228            proto::Room {
1229                id: db_room.id.to_proto(),
1230                live_kit_room: db_room.live_kit_room,
1231                participants: participants.into_values().collect(),
1232                pending_participants,
1233                followers,
1234            },
1235        ))
1236    }
1237}