rooms.rs

   1use super::*;
   2
   3impl Database {
   4    pub async fn clear_stale_room_participants(
   5        &self,
   6        room_id: RoomId,
   7        new_server_id: ServerId,
   8    ) -> Result<RoomGuard<RefreshedRoom>> {
   9        self.room_transaction(room_id, |tx| async move {
  10            let stale_participant_filter = Condition::all()
  11                .add(room_participant::Column::RoomId.eq(room_id))
  12                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  13                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  14
  15            let stale_participant_user_ids = room_participant::Entity::find()
  16                .filter(stale_participant_filter.clone())
  17                .all(&*tx)
  18                .await?
  19                .into_iter()
  20                .map(|participant| participant.user_id)
  21                .collect::<Vec<_>>();
  22
  23            // Delete participants who failed to reconnect and cancel their calls.
  24            let mut canceled_calls_to_user_ids = Vec::new();
  25            room_participant::Entity::delete_many()
  26                .filter(stale_participant_filter)
  27                .exec(&*tx)
  28                .await?;
  29            let called_participants = room_participant::Entity::find()
  30                .filter(
  31                    Condition::all()
  32                        .add(
  33                            room_participant::Column::CallingUserId
  34                                .is_in(stale_participant_user_ids.iter().copied()),
  35                        )
  36                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  37                )
  38                .all(&*tx)
  39                .await?;
  40            room_participant::Entity::delete_many()
  41                .filter(
  42                    room_participant::Column::Id
  43                        .is_in(called_participants.iter().map(|participant| participant.id)),
  44                )
  45                .exec(&*tx)
  46                .await?;
  47            canceled_calls_to_user_ids.extend(
  48                called_participants
  49                    .into_iter()
  50                    .map(|participant| participant.user_id),
  51            );
  52
  53            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  54            let channel_members;
  55            if let Some(channel) = &channel {
  56                channel_members = self.get_channel_participants(channel, &tx).await?;
  57            } else {
  58                channel_members = Vec::new();
  59
  60                // Delete the room if it becomes empty.
  61                if room.participants.is_empty() {
  62                    project::Entity::delete_many()
  63                        .filter(project::Column::RoomId.eq(room_id))
  64                        .exec(&*tx)
  65                        .await?;
  66                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  67                }
  68            };
  69
  70            Ok(RefreshedRoom {
  71                room,
  72                channel_id: channel.map(|channel| channel.id),
  73                channel_members,
  74                stale_participant_user_ids,
  75                canceled_calls_to_user_ids,
  76            })
  77        })
  78        .await
  79    }
  80
  81    pub async fn incoming_call_for_user(
  82        &self,
  83        user_id: UserId,
  84    ) -> Result<Option<proto::IncomingCall>> {
  85        self.transaction(|tx| async move {
  86            let pending_participant = room_participant::Entity::find()
  87                .filter(
  88                    room_participant::Column::UserId
  89                        .eq(user_id)
  90                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  91                )
  92                .one(&*tx)
  93                .await?;
  94
  95            if let Some(pending_participant) = pending_participant {
  96                let room = self.get_room(pending_participant.room_id, &tx).await?;
  97                Ok(Self::build_incoming_call(&room, user_id))
  98            } else {
  99                Ok(None)
 100            }
 101        })
 102        .await
 103    }
 104
 105    pub async fn create_room(
 106        &self,
 107        user_id: UserId,
 108        connection: ConnectionId,
 109        live_kit_room: &str,
 110        release_channel: &str,
 111    ) -> Result<proto::Room> {
 112        self.transaction(|tx| async move {
 113            let room = room::ActiveModel {
 114                live_kit_room: ActiveValue::set(live_kit_room.into()),
 115                environment: ActiveValue::set(Some(release_channel.to_string())),
 116                ..Default::default()
 117            }
 118            .insert(&*tx)
 119            .await?;
 120            room_participant::ActiveModel {
 121                room_id: ActiveValue::set(room.id),
 122                user_id: ActiveValue::set(user_id),
 123                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 124                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 125                    connection.owner_id as i32,
 126                ))),
 127                answering_connection_lost: ActiveValue::set(false),
 128                calling_user_id: ActiveValue::set(user_id),
 129                calling_connection_id: ActiveValue::set(connection.id as i32),
 130                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 131                    connection.owner_id as i32,
 132                ))),
 133                participant_index: ActiveValue::set(Some(0)),
 134                role: ActiveValue::set(Some(ChannelRole::Admin)),
 135
 136                id: ActiveValue::NotSet,
 137                location_kind: ActiveValue::NotSet,
 138                location_project_id: ActiveValue::NotSet,
 139                initial_project_id: ActiveValue::NotSet,
 140            }
 141            .insert(&*tx)
 142            .await?;
 143
 144            let room = self.get_room(room.id, &tx).await?;
 145            Ok(room)
 146        })
 147        .await
 148    }
 149
 150    pub async fn call(
 151        &self,
 152        room_id: RoomId,
 153        calling_user_id: UserId,
 154        calling_connection: ConnectionId,
 155        called_user_id: UserId,
 156        initial_project_id: Option<ProjectId>,
 157    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 158        self.room_transaction(room_id, |tx| async move {
 159            let caller = room_participant::Entity::find()
 160                .filter(
 161                    room_participant::Column::UserId
 162                        .eq(calling_user_id)
 163                        .and(room_participant::Column::RoomId.eq(room_id)),
 164                )
 165                .one(&*tx)
 166                .await?
 167                .ok_or_else(|| anyhow!("user is not in the room"))?;
 168
 169            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 170                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 171                ChannelRole::Guest => ChannelRole::Guest,
 172                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 173            };
 174
 175            room_participant::ActiveModel {
 176                room_id: ActiveValue::set(room_id),
 177                user_id: ActiveValue::set(called_user_id),
 178                answering_connection_lost: ActiveValue::set(false),
 179                participant_index: ActiveValue::NotSet,
 180                calling_user_id: ActiveValue::set(calling_user_id),
 181                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 182                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 183                    calling_connection.owner_id as i32,
 184                ))),
 185                initial_project_id: ActiveValue::set(initial_project_id),
 186                role: ActiveValue::set(Some(called_user_role)),
 187
 188                id: ActiveValue::NotSet,
 189                answering_connection_id: ActiveValue::NotSet,
 190                answering_connection_server_id: ActiveValue::NotSet,
 191                location_kind: ActiveValue::NotSet,
 192                location_project_id: ActiveValue::NotSet,
 193            }
 194            .insert(&*tx)
 195            .await?;
 196
 197            let room = self.get_room(room_id, &tx).await?;
 198            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 199                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 200            Ok((room, incoming_call))
 201        })
 202        .await
 203    }
 204
 205    pub async fn call_failed(
 206        &self,
 207        room_id: RoomId,
 208        called_user_id: UserId,
 209    ) -> Result<RoomGuard<proto::Room>> {
 210        self.room_transaction(room_id, |tx| async move {
 211            room_participant::Entity::delete_many()
 212                .filter(
 213                    room_participant::Column::RoomId
 214                        .eq(room_id)
 215                        .and(room_participant::Column::UserId.eq(called_user_id)),
 216                )
 217                .exec(&*tx)
 218                .await?;
 219            let room = self.get_room(room_id, &tx).await?;
 220            Ok(room)
 221        })
 222        .await
 223    }
 224
 225    pub async fn decline_call(
 226        &self,
 227        expected_room_id: Option<RoomId>,
 228        user_id: UserId,
 229    ) -> Result<Option<RoomGuard<proto::Room>>> {
 230        self.optional_room_transaction(|tx| async move {
 231            let mut filter = Condition::all()
 232                .add(room_participant::Column::UserId.eq(user_id))
 233                .add(room_participant::Column::AnsweringConnectionId.is_null());
 234            if let Some(room_id) = expected_room_id {
 235                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 236            }
 237            let participant = room_participant::Entity::find()
 238                .filter(filter)
 239                .one(&*tx)
 240                .await?;
 241
 242            let participant = if let Some(participant) = participant {
 243                participant
 244            } else if expected_room_id.is_some() {
 245                return Err(anyhow!("could not find call to decline"))?;
 246            } else {
 247                return Ok(None);
 248            };
 249
 250            let room_id = participant.room_id;
 251            room_participant::Entity::delete(participant.into_active_model())
 252                .exec(&*tx)
 253                .await?;
 254
 255            let room = self.get_room(room_id, &tx).await?;
 256            Ok(Some((room_id, room)))
 257        })
 258        .await
 259    }
 260
 261    pub async fn cancel_call(
 262        &self,
 263        room_id: RoomId,
 264        calling_connection: ConnectionId,
 265        called_user_id: UserId,
 266    ) -> Result<RoomGuard<proto::Room>> {
 267        self.room_transaction(room_id, |tx| async move {
 268            let participant = room_participant::Entity::find()
 269                .filter(
 270                    Condition::all()
 271                        .add(room_participant::Column::UserId.eq(called_user_id))
 272                        .add(room_participant::Column::RoomId.eq(room_id))
 273                        .add(
 274                            room_participant::Column::CallingConnectionId
 275                                .eq(calling_connection.id as i32),
 276                        )
 277                        .add(
 278                            room_participant::Column::CallingConnectionServerId
 279                                .eq(calling_connection.owner_id as i32),
 280                        )
 281                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 282                )
 283                .one(&*tx)
 284                .await?
 285                .ok_or_else(|| anyhow!("no call to cancel"))?;
 286
 287            room_participant::Entity::delete(participant.into_active_model())
 288                .exec(&*tx)
 289                .await?;
 290
 291            let room = self.get_room(room_id, &tx).await?;
 292            Ok(room)
 293        })
 294        .await
 295    }
 296
 297    pub async fn join_room(
 298        &self,
 299        room_id: RoomId,
 300        user_id: UserId,
 301        connection: ConnectionId,
 302        environment: &str,
 303    ) -> Result<RoomGuard<JoinRoom>> {
 304        self.room_transaction(room_id, |tx| async move {
 305            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 306            enum QueryChannelIdAndEnvironment {
 307                ChannelId,
 308                Environment,
 309            }
 310
 311            let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
 312                room::Entity::find()
 313                    .select_only()
 314                    .column(room::Column::ChannelId)
 315                    .column(room::Column::Environment)
 316                    .filter(room::Column::Id.eq(room_id))
 317                    .into_values::<_, QueryChannelIdAndEnvironment>()
 318                    .one(&*tx)
 319                    .await?
 320                    .ok_or_else(|| anyhow!("no such room"))?;
 321
 322            if let Some(release_channel) = release_channel {
 323                if &release_channel != environment {
 324                    Err(anyhow!("must join using the {} release", release_channel))?;
 325                }
 326            }
 327
 328            if channel_id.is_some() {
 329                Err(anyhow!("tried to join channel call directly"))?
 330            }
 331
 332            let participant_index = self
 333                .get_next_participant_index_internal(room_id, &*tx)
 334                .await?;
 335
 336            let result = room_participant::Entity::update_many()
 337                .filter(
 338                    Condition::all()
 339                        .add(room_participant::Column::RoomId.eq(room_id))
 340                        .add(room_participant::Column::UserId.eq(user_id))
 341                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 342                )
 343                .set(room_participant::ActiveModel {
 344                    participant_index: ActiveValue::Set(Some(participant_index)),
 345                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 346                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 347                        connection.owner_id as i32,
 348                    ))),
 349                    answering_connection_lost: ActiveValue::set(false),
 350                    ..Default::default()
 351                })
 352                .exec(&*tx)
 353                .await?;
 354            if result.rows_affected == 0 {
 355                Err(anyhow!("room does not exist or was already joined"))?;
 356            }
 357
 358            let room = self.get_room(room_id, &tx).await?;
 359            Ok(JoinRoom {
 360                room,
 361                channel_id: None,
 362                channel_members: vec![],
 363            })
 364        })
 365        .await
 366    }
 367
 368    async fn get_next_participant_index_internal(
 369        &self,
 370        room_id: RoomId,
 371        tx: &DatabaseTransaction,
 372    ) -> Result<i32> {
 373        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 374        enum QueryParticipantIndices {
 375            ParticipantIndex,
 376        }
 377        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 378            .filter(
 379                room_participant::Column::RoomId
 380                    .eq(room_id)
 381                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 382            )
 383            .select_only()
 384            .column(room_participant::Column::ParticipantIndex)
 385            .into_values::<_, QueryParticipantIndices>()
 386            .all(&*tx)
 387            .await?;
 388
 389        let mut participant_index = 0;
 390        while existing_participant_indices.contains(&participant_index) {
 391            participant_index += 1;
 392        }
 393
 394        Ok(participant_index)
 395    }
 396
 397    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 398        self.transaction(|tx| async move {
 399            let room: Option<room::Model> = room::Entity::find()
 400                .filter(room::Column::Id.eq(room_id))
 401                .one(&*tx)
 402                .await?;
 403
 404            Ok(room.and_then(|room| room.channel_id))
 405        })
 406        .await
 407    }
 408
 409    pub(crate) async fn join_channel_room_internal(
 410        &self,
 411        room_id: RoomId,
 412        user_id: UserId,
 413        connection: ConnectionId,
 414        role: ChannelRole,
 415        tx: &DatabaseTransaction,
 416    ) -> Result<JoinRoom> {
 417        let participant_index = self
 418            .get_next_participant_index_internal(room_id, &*tx)
 419            .await?;
 420
 421        room_participant::Entity::insert_many([room_participant::ActiveModel {
 422            room_id: ActiveValue::set(room_id),
 423            user_id: ActiveValue::set(user_id),
 424            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 425            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 426                connection.owner_id as i32,
 427            ))),
 428            answering_connection_lost: ActiveValue::set(false),
 429            calling_user_id: ActiveValue::set(user_id),
 430            calling_connection_id: ActiveValue::set(connection.id as i32),
 431            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 432                connection.owner_id as i32,
 433            ))),
 434            participant_index: ActiveValue::Set(Some(participant_index)),
 435            role: ActiveValue::set(Some(role)),
 436            id: ActiveValue::NotSet,
 437            location_kind: ActiveValue::NotSet,
 438            location_project_id: ActiveValue::NotSet,
 439            initial_project_id: ActiveValue::NotSet,
 440        }])
 441        .on_conflict(
 442            OnConflict::columns([room_participant::Column::UserId])
 443                .update_columns([
 444                    room_participant::Column::AnsweringConnectionId,
 445                    room_participant::Column::AnsweringConnectionServerId,
 446                    room_participant::Column::AnsweringConnectionLost,
 447                    room_participant::Column::ParticipantIndex,
 448                    room_participant::Column::Role,
 449                ])
 450                .to_owned(),
 451        )
 452        .exec(&*tx)
 453        .await?;
 454
 455        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 456        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 457        let channel_members = self.get_channel_participants(&channel, &*tx).await?;
 458        Ok(JoinRoom {
 459            room,
 460            channel_id: Some(channel.id),
 461            channel_members,
 462        })
 463    }
 464
 465    pub async fn rejoin_room(
 466        &self,
 467        rejoin_room: proto::RejoinRoom,
 468        user_id: UserId,
 469        connection: ConnectionId,
 470    ) -> Result<RoomGuard<RejoinedRoom>> {
 471        let room_id = RoomId::from_proto(rejoin_room.id);
 472        self.room_transaction(room_id, |tx| async {
 473            let tx = tx;
 474            let participant_update = room_participant::Entity::update_many()
 475                .filter(
 476                    Condition::all()
 477                        .add(room_participant::Column::RoomId.eq(room_id))
 478                        .add(room_participant::Column::UserId.eq(user_id))
 479                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 480                        .add(
 481                            Condition::any()
 482                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 483                                .add(
 484                                    room_participant::Column::AnsweringConnectionServerId
 485                                        .ne(connection.owner_id as i32),
 486                                ),
 487                        ),
 488                )
 489                .set(room_participant::ActiveModel {
 490                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 491                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 492                        connection.owner_id as i32,
 493                    ))),
 494                    answering_connection_lost: ActiveValue::set(false),
 495                    ..Default::default()
 496                })
 497                .exec(&*tx)
 498                .await?;
 499            if participant_update.rows_affected == 0 {
 500                return Err(anyhow!("room does not exist or was already joined"))?;
 501            }
 502
 503            let mut reshared_projects = Vec::new();
 504            for reshared_project in &rejoin_room.reshared_projects {
 505                let project_id = ProjectId::from_proto(reshared_project.project_id);
 506                let project = project::Entity::find_by_id(project_id)
 507                    .one(&*tx)
 508                    .await?
 509                    .ok_or_else(|| anyhow!("project does not exist"))?;
 510                if project.host_user_id != user_id {
 511                    return Err(anyhow!("no such project"))?;
 512                }
 513
 514                let mut collaborators = project
 515                    .find_related(project_collaborator::Entity)
 516                    .all(&*tx)
 517                    .await?;
 518                let host_ix = collaborators
 519                    .iter()
 520                    .position(|collaborator| {
 521                        collaborator.user_id == user_id && collaborator.is_host
 522                    })
 523                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 524                let host = collaborators.swap_remove(host_ix);
 525                let old_connection_id = host.connection();
 526
 527                project::Entity::update(project::ActiveModel {
 528                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 529                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 530                        connection.owner_id as i32,
 531                    ))),
 532                    ..project.into_active_model()
 533                })
 534                .exec(&*tx)
 535                .await?;
 536                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 537                    connection_id: ActiveValue::set(connection.id as i32),
 538                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 539                    ..host.into_active_model()
 540                })
 541                .exec(&*tx)
 542                .await?;
 543
 544                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 545                    .await?;
 546
 547                reshared_projects.push(ResharedProject {
 548                    id: project_id,
 549                    old_connection_id,
 550                    collaborators: collaborators
 551                        .iter()
 552                        .map(|collaborator| ProjectCollaborator {
 553                            connection_id: collaborator.connection(),
 554                            user_id: collaborator.user_id,
 555                            replica_id: collaborator.replica_id,
 556                            is_host: collaborator.is_host,
 557                        })
 558                        .collect(),
 559                    worktrees: reshared_project.worktrees.clone(),
 560                });
 561            }
 562
 563            project::Entity::delete_many()
 564                .filter(
 565                    Condition::all()
 566                        .add(project::Column::RoomId.eq(room_id))
 567                        .add(project::Column::HostUserId.eq(user_id))
 568                        .add(
 569                            project::Column::Id
 570                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 571                        ),
 572                )
 573                .exec(&*tx)
 574                .await?;
 575
 576            let mut rejoined_projects = Vec::new();
 577            for rejoined_project in &rejoin_room.rejoined_projects {
 578                let project_id = ProjectId::from_proto(rejoined_project.id);
 579                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 580                    continue;
 581                };
 582
 583                let mut worktrees = Vec::new();
 584                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 585                for db_worktree in db_worktrees {
 586                    let mut worktree = RejoinedWorktree {
 587                        id: db_worktree.id as u64,
 588                        abs_path: db_worktree.abs_path,
 589                        root_name: db_worktree.root_name,
 590                        visible: db_worktree.visible,
 591                        updated_entries: Default::default(),
 592                        removed_entries: Default::default(),
 593                        updated_repositories: Default::default(),
 594                        removed_repositories: Default::default(),
 595                        diagnostic_summaries: Default::default(),
 596                        settings_files: Default::default(),
 597                        scan_id: db_worktree.scan_id as u64,
 598                        completed_scan_id: db_worktree.completed_scan_id as u64,
 599                    };
 600
 601                    let rejoined_worktree = rejoined_project
 602                        .worktrees
 603                        .iter()
 604                        .find(|worktree| worktree.id == db_worktree.id as u64);
 605
 606                    // File entries
 607                    {
 608                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 609                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 610                        } else {
 611                            worktree_entry::Column::IsDeleted.eq(false)
 612                        };
 613
 614                        let mut db_entries = worktree_entry::Entity::find()
 615                            .filter(
 616                                Condition::all()
 617                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 618                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 619                                    .add(entry_filter),
 620                            )
 621                            .stream(&*tx)
 622                            .await?;
 623
 624                        while let Some(db_entry) = db_entries.next().await {
 625                            let db_entry = db_entry?;
 626                            if db_entry.is_deleted {
 627                                worktree.removed_entries.push(db_entry.id as u64);
 628                            } else {
 629                                worktree.updated_entries.push(proto::Entry {
 630                                    id: db_entry.id as u64,
 631                                    is_dir: db_entry.is_dir,
 632                                    path: db_entry.path,
 633                                    inode: db_entry.inode as u64,
 634                                    mtime: Some(proto::Timestamp {
 635                                        seconds: db_entry.mtime_seconds as u64,
 636                                        nanos: db_entry.mtime_nanos as u32,
 637                                    }),
 638                                    is_symlink: db_entry.is_symlink,
 639                                    is_ignored: db_entry.is_ignored,
 640                                    is_external: db_entry.is_external,
 641                                    git_status: db_entry.git_status.map(|status| status as i32),
 642                                });
 643                            }
 644                        }
 645                    }
 646
 647                    // Repository Entries
 648                    {
 649                        let repository_entry_filter =
 650                            if let Some(rejoined_worktree) = rejoined_worktree {
 651                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 652                            } else {
 653                                worktree_repository::Column::IsDeleted.eq(false)
 654                            };
 655
 656                        let mut db_repositories = worktree_repository::Entity::find()
 657                            .filter(
 658                                Condition::all()
 659                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 660                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 661                                    .add(repository_entry_filter),
 662                            )
 663                            .stream(&*tx)
 664                            .await?;
 665
 666                        while let Some(db_repository) = db_repositories.next().await {
 667                            let db_repository = db_repository?;
 668                            if db_repository.is_deleted {
 669                                worktree
 670                                    .removed_repositories
 671                                    .push(db_repository.work_directory_id as u64);
 672                            } else {
 673                                worktree.updated_repositories.push(proto::RepositoryEntry {
 674                                    work_directory_id: db_repository.work_directory_id as u64,
 675                                    branch: db_repository.branch,
 676                                });
 677                            }
 678                        }
 679                    }
 680
 681                    worktrees.push(worktree);
 682                }
 683
 684                let language_servers = project
 685                    .find_related(language_server::Entity)
 686                    .all(&*tx)
 687                    .await?
 688                    .into_iter()
 689                    .map(|language_server| proto::LanguageServer {
 690                        id: language_server.id as u64,
 691                        name: language_server.name,
 692                    })
 693                    .collect::<Vec<_>>();
 694
 695                {
 696                    let mut db_settings_files = worktree_settings_file::Entity::find()
 697                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 698                        .stream(&*tx)
 699                        .await?;
 700                    while let Some(db_settings_file) = db_settings_files.next().await {
 701                        let db_settings_file = db_settings_file?;
 702                        if let Some(worktree) = worktrees
 703                            .iter_mut()
 704                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 705                        {
 706                            worktree.settings_files.push(WorktreeSettingsFile {
 707                                path: db_settings_file.path,
 708                                content: db_settings_file.content,
 709                            });
 710                        }
 711                    }
 712                }
 713
 714                let mut collaborators = project
 715                    .find_related(project_collaborator::Entity)
 716                    .all(&*tx)
 717                    .await?;
 718                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 719                    .iter()
 720                    .position(|collaborator| collaborator.user_id == user_id)
 721                {
 722                    collaborators.swap_remove(self_collaborator_ix)
 723                } else {
 724                    continue;
 725                };
 726                let old_connection_id = self_collaborator.connection();
 727                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 728                    connection_id: ActiveValue::set(connection.id as i32),
 729                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 730                    ..self_collaborator.into_active_model()
 731                })
 732                .exec(&*tx)
 733                .await?;
 734
 735                let collaborators = collaborators
 736                    .into_iter()
 737                    .map(|collaborator| ProjectCollaborator {
 738                        connection_id: collaborator.connection(),
 739                        user_id: collaborator.user_id,
 740                        replica_id: collaborator.replica_id,
 741                        is_host: collaborator.is_host,
 742                    })
 743                    .collect::<Vec<_>>();
 744
 745                rejoined_projects.push(RejoinedProject {
 746                    id: project_id,
 747                    old_connection_id,
 748                    collaborators,
 749                    worktrees,
 750                    language_servers,
 751                });
 752            }
 753
 754            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 755            let channel_members = if let Some(channel) = &channel {
 756                self.get_channel_participants(&channel, &tx).await?
 757            } else {
 758                Vec::new()
 759            };
 760
 761            Ok(RejoinedRoom {
 762                room,
 763                channel_id: channel.map(|channel| channel.id),
 764                channel_members,
 765                rejoined_projects,
 766                reshared_projects,
 767            })
 768        })
 769        .await
 770    }
 771
 772    pub async fn leave_room(
 773        &self,
 774        connection: ConnectionId,
 775    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 776        self.optional_room_transaction(|tx| async move {
 777            let leaving_participant = room_participant::Entity::find()
 778                .filter(
 779                    Condition::all()
 780                        .add(
 781                            room_participant::Column::AnsweringConnectionId
 782                                .eq(connection.id as i32),
 783                        )
 784                        .add(
 785                            room_participant::Column::AnsweringConnectionServerId
 786                                .eq(connection.owner_id as i32),
 787                        ),
 788                )
 789                .one(&*tx)
 790                .await?;
 791
 792            if let Some(leaving_participant) = leaving_participant {
 793                // Leave room.
 794                let room_id = leaving_participant.room_id;
 795                room_participant::Entity::delete_by_id(leaving_participant.id)
 796                    .exec(&*tx)
 797                    .await?;
 798
 799                // Cancel pending calls initiated by the leaving user.
 800                let called_participants = room_participant::Entity::find()
 801                    .filter(
 802                        Condition::all()
 803                            .add(
 804                                room_participant::Column::CallingUserId
 805                                    .eq(leaving_participant.user_id),
 806                            )
 807                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 808                    )
 809                    .all(&*tx)
 810                    .await?;
 811                room_participant::Entity::delete_many()
 812                    .filter(
 813                        room_participant::Column::Id
 814                            .is_in(called_participants.iter().map(|participant| participant.id)),
 815                    )
 816                    .exec(&*tx)
 817                    .await?;
 818                let canceled_calls_to_user_ids = called_participants
 819                    .into_iter()
 820                    .map(|participant| participant.user_id)
 821                    .collect();
 822
 823                // Detect left projects.
 824                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 825                enum QueryProjectIds {
 826                    ProjectId,
 827                }
 828                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 829                    .select_only()
 830                    .column_as(
 831                        project_collaborator::Column::ProjectId,
 832                        QueryProjectIds::ProjectId,
 833                    )
 834                    .filter(
 835                        Condition::all()
 836                            .add(
 837                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 838                            )
 839                            .add(
 840                                project_collaborator::Column::ConnectionServerId
 841                                    .eq(connection.owner_id as i32),
 842                            ),
 843                    )
 844                    .into_values::<_, QueryProjectIds>()
 845                    .all(&*tx)
 846                    .await?;
 847                let mut left_projects = HashMap::default();
 848                let mut collaborators = project_collaborator::Entity::find()
 849                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 850                    .stream(&*tx)
 851                    .await?;
 852                while let Some(collaborator) = collaborators.next().await {
 853                    let collaborator = collaborator?;
 854                    let left_project =
 855                        left_projects
 856                            .entry(collaborator.project_id)
 857                            .or_insert(LeftProject {
 858                                id: collaborator.project_id,
 859                                host_user_id: Default::default(),
 860                                connection_ids: Default::default(),
 861                                host_connection_id: Default::default(),
 862                            });
 863
 864                    let collaborator_connection_id = collaborator.connection();
 865                    if collaborator_connection_id != connection {
 866                        left_project.connection_ids.push(collaborator_connection_id);
 867                    }
 868
 869                    if collaborator.is_host {
 870                        left_project.host_user_id = collaborator.user_id;
 871                        left_project.host_connection_id = collaborator_connection_id;
 872                    }
 873                }
 874                drop(collaborators);
 875
 876                // Leave projects.
 877                project_collaborator::Entity::delete_many()
 878                    .filter(
 879                        Condition::all()
 880                            .add(
 881                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 882                            )
 883                            .add(
 884                                project_collaborator::Column::ConnectionServerId
 885                                    .eq(connection.owner_id as i32),
 886                            ),
 887                    )
 888                    .exec(&*tx)
 889                    .await?;
 890
 891                follower::Entity::delete_many()
 892                    .filter(
 893                        Condition::all()
 894                            .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
 895                    )
 896                    .exec(&*tx)
 897                    .await?;
 898
 899                // Unshare projects.
 900                project::Entity::delete_many()
 901                    .filter(
 902                        Condition::all()
 903                            .add(project::Column::RoomId.eq(room_id))
 904                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 905                            .add(
 906                                project::Column::HostConnectionServerId
 907                                    .eq(connection.owner_id as i32),
 908                            ),
 909                    )
 910                    .exec(&*tx)
 911                    .await?;
 912
 913                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 914                let deleted = if room.participants.is_empty() {
 915                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 916                    result.rows_affected > 0
 917                } else {
 918                    false
 919                };
 920
 921                let channel_members = if let Some(channel) = &channel {
 922                    self.get_channel_participants(channel, &tx).await?
 923                } else {
 924                    Vec::new()
 925                };
 926                let left_room = LeftRoom {
 927                    room,
 928                    channel_id: channel.map(|channel| channel.id),
 929                    channel_members,
 930                    left_projects,
 931                    canceled_calls_to_user_ids,
 932                    deleted,
 933                };
 934
 935                if left_room.room.participants.is_empty() {
 936                    self.rooms.remove(&room_id);
 937                }
 938
 939                Ok(Some((room_id, left_room)))
 940            } else {
 941                Ok(None)
 942            }
 943        })
 944        .await
 945    }
 946
 947    pub async fn update_room_participant_location(
 948        &self,
 949        room_id: RoomId,
 950        connection: ConnectionId,
 951        location: proto::ParticipantLocation,
 952    ) -> Result<RoomGuard<proto::Room>> {
 953        self.room_transaction(room_id, |tx| async {
 954            let tx = tx;
 955            let location_kind;
 956            let location_project_id;
 957            match location
 958                .variant
 959                .as_ref()
 960                .ok_or_else(|| anyhow!("invalid location"))?
 961            {
 962                proto::participant_location::Variant::SharedProject(project) => {
 963                    location_kind = 0;
 964                    location_project_id = Some(ProjectId::from_proto(project.id));
 965                }
 966                proto::participant_location::Variant::UnsharedProject(_) => {
 967                    location_kind = 1;
 968                    location_project_id = None;
 969                }
 970                proto::participant_location::Variant::External(_) => {
 971                    location_kind = 2;
 972                    location_project_id = None;
 973                }
 974            }
 975
 976            let result = room_participant::Entity::update_many()
 977                .filter(
 978                    Condition::all()
 979                        .add(room_participant::Column::RoomId.eq(room_id))
 980                        .add(
 981                            room_participant::Column::AnsweringConnectionId
 982                                .eq(connection.id as i32),
 983                        )
 984                        .add(
 985                            room_participant::Column::AnsweringConnectionServerId
 986                                .eq(connection.owner_id as i32),
 987                        ),
 988                )
 989                .set(room_participant::ActiveModel {
 990                    location_kind: ActiveValue::set(Some(location_kind)),
 991                    location_project_id: ActiveValue::set(location_project_id),
 992                    ..Default::default()
 993                })
 994                .exec(&*tx)
 995                .await?;
 996
 997            if result.rows_affected == 1 {
 998                let room = self.get_room(room_id, &tx).await?;
 999                Ok(room)
1000            } else {
1001                Err(anyhow!("could not update room participant location"))?
1002            }
1003        })
1004        .await
1005    }
1006
1007    pub async fn set_room_participant_role(
1008        &self,
1009        admin_id: UserId,
1010        room_id: RoomId,
1011        user_id: UserId,
1012        role: ChannelRole,
1013    ) -> Result<RoomGuard<proto::Room>> {
1014        self.room_transaction(room_id, |tx| async move {
1015            room_participant::Entity::find()
1016                .filter(
1017                    Condition::all()
1018                        .add(room_participant::Column::RoomId.eq(room_id))
1019                        .add(room_participant::Column::UserId.eq(admin_id))
1020                        .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1021                )
1022                .one(&*tx)
1023                .await?
1024                .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1025
1026            let result = room_participant::Entity::update_many()
1027                .filter(
1028                    Condition::all()
1029                        .add(room_participant::Column::RoomId.eq(room_id))
1030                        .add(room_participant::Column::UserId.eq(user_id)),
1031                )
1032                .set(room_participant::ActiveModel {
1033                    role: ActiveValue::set(Some(ChannelRole::from(role))),
1034                    ..Default::default()
1035                })
1036                .exec(&*tx)
1037                .await?;
1038
1039            if result.rows_affected != 1 {
1040                Err(anyhow!("could not update room participant role"))?;
1041            }
1042            Ok(self.get_room(room_id, &tx).await?)
1043        })
1044        .await
1045    }
1046
1047    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1048        self.transaction(|tx| async move {
1049            self.room_connection_lost(connection, &*tx).await?;
1050            self.channel_buffer_connection_lost(connection, &*tx)
1051                .await?;
1052            self.channel_chat_connection_lost(connection, &*tx).await?;
1053            Ok(())
1054        })
1055        .await
1056    }
1057
1058    pub async fn room_connection_lost(
1059        &self,
1060        connection: ConnectionId,
1061        tx: &DatabaseTransaction,
1062    ) -> Result<()> {
1063        let participant = room_participant::Entity::find()
1064            .filter(
1065                Condition::all()
1066                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1067                    .add(
1068                        room_participant::Column::AnsweringConnectionServerId
1069                            .eq(connection.owner_id as i32),
1070                    ),
1071            )
1072            .one(&*tx)
1073            .await?;
1074
1075        if let Some(participant) = participant {
1076            room_participant::Entity::update(room_participant::ActiveModel {
1077                answering_connection_lost: ActiveValue::set(true),
1078                ..participant.into_active_model()
1079            })
1080            .exec(&*tx)
1081            .await?;
1082        }
1083        Ok(())
1084    }
1085
1086    fn build_incoming_call(
1087        room: &proto::Room,
1088        called_user_id: UserId,
1089    ) -> Option<proto::IncomingCall> {
1090        let pending_participant = room
1091            .pending_participants
1092            .iter()
1093            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1094
1095        Some(proto::IncomingCall {
1096            room_id: room.id,
1097            calling_user_id: pending_participant.calling_user_id,
1098            participant_user_ids: room
1099                .participants
1100                .iter()
1101                .map(|participant| participant.user_id)
1102                .collect(),
1103            initial_project: room.participants.iter().find_map(|participant| {
1104                let initial_project_id = pending_participant.initial_project_id?;
1105                participant
1106                    .projects
1107                    .iter()
1108                    .find(|project| project.id == initial_project_id)
1109                    .cloned()
1110            }),
1111        })
1112    }
1113
1114    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1115        let (_, room) = self.get_channel_room(room_id, tx).await?;
1116        Ok(room)
1117    }
1118
1119    pub async fn room_connection_ids(
1120        &self,
1121        room_id: RoomId,
1122        connection_id: ConnectionId,
1123    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1124        self.room_transaction(room_id, |tx| async move {
1125            let mut participants = room_participant::Entity::find()
1126                .filter(room_participant::Column::RoomId.eq(room_id))
1127                .stream(&*tx)
1128                .await?;
1129
1130            let mut is_participant = false;
1131            let mut connection_ids = HashSet::default();
1132            while let Some(participant) = participants.next().await {
1133                let participant = participant?;
1134                if let Some(answering_connection) = participant.answering_connection() {
1135                    if answering_connection == connection_id {
1136                        is_participant = true;
1137                    } else {
1138                        connection_ids.insert(answering_connection);
1139                    }
1140                }
1141            }
1142
1143            if !is_participant {
1144                Err(anyhow!("not a room participant"))?;
1145            }
1146
1147            Ok(connection_ids)
1148        })
1149        .await
1150    }
1151
1152    async fn get_channel_room(
1153        &self,
1154        room_id: RoomId,
1155        tx: &DatabaseTransaction,
1156    ) -> Result<(Option<channel::Model>, proto::Room)> {
1157        let db_room = room::Entity::find_by_id(room_id)
1158            .one(tx)
1159            .await?
1160            .ok_or_else(|| anyhow!("could not find room"))?;
1161
1162        let mut db_participants = db_room
1163            .find_related(room_participant::Entity)
1164            .stream(tx)
1165            .await?;
1166        let mut participants = HashMap::default();
1167        let mut pending_participants = Vec::new();
1168        while let Some(db_participant) = db_participants.next().await {
1169            let db_participant = db_participant?;
1170            if let (
1171                Some(answering_connection_id),
1172                Some(answering_connection_server_id),
1173                Some(participant_index),
1174            ) = (
1175                db_participant.answering_connection_id,
1176                db_participant.answering_connection_server_id,
1177                db_participant.participant_index,
1178            ) {
1179                let location = match (
1180                    db_participant.location_kind,
1181                    db_participant.location_project_id,
1182                ) {
1183                    (Some(0), Some(project_id)) => {
1184                        Some(proto::participant_location::Variant::SharedProject(
1185                            proto::participant_location::SharedProject {
1186                                id: project_id.to_proto(),
1187                            },
1188                        ))
1189                    }
1190                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1191                        Default::default(),
1192                    )),
1193                    _ => Some(proto::participant_location::Variant::External(
1194                        Default::default(),
1195                    )),
1196                };
1197
1198                let answering_connection = ConnectionId {
1199                    owner_id: answering_connection_server_id.0 as u32,
1200                    id: answering_connection_id as u32,
1201                };
1202                participants.insert(
1203                    answering_connection,
1204                    proto::Participant {
1205                        user_id: db_participant.user_id.to_proto(),
1206                        peer_id: Some(answering_connection.into()),
1207                        projects: Default::default(),
1208                        location: Some(proto::ParticipantLocation { variant: location }),
1209                        participant_index: participant_index as u32,
1210                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1211                    },
1212                );
1213            } else {
1214                pending_participants.push(proto::PendingParticipant {
1215                    user_id: db_participant.user_id.to_proto(),
1216                    calling_user_id: db_participant.calling_user_id.to_proto(),
1217                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1218                });
1219            }
1220        }
1221        drop(db_participants);
1222
1223        let mut db_projects = db_room
1224            .find_related(project::Entity)
1225            .find_with_related(worktree::Entity)
1226            .stream(tx)
1227            .await?;
1228
1229        while let Some(row) = db_projects.next().await {
1230            let (db_project, db_worktree) = row?;
1231            let host_connection = db_project.host_connection()?;
1232            if let Some(participant) = participants.get_mut(&host_connection) {
1233                let project = if let Some(project) = participant
1234                    .projects
1235                    .iter_mut()
1236                    .find(|project| project.id == db_project.id.to_proto())
1237                {
1238                    project
1239                } else {
1240                    participant.projects.push(proto::ParticipantProject {
1241                        id: db_project.id.to_proto(),
1242                        worktree_root_names: Default::default(),
1243                    });
1244                    participant.projects.last_mut().unwrap()
1245                };
1246
1247                if let Some(db_worktree) = db_worktree {
1248                    if db_worktree.visible {
1249                        project.worktree_root_names.push(db_worktree.root_name);
1250                    }
1251                }
1252            }
1253        }
1254        drop(db_projects);
1255
1256        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1257        let mut followers = Vec::new();
1258        while let Some(db_follower) = db_followers.next().await {
1259            let db_follower = db_follower?;
1260            followers.push(proto::Follower {
1261                leader_id: Some(db_follower.leader_connection().into()),
1262                follower_id: Some(db_follower.follower_connection().into()),
1263                project_id: db_follower.project_id.to_proto(),
1264            });
1265        }
1266        drop(db_followers);
1267
1268        let channel = if let Some(channel_id) = db_room.channel_id {
1269            Some(self.get_channel_internal(channel_id, &*tx).await?)
1270        } else {
1271            None
1272        };
1273
1274        Ok((
1275            channel,
1276            proto::Room {
1277                id: db_room.id.to_proto(),
1278                live_kit_room: db_room.live_kit_room,
1279                participants: participants.into_values().collect(),
1280                pending_participants,
1281                followers,
1282            },
1283        ))
1284    }
1285}