rooms.rs

   1use super::*;
   2
   3impl Database {
   4    /// Clears all room participants in rooms attached to a stale server.
   5    pub async fn clear_stale_room_participants(
   6        &self,
   7        room_id: RoomId,
   8        new_server_id: ServerId,
   9    ) -> Result<RoomGuard<RefreshedRoom>> {
  10        self.room_transaction(room_id, |tx| async move {
  11            let stale_participant_filter = Condition::all()
  12                .add(room_participant::Column::RoomId.eq(room_id))
  13                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  14                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  15
  16            let stale_participant_user_ids = room_participant::Entity::find()
  17                .filter(stale_participant_filter.clone())
  18                .all(&*tx)
  19                .await?
  20                .into_iter()
  21                .map(|participant| participant.user_id)
  22                .collect::<Vec<_>>();
  23
  24            // Delete participants who failed to reconnect and cancel their calls.
  25            let mut canceled_calls_to_user_ids = Vec::new();
  26            room_participant::Entity::delete_many()
  27                .filter(stale_participant_filter)
  28                .exec(&*tx)
  29                .await?;
  30            let called_participants = room_participant::Entity::find()
  31                .filter(
  32                    Condition::all()
  33                        .add(
  34                            room_participant::Column::CallingUserId
  35                                .is_in(stale_participant_user_ids.iter().copied()),
  36                        )
  37                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  38                )
  39                .all(&*tx)
  40                .await?;
  41            room_participant::Entity::delete_many()
  42                .filter(
  43                    room_participant::Column::Id
  44                        .is_in(called_participants.iter().map(|participant| participant.id)),
  45                )
  46                .exec(&*tx)
  47                .await?;
  48            canceled_calls_to_user_ids.extend(
  49                called_participants
  50                    .into_iter()
  51                    .map(|participant| participant.user_id),
  52            );
  53
  54            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  55            let channel_members;
  56            if let Some(channel) = &channel {
  57                channel_members = self.get_channel_participants(channel, &tx).await?;
  58            } else {
  59                channel_members = Vec::new();
  60
  61                // Delete the room if it becomes empty.
  62                if room.participants.is_empty() {
  63                    project::Entity::delete_many()
  64                        .filter(project::Column::RoomId.eq(room_id))
  65                        .exec(&*tx)
  66                        .await?;
  67                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  68                }
  69            };
  70
  71            Ok(RefreshedRoom {
  72                room,
  73                channel_id: channel.map(|channel| channel.id),
  74                channel_members,
  75                stale_participant_user_ids,
  76                canceled_calls_to_user_ids,
  77            })
  78        })
  79        .await
  80    }
  81
  82    /// Returns the incoming calls for user with the given ID.
  83    pub async fn incoming_call_for_user(
  84        &self,
  85        user_id: UserId,
  86    ) -> Result<Option<proto::IncomingCall>> {
  87        self.transaction(|tx| async move {
  88            let pending_participant = room_participant::Entity::find()
  89                .filter(
  90                    room_participant::Column::UserId
  91                        .eq(user_id)
  92                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  93                )
  94                .one(&*tx)
  95                .await?;
  96
  97            if let Some(pending_participant) = pending_participant {
  98                let room = self.get_room(pending_participant.room_id, &tx).await?;
  99                Ok(Self::build_incoming_call(&room, user_id))
 100            } else {
 101                Ok(None)
 102            }
 103        })
 104        .await
 105    }
 106
 107    /// Creates a new room.
 108    pub async fn create_room(
 109        &self,
 110        user_id: UserId,
 111        connection: ConnectionId,
 112        live_kit_room: &str,
 113        release_channel: &str,
 114    ) -> Result<proto::Room> {
 115        self.transaction(|tx| async move {
 116            let room = room::ActiveModel {
 117                live_kit_room: ActiveValue::set(live_kit_room.into()),
 118                environment: ActiveValue::set(Some(release_channel.to_string())),
 119                ..Default::default()
 120            }
 121            .insert(&*tx)
 122            .await?;
 123            room_participant::ActiveModel {
 124                room_id: ActiveValue::set(room.id),
 125                user_id: ActiveValue::set(user_id),
 126                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 127                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 128                    connection.owner_id as i32,
 129                ))),
 130                answering_connection_lost: ActiveValue::set(false),
 131                calling_user_id: ActiveValue::set(user_id),
 132                calling_connection_id: ActiveValue::set(connection.id as i32),
 133                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 134                    connection.owner_id as i32,
 135                ))),
 136                participant_index: ActiveValue::set(Some(0)),
 137                role: ActiveValue::set(Some(ChannelRole::Admin)),
 138
 139                id: ActiveValue::NotSet,
 140                location_kind: ActiveValue::NotSet,
 141                location_project_id: ActiveValue::NotSet,
 142                initial_project_id: ActiveValue::NotSet,
 143            }
 144            .insert(&*tx)
 145            .await?;
 146
 147            let room = self.get_room(room.id, &tx).await?;
 148            Ok(room)
 149        })
 150        .await
 151    }
 152
 153    pub async fn call(
 154        &self,
 155        room_id: RoomId,
 156        calling_user_id: UserId,
 157        calling_connection: ConnectionId,
 158        called_user_id: UserId,
 159        initial_project_id: Option<ProjectId>,
 160    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 161        self.room_transaction(room_id, |tx| async move {
 162            let caller = room_participant::Entity::find()
 163                .filter(
 164                    room_participant::Column::UserId
 165                        .eq(calling_user_id)
 166                        .and(room_participant::Column::RoomId.eq(room_id)),
 167                )
 168                .one(&*tx)
 169                .await?
 170                .ok_or_else(|| anyhow!("user is not in the room"))?;
 171
 172            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 173                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 174                ChannelRole::Guest => ChannelRole::Guest,
 175                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 176            };
 177
 178            room_participant::ActiveModel {
 179                room_id: ActiveValue::set(room_id),
 180                user_id: ActiveValue::set(called_user_id),
 181                answering_connection_lost: ActiveValue::set(false),
 182                participant_index: ActiveValue::NotSet,
 183                calling_user_id: ActiveValue::set(calling_user_id),
 184                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 185                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 186                    calling_connection.owner_id as i32,
 187                ))),
 188                initial_project_id: ActiveValue::set(initial_project_id),
 189                role: ActiveValue::set(Some(called_user_role)),
 190
 191                id: ActiveValue::NotSet,
 192                answering_connection_id: ActiveValue::NotSet,
 193                answering_connection_server_id: ActiveValue::NotSet,
 194                location_kind: ActiveValue::NotSet,
 195                location_project_id: ActiveValue::NotSet,
 196            }
 197            .insert(&*tx)
 198            .await?;
 199
 200            let room = self.get_room(room_id, &tx).await?;
 201            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 202                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 203            Ok((room, incoming_call))
 204        })
 205        .await
 206    }
 207
 208    pub async fn call_failed(
 209        &self,
 210        room_id: RoomId,
 211        called_user_id: UserId,
 212    ) -> Result<RoomGuard<proto::Room>> {
 213        self.room_transaction(room_id, |tx| async move {
 214            room_participant::Entity::delete_many()
 215                .filter(
 216                    room_participant::Column::RoomId
 217                        .eq(room_id)
 218                        .and(room_participant::Column::UserId.eq(called_user_id)),
 219                )
 220                .exec(&*tx)
 221                .await?;
 222            let room = self.get_room(room_id, &tx).await?;
 223            Ok(room)
 224        })
 225        .await
 226    }
 227
 228    pub async fn decline_call(
 229        &self,
 230        expected_room_id: Option<RoomId>,
 231        user_id: UserId,
 232    ) -> Result<Option<RoomGuard<proto::Room>>> {
 233        self.optional_room_transaction(|tx| async move {
 234            let mut filter = Condition::all()
 235                .add(room_participant::Column::UserId.eq(user_id))
 236                .add(room_participant::Column::AnsweringConnectionId.is_null());
 237            if let Some(room_id) = expected_room_id {
 238                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 239            }
 240            let participant = room_participant::Entity::find()
 241                .filter(filter)
 242                .one(&*tx)
 243                .await?;
 244
 245            let participant = if let Some(participant) = participant {
 246                participant
 247            } else if expected_room_id.is_some() {
 248                return Err(anyhow!("could not find call to decline"))?;
 249            } else {
 250                return Ok(None);
 251            };
 252
 253            let room_id = participant.room_id;
 254            room_participant::Entity::delete(participant.into_active_model())
 255                .exec(&*tx)
 256                .await?;
 257
 258            let room = self.get_room(room_id, &tx).await?;
 259            Ok(Some((room_id, room)))
 260        })
 261        .await
 262    }
 263
 264    pub async fn cancel_call(
 265        &self,
 266        room_id: RoomId,
 267        calling_connection: ConnectionId,
 268        called_user_id: UserId,
 269    ) -> Result<RoomGuard<proto::Room>> {
 270        self.room_transaction(room_id, |tx| async move {
 271            let participant = room_participant::Entity::find()
 272                .filter(
 273                    Condition::all()
 274                        .add(room_participant::Column::UserId.eq(called_user_id))
 275                        .add(room_participant::Column::RoomId.eq(room_id))
 276                        .add(
 277                            room_participant::Column::CallingConnectionId
 278                                .eq(calling_connection.id as i32),
 279                        )
 280                        .add(
 281                            room_participant::Column::CallingConnectionServerId
 282                                .eq(calling_connection.owner_id as i32),
 283                        )
 284                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 285                )
 286                .one(&*tx)
 287                .await?
 288                .ok_or_else(|| anyhow!("no call to cancel"))?;
 289
 290            room_participant::Entity::delete(participant.into_active_model())
 291                .exec(&*tx)
 292                .await?;
 293
 294            let room = self.get_room(room_id, &tx).await?;
 295            Ok(room)
 296        })
 297        .await
 298    }
 299
 300    pub async fn join_room(
 301        &self,
 302        room_id: RoomId,
 303        user_id: UserId,
 304        connection: ConnectionId,
 305        environment: &str,
 306    ) -> Result<RoomGuard<JoinRoom>> {
 307        self.room_transaction(room_id, |tx| async move {
 308            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 309            enum QueryChannelIdAndEnvironment {
 310                ChannelId,
 311                Environment,
 312            }
 313
 314            let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
 315                room::Entity::find()
 316                    .select_only()
 317                    .column(room::Column::ChannelId)
 318                    .column(room::Column::Environment)
 319                    .filter(room::Column::Id.eq(room_id))
 320                    .into_values::<_, QueryChannelIdAndEnvironment>()
 321                    .one(&*tx)
 322                    .await?
 323                    .ok_or_else(|| anyhow!("no such room"))?;
 324
 325            if let Some(release_channel) = release_channel {
 326                if &release_channel != environment {
 327                    Err(anyhow!("must join using the {} release", release_channel))?;
 328                }
 329            }
 330
 331            if channel_id.is_some() {
 332                Err(anyhow!("tried to join channel call directly"))?
 333            }
 334
 335            let participant_index = self
 336                .get_next_participant_index_internal(room_id, &*tx)
 337                .await?;
 338
 339            let result = room_participant::Entity::update_many()
 340                .filter(
 341                    Condition::all()
 342                        .add(room_participant::Column::RoomId.eq(room_id))
 343                        .add(room_participant::Column::UserId.eq(user_id))
 344                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 345                )
 346                .set(room_participant::ActiveModel {
 347                    participant_index: ActiveValue::Set(Some(participant_index)),
 348                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 349                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 350                        connection.owner_id as i32,
 351                    ))),
 352                    answering_connection_lost: ActiveValue::set(false),
 353                    ..Default::default()
 354                })
 355                .exec(&*tx)
 356                .await?;
 357            if result.rows_affected == 0 {
 358                Err(anyhow!("room does not exist or was already joined"))?;
 359            }
 360
 361            let room = self.get_room(room_id, &tx).await?;
 362            Ok(JoinRoom {
 363                room,
 364                channel_id: None,
 365                channel_members: vec![],
 366            })
 367        })
 368        .await
 369    }
 370
 371    async fn get_next_participant_index_internal(
 372        &self,
 373        room_id: RoomId,
 374        tx: &DatabaseTransaction,
 375    ) -> Result<i32> {
 376        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 377        enum QueryParticipantIndices {
 378            ParticipantIndex,
 379        }
 380        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 381            .filter(
 382                room_participant::Column::RoomId
 383                    .eq(room_id)
 384                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 385            )
 386            .select_only()
 387            .column(room_participant::Column::ParticipantIndex)
 388            .into_values::<_, QueryParticipantIndices>()
 389            .all(&*tx)
 390            .await?;
 391
 392        let mut participant_index = 0;
 393        while existing_participant_indices.contains(&participant_index) {
 394            participant_index += 1;
 395        }
 396
 397        Ok(participant_index)
 398    }
 399
 400    /// Returns the channel ID for the given room, if it has one.
 401    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 402        self.transaction(|tx| async move {
 403            let room: Option<room::Model> = room::Entity::find()
 404                .filter(room::Column::Id.eq(room_id))
 405                .one(&*tx)
 406                .await?;
 407
 408            Ok(room.and_then(|room| room.channel_id))
 409        })
 410        .await
 411    }
 412
 413    pub(crate) async fn join_channel_room_internal(
 414        &self,
 415        room_id: RoomId,
 416        user_id: UserId,
 417        connection: ConnectionId,
 418        role: ChannelRole,
 419        tx: &DatabaseTransaction,
 420    ) -> Result<JoinRoom> {
 421        let participant_index = self
 422            .get_next_participant_index_internal(room_id, &*tx)
 423            .await?;
 424
 425        room_participant::Entity::insert_many([room_participant::ActiveModel {
 426            room_id: ActiveValue::set(room_id),
 427            user_id: ActiveValue::set(user_id),
 428            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 429            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 430                connection.owner_id as i32,
 431            ))),
 432            answering_connection_lost: ActiveValue::set(false),
 433            calling_user_id: ActiveValue::set(user_id),
 434            calling_connection_id: ActiveValue::set(connection.id as i32),
 435            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 436                connection.owner_id as i32,
 437            ))),
 438            participant_index: ActiveValue::Set(Some(participant_index)),
 439            role: ActiveValue::set(Some(role)),
 440            id: ActiveValue::NotSet,
 441            location_kind: ActiveValue::NotSet,
 442            location_project_id: ActiveValue::NotSet,
 443            initial_project_id: ActiveValue::NotSet,
 444        }])
 445        .on_conflict(
 446            OnConflict::columns([room_participant::Column::UserId])
 447                .update_columns([
 448                    room_participant::Column::AnsweringConnectionId,
 449                    room_participant::Column::AnsweringConnectionServerId,
 450                    room_participant::Column::AnsweringConnectionLost,
 451                    room_participant::Column::ParticipantIndex,
 452                    room_participant::Column::Role,
 453                ])
 454                .to_owned(),
 455        )
 456        .exec(&*tx)
 457        .await?;
 458
 459        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 460        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 461        let channel_members = self.get_channel_participants(&channel, &*tx).await?;
 462        Ok(JoinRoom {
 463            room,
 464            channel_id: Some(channel.id),
 465            channel_members,
 466        })
 467    }
 468
 469    pub async fn rejoin_room(
 470        &self,
 471        rejoin_room: proto::RejoinRoom,
 472        user_id: UserId,
 473        connection: ConnectionId,
 474    ) -> Result<RoomGuard<RejoinedRoom>> {
 475        let room_id = RoomId::from_proto(rejoin_room.id);
 476        self.room_transaction(room_id, |tx| async {
 477            let tx = tx;
 478            let participant_update = room_participant::Entity::update_many()
 479                .filter(
 480                    Condition::all()
 481                        .add(room_participant::Column::RoomId.eq(room_id))
 482                        .add(room_participant::Column::UserId.eq(user_id))
 483                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 484                        .add(
 485                            Condition::any()
 486                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 487                                .add(
 488                                    room_participant::Column::AnsweringConnectionServerId
 489                                        .ne(connection.owner_id as i32),
 490                                ),
 491                        ),
 492                )
 493                .set(room_participant::ActiveModel {
 494                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 495                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 496                        connection.owner_id as i32,
 497                    ))),
 498                    answering_connection_lost: ActiveValue::set(false),
 499                    ..Default::default()
 500                })
 501                .exec(&*tx)
 502                .await?;
 503            if participant_update.rows_affected == 0 {
 504                return Err(anyhow!("room does not exist or was already joined"))?;
 505            }
 506
 507            let mut reshared_projects = Vec::new();
 508            for reshared_project in &rejoin_room.reshared_projects {
 509                let project_id = ProjectId::from_proto(reshared_project.project_id);
 510                let project = project::Entity::find_by_id(project_id)
 511                    .one(&*tx)
 512                    .await?
 513                    .ok_or_else(|| anyhow!("project does not exist"))?;
 514                if project.host_user_id != user_id {
 515                    return Err(anyhow!("no such project"))?;
 516                }
 517
 518                let mut collaborators = project
 519                    .find_related(project_collaborator::Entity)
 520                    .all(&*tx)
 521                    .await?;
 522                let host_ix = collaborators
 523                    .iter()
 524                    .position(|collaborator| {
 525                        collaborator.user_id == user_id && collaborator.is_host
 526                    })
 527                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 528                let host = collaborators.swap_remove(host_ix);
 529                let old_connection_id = host.connection();
 530
 531                project::Entity::update(project::ActiveModel {
 532                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 533                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 534                        connection.owner_id as i32,
 535                    ))),
 536                    ..project.into_active_model()
 537                })
 538                .exec(&*tx)
 539                .await?;
 540                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 541                    connection_id: ActiveValue::set(connection.id as i32),
 542                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 543                    ..host.into_active_model()
 544                })
 545                .exec(&*tx)
 546                .await?;
 547
 548                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 549                    .await?;
 550
 551                reshared_projects.push(ResharedProject {
 552                    id: project_id,
 553                    old_connection_id,
 554                    collaborators: collaborators
 555                        .iter()
 556                        .map(|collaborator| ProjectCollaborator {
 557                            connection_id: collaborator.connection(),
 558                            user_id: collaborator.user_id,
 559                            replica_id: collaborator.replica_id,
 560                            is_host: collaborator.is_host,
 561                        })
 562                        .collect(),
 563                    worktrees: reshared_project.worktrees.clone(),
 564                });
 565            }
 566
 567            project::Entity::delete_many()
 568                .filter(
 569                    Condition::all()
 570                        .add(project::Column::RoomId.eq(room_id))
 571                        .add(project::Column::HostUserId.eq(user_id))
 572                        .add(
 573                            project::Column::Id
 574                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 575                        ),
 576                )
 577                .exec(&*tx)
 578                .await?;
 579
 580            let mut rejoined_projects = Vec::new();
 581            for rejoined_project in &rejoin_room.rejoined_projects {
 582                let project_id = ProjectId::from_proto(rejoined_project.id);
 583                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 584                    continue;
 585                };
 586
 587                let mut worktrees = Vec::new();
 588                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 589                for db_worktree in db_worktrees {
 590                    let mut worktree = RejoinedWorktree {
 591                        id: db_worktree.id as u64,
 592                        abs_path: db_worktree.abs_path,
 593                        root_name: db_worktree.root_name,
 594                        visible: db_worktree.visible,
 595                        updated_entries: Default::default(),
 596                        removed_entries: Default::default(),
 597                        updated_repositories: Default::default(),
 598                        removed_repositories: Default::default(),
 599                        diagnostic_summaries: Default::default(),
 600                        settings_files: Default::default(),
 601                        scan_id: db_worktree.scan_id as u64,
 602                        completed_scan_id: db_worktree.completed_scan_id as u64,
 603                    };
 604
 605                    let rejoined_worktree = rejoined_project
 606                        .worktrees
 607                        .iter()
 608                        .find(|worktree| worktree.id == db_worktree.id as u64);
 609
 610                    // File entries
 611                    {
 612                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 613                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 614                        } else {
 615                            worktree_entry::Column::IsDeleted.eq(false)
 616                        };
 617
 618                        let mut db_entries = worktree_entry::Entity::find()
 619                            .filter(
 620                                Condition::all()
 621                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 622                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 623                                    .add(entry_filter),
 624                            )
 625                            .stream(&*tx)
 626                            .await?;
 627
 628                        while let Some(db_entry) = db_entries.next().await {
 629                            let db_entry = db_entry?;
 630                            if db_entry.is_deleted {
 631                                worktree.removed_entries.push(db_entry.id as u64);
 632                            } else {
 633                                worktree.updated_entries.push(proto::Entry {
 634                                    id: db_entry.id as u64,
 635                                    is_dir: db_entry.is_dir,
 636                                    path: db_entry.path,
 637                                    inode: db_entry.inode as u64,
 638                                    mtime: Some(proto::Timestamp {
 639                                        seconds: db_entry.mtime_seconds as u64,
 640                                        nanos: db_entry.mtime_nanos as u32,
 641                                    }),
 642                                    is_symlink: db_entry.is_symlink,
 643                                    is_ignored: db_entry.is_ignored,
 644                                    is_external: db_entry.is_external,
 645                                    git_status: db_entry.git_status.map(|status| status as i32),
 646                                });
 647                            }
 648                        }
 649                    }
 650
 651                    // Repository Entries
 652                    {
 653                        let repository_entry_filter =
 654                            if let Some(rejoined_worktree) = rejoined_worktree {
 655                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 656                            } else {
 657                                worktree_repository::Column::IsDeleted.eq(false)
 658                            };
 659
 660                        let mut db_repositories = worktree_repository::Entity::find()
 661                            .filter(
 662                                Condition::all()
 663                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 664                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 665                                    .add(repository_entry_filter),
 666                            )
 667                            .stream(&*tx)
 668                            .await?;
 669
 670                        while let Some(db_repository) = db_repositories.next().await {
 671                            let db_repository = db_repository?;
 672                            if db_repository.is_deleted {
 673                                worktree
 674                                    .removed_repositories
 675                                    .push(db_repository.work_directory_id as u64);
 676                            } else {
 677                                worktree.updated_repositories.push(proto::RepositoryEntry {
 678                                    work_directory_id: db_repository.work_directory_id as u64,
 679                                    branch: db_repository.branch,
 680                                });
 681                            }
 682                        }
 683                    }
 684
 685                    worktrees.push(worktree);
 686                }
 687
 688                let language_servers = project
 689                    .find_related(language_server::Entity)
 690                    .all(&*tx)
 691                    .await?
 692                    .into_iter()
 693                    .map(|language_server| proto::LanguageServer {
 694                        id: language_server.id as u64,
 695                        name: language_server.name,
 696                    })
 697                    .collect::<Vec<_>>();
 698
 699                {
 700                    let mut db_settings_files = worktree_settings_file::Entity::find()
 701                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 702                        .stream(&*tx)
 703                        .await?;
 704                    while let Some(db_settings_file) = db_settings_files.next().await {
 705                        let db_settings_file = db_settings_file?;
 706                        if let Some(worktree) = worktrees
 707                            .iter_mut()
 708                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 709                        {
 710                            worktree.settings_files.push(WorktreeSettingsFile {
 711                                path: db_settings_file.path,
 712                                content: db_settings_file.content,
 713                            });
 714                        }
 715                    }
 716                }
 717
 718                let mut collaborators = project
 719                    .find_related(project_collaborator::Entity)
 720                    .all(&*tx)
 721                    .await?;
 722                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 723                    .iter()
 724                    .position(|collaborator| collaborator.user_id == user_id)
 725                {
 726                    collaborators.swap_remove(self_collaborator_ix)
 727                } else {
 728                    continue;
 729                };
 730                let old_connection_id = self_collaborator.connection();
 731                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 732                    connection_id: ActiveValue::set(connection.id as i32),
 733                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 734                    ..self_collaborator.into_active_model()
 735                })
 736                .exec(&*tx)
 737                .await?;
 738
 739                let collaborators = collaborators
 740                    .into_iter()
 741                    .map(|collaborator| ProjectCollaborator {
 742                        connection_id: collaborator.connection(),
 743                        user_id: collaborator.user_id,
 744                        replica_id: collaborator.replica_id,
 745                        is_host: collaborator.is_host,
 746                    })
 747                    .collect::<Vec<_>>();
 748
 749                rejoined_projects.push(RejoinedProject {
 750                    id: project_id,
 751                    old_connection_id,
 752                    collaborators,
 753                    worktrees,
 754                    language_servers,
 755                });
 756            }
 757
 758            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 759            let channel_members = if let Some(channel) = &channel {
 760                self.get_channel_participants(&channel, &tx).await?
 761            } else {
 762                Vec::new()
 763            };
 764
 765            Ok(RejoinedRoom {
 766                room,
 767                channel_id: channel.map(|channel| channel.id),
 768                channel_members,
 769                rejoined_projects,
 770                reshared_projects,
 771            })
 772        })
 773        .await
 774    }
 775
 776    pub async fn leave_room(
 777        &self,
 778        connection: ConnectionId,
 779    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 780        self.optional_room_transaction(|tx| async move {
 781            let leaving_participant = room_participant::Entity::find()
 782                .filter(
 783                    Condition::all()
 784                        .add(
 785                            room_participant::Column::AnsweringConnectionId
 786                                .eq(connection.id as i32),
 787                        )
 788                        .add(
 789                            room_participant::Column::AnsweringConnectionServerId
 790                                .eq(connection.owner_id as i32),
 791                        ),
 792                )
 793                .one(&*tx)
 794                .await?;
 795
 796            if let Some(leaving_participant) = leaving_participant {
 797                // Leave room.
 798                let room_id = leaving_participant.room_id;
 799                room_participant::Entity::delete_by_id(leaving_participant.id)
 800                    .exec(&*tx)
 801                    .await?;
 802
 803                // Cancel pending calls initiated by the leaving user.
 804                let called_participants = room_participant::Entity::find()
 805                    .filter(
 806                        Condition::all()
 807                            .add(
 808                                room_participant::Column::CallingUserId
 809                                    .eq(leaving_participant.user_id),
 810                            )
 811                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 812                    )
 813                    .all(&*tx)
 814                    .await?;
 815                room_participant::Entity::delete_many()
 816                    .filter(
 817                        room_participant::Column::Id
 818                            .is_in(called_participants.iter().map(|participant| participant.id)),
 819                    )
 820                    .exec(&*tx)
 821                    .await?;
 822                let canceled_calls_to_user_ids = called_participants
 823                    .into_iter()
 824                    .map(|participant| participant.user_id)
 825                    .collect();
 826
 827                // Detect left projects.
 828                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 829                enum QueryProjectIds {
 830                    ProjectId,
 831                }
 832                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 833                    .select_only()
 834                    .column_as(
 835                        project_collaborator::Column::ProjectId,
 836                        QueryProjectIds::ProjectId,
 837                    )
 838                    .filter(
 839                        Condition::all()
 840                            .add(
 841                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 842                            )
 843                            .add(
 844                                project_collaborator::Column::ConnectionServerId
 845                                    .eq(connection.owner_id as i32),
 846                            ),
 847                    )
 848                    .into_values::<_, QueryProjectIds>()
 849                    .all(&*tx)
 850                    .await?;
 851                let mut left_projects = HashMap::default();
 852                let mut collaborators = project_collaborator::Entity::find()
 853                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 854                    .stream(&*tx)
 855                    .await?;
 856                while let Some(collaborator) = collaborators.next().await {
 857                    let collaborator = collaborator?;
 858                    let left_project =
 859                        left_projects
 860                            .entry(collaborator.project_id)
 861                            .or_insert(LeftProject {
 862                                id: collaborator.project_id,
 863                                host_user_id: Default::default(),
 864                                connection_ids: Default::default(),
 865                                host_connection_id: Default::default(),
 866                            });
 867
 868                    let collaborator_connection_id = collaborator.connection();
 869                    if collaborator_connection_id != connection {
 870                        left_project.connection_ids.push(collaborator_connection_id);
 871                    }
 872
 873                    if collaborator.is_host {
 874                        left_project.host_user_id = collaborator.user_id;
 875                        left_project.host_connection_id = collaborator_connection_id;
 876                    }
 877                }
 878                drop(collaborators);
 879
 880                // Leave projects.
 881                project_collaborator::Entity::delete_many()
 882                    .filter(
 883                        Condition::all()
 884                            .add(
 885                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 886                            )
 887                            .add(
 888                                project_collaborator::Column::ConnectionServerId
 889                                    .eq(connection.owner_id as i32),
 890                            ),
 891                    )
 892                    .exec(&*tx)
 893                    .await?;
 894
 895                follower::Entity::delete_many()
 896                    .filter(
 897                        Condition::all()
 898                            .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
 899                    )
 900                    .exec(&*tx)
 901                    .await?;
 902
 903                // Unshare projects.
 904                project::Entity::delete_many()
 905                    .filter(
 906                        Condition::all()
 907                            .add(project::Column::RoomId.eq(room_id))
 908                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 909                            .add(
 910                                project::Column::HostConnectionServerId
 911                                    .eq(connection.owner_id as i32),
 912                            ),
 913                    )
 914                    .exec(&*tx)
 915                    .await?;
 916
 917                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 918                let deleted = if room.participants.is_empty() {
 919                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 920                    result.rows_affected > 0
 921                } else {
 922                    false
 923                };
 924
 925                let channel_members = if let Some(channel) = &channel {
 926                    self.get_channel_participants(channel, &tx).await?
 927                } else {
 928                    Vec::new()
 929                };
 930                let left_room = LeftRoom {
 931                    room,
 932                    channel_id: channel.map(|channel| channel.id),
 933                    channel_members,
 934                    left_projects,
 935                    canceled_calls_to_user_ids,
 936                    deleted,
 937                };
 938
 939                if left_room.room.participants.is_empty() {
 940                    self.rooms.remove(&room_id);
 941                }
 942
 943                Ok(Some((room_id, left_room)))
 944            } else {
 945                Ok(None)
 946            }
 947        })
 948        .await
 949    }
 950
 951    /// Updates the location of a participant in the given room.
 952    pub async fn update_room_participant_location(
 953        &self,
 954        room_id: RoomId,
 955        connection: ConnectionId,
 956        location: proto::ParticipantLocation,
 957    ) -> Result<RoomGuard<proto::Room>> {
 958        self.room_transaction(room_id, |tx| async {
 959            let tx = tx;
 960            let location_kind;
 961            let location_project_id;
 962            match location
 963                .variant
 964                .as_ref()
 965                .ok_or_else(|| anyhow!("invalid location"))?
 966            {
 967                proto::participant_location::Variant::SharedProject(project) => {
 968                    location_kind = 0;
 969                    location_project_id = Some(ProjectId::from_proto(project.id));
 970                }
 971                proto::participant_location::Variant::UnsharedProject(_) => {
 972                    location_kind = 1;
 973                    location_project_id = None;
 974                }
 975                proto::participant_location::Variant::External(_) => {
 976                    location_kind = 2;
 977                    location_project_id = None;
 978                }
 979            }
 980
 981            let result = room_participant::Entity::update_many()
 982                .filter(
 983                    Condition::all()
 984                        .add(room_participant::Column::RoomId.eq(room_id))
 985                        .add(
 986                            room_participant::Column::AnsweringConnectionId
 987                                .eq(connection.id as i32),
 988                        )
 989                        .add(
 990                            room_participant::Column::AnsweringConnectionServerId
 991                                .eq(connection.owner_id as i32),
 992                        ),
 993                )
 994                .set(room_participant::ActiveModel {
 995                    location_kind: ActiveValue::set(Some(location_kind)),
 996                    location_project_id: ActiveValue::set(location_project_id),
 997                    ..Default::default()
 998                })
 999                .exec(&*tx)
1000                .await?;
1001
1002            if result.rows_affected == 1 {
1003                let room = self.get_room(room_id, &tx).await?;
1004                Ok(room)
1005            } else {
1006                Err(anyhow!("could not update room participant location"))?
1007            }
1008        })
1009        .await
1010    }
1011
1012    /// Sets the role of a participant in the given room.
1013    pub async fn set_room_participant_role(
1014        &self,
1015        admin_id: UserId,
1016        room_id: RoomId,
1017        user_id: UserId,
1018        role: ChannelRole,
1019    ) -> Result<RoomGuard<proto::Room>> {
1020        self.room_transaction(room_id, |tx| async move {
1021            room_participant::Entity::find()
1022                .filter(
1023                    Condition::all()
1024                        .add(room_participant::Column::RoomId.eq(room_id))
1025                        .add(room_participant::Column::UserId.eq(admin_id))
1026                        .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1027                )
1028                .one(&*tx)
1029                .await?
1030                .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1031
1032            if role.requires_cla() {
1033                self.check_user_has_signed_cla(user_id, room_id, &*tx)
1034                    .await?;
1035            }
1036
1037            let result = room_participant::Entity::update_many()
1038                .filter(
1039                    Condition::all()
1040                        .add(room_participant::Column::RoomId.eq(room_id))
1041                        .add(room_participant::Column::UserId.eq(user_id)),
1042                )
1043                .set(room_participant::ActiveModel {
1044                    role: ActiveValue::set(Some(ChannelRole::from(role))),
1045                    ..Default::default()
1046                })
1047                .exec(&*tx)
1048                .await?;
1049
1050            if result.rows_affected != 1 {
1051                Err(anyhow!("could not update room participant role"))?;
1052            }
1053            Ok(self.get_room(room_id, &tx).await?)
1054        })
1055        .await
1056    }
1057
1058    async fn check_user_has_signed_cla(
1059        &self,
1060        user_id: UserId,
1061        room_id: RoomId,
1062        tx: &DatabaseTransaction,
1063    ) -> Result<()> {
1064        let channel = room::Entity::find_by_id(room_id)
1065            .one(&*tx)
1066            .await?
1067            .ok_or_else(|| anyhow!("could not find room"))?
1068            .find_related(channel::Entity)
1069            .one(&*tx)
1070            .await?;
1071
1072        if let Some(channel) = channel {
1073            let requires_zed_cla = channel.requires_zed_cla
1074                || channel::Entity::find()
1075                    .filter(
1076                        channel::Column::Id
1077                            .is_in(channel.ancestors())
1078                            .and(channel::Column::RequiresZedCla.eq(true)),
1079                    )
1080                    .count(&*tx)
1081                    .await?
1082                    > 0;
1083            if requires_zed_cla {
1084                if contributor::Entity::find()
1085                    .filter(contributor::Column::UserId.eq(user_id))
1086                    .one(&*tx)
1087                    .await?
1088                    .is_none()
1089                {
1090                    Err(anyhow!("user has not signed the Zed CLA"))?;
1091                }
1092            }
1093        }
1094        Ok(())
1095    }
1096
1097    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1098        self.transaction(|tx| async move {
1099            self.room_connection_lost(connection, &*tx).await?;
1100            self.channel_buffer_connection_lost(connection, &*tx)
1101                .await?;
1102            self.channel_chat_connection_lost(connection, &*tx).await?;
1103            Ok(())
1104        })
1105        .await
1106    }
1107
1108    pub async fn room_connection_lost(
1109        &self,
1110        connection: ConnectionId,
1111        tx: &DatabaseTransaction,
1112    ) -> Result<()> {
1113        let participant = room_participant::Entity::find()
1114            .filter(
1115                Condition::all()
1116                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1117                    .add(
1118                        room_participant::Column::AnsweringConnectionServerId
1119                            .eq(connection.owner_id as i32),
1120                    ),
1121            )
1122            .one(&*tx)
1123            .await?;
1124
1125        if let Some(participant) = participant {
1126            room_participant::Entity::update(room_participant::ActiveModel {
1127                answering_connection_lost: ActiveValue::set(true),
1128                ..participant.into_active_model()
1129            })
1130            .exec(&*tx)
1131            .await?;
1132        }
1133        Ok(())
1134    }
1135
1136    fn build_incoming_call(
1137        room: &proto::Room,
1138        called_user_id: UserId,
1139    ) -> Option<proto::IncomingCall> {
1140        let pending_participant = room
1141            .pending_participants
1142            .iter()
1143            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1144
1145        Some(proto::IncomingCall {
1146            room_id: room.id,
1147            calling_user_id: pending_participant.calling_user_id,
1148            participant_user_ids: room
1149                .participants
1150                .iter()
1151                .map(|participant| participant.user_id)
1152                .collect(),
1153            initial_project: room.participants.iter().find_map(|participant| {
1154                let initial_project_id = pending_participant.initial_project_id?;
1155                participant
1156                    .projects
1157                    .iter()
1158                    .find(|project| project.id == initial_project_id)
1159                    .cloned()
1160            }),
1161        })
1162    }
1163
1164    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1165        let (_, room) = self.get_channel_room(room_id, tx).await?;
1166        Ok(room)
1167    }
1168
1169    pub async fn room_connection_ids(
1170        &self,
1171        room_id: RoomId,
1172        connection_id: ConnectionId,
1173    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1174        self.room_transaction(room_id, |tx| async move {
1175            let mut participants = room_participant::Entity::find()
1176                .filter(room_participant::Column::RoomId.eq(room_id))
1177                .stream(&*tx)
1178                .await?;
1179
1180            let mut is_participant = false;
1181            let mut connection_ids = HashSet::default();
1182            while let Some(participant) = participants.next().await {
1183                let participant = participant?;
1184                if let Some(answering_connection) = participant.answering_connection() {
1185                    if answering_connection == connection_id {
1186                        is_participant = true;
1187                    } else {
1188                        connection_ids.insert(answering_connection);
1189                    }
1190                }
1191            }
1192
1193            if !is_participant {
1194                Err(anyhow!("not a room participant"))?;
1195            }
1196
1197            Ok(connection_ids)
1198        })
1199        .await
1200    }
1201
1202    async fn get_channel_room(
1203        &self,
1204        room_id: RoomId,
1205        tx: &DatabaseTransaction,
1206    ) -> Result<(Option<channel::Model>, proto::Room)> {
1207        let db_room = room::Entity::find_by_id(room_id)
1208            .one(tx)
1209            .await?
1210            .ok_or_else(|| anyhow!("could not find room"))?;
1211
1212        let mut db_participants = db_room
1213            .find_related(room_participant::Entity)
1214            .stream(tx)
1215            .await?;
1216        let mut participants = HashMap::default();
1217        let mut pending_participants = Vec::new();
1218        while let Some(db_participant) = db_participants.next().await {
1219            let db_participant = db_participant?;
1220            if let (
1221                Some(answering_connection_id),
1222                Some(answering_connection_server_id),
1223                Some(participant_index),
1224            ) = (
1225                db_participant.answering_connection_id,
1226                db_participant.answering_connection_server_id,
1227                db_participant.participant_index,
1228            ) {
1229                let location = match (
1230                    db_participant.location_kind,
1231                    db_participant.location_project_id,
1232                ) {
1233                    (Some(0), Some(project_id)) => {
1234                        Some(proto::participant_location::Variant::SharedProject(
1235                            proto::participant_location::SharedProject {
1236                                id: project_id.to_proto(),
1237                            },
1238                        ))
1239                    }
1240                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1241                        Default::default(),
1242                    )),
1243                    _ => Some(proto::participant_location::Variant::External(
1244                        Default::default(),
1245                    )),
1246                };
1247
1248                let answering_connection = ConnectionId {
1249                    owner_id: answering_connection_server_id.0 as u32,
1250                    id: answering_connection_id as u32,
1251                };
1252                participants.insert(
1253                    answering_connection,
1254                    proto::Participant {
1255                        user_id: db_participant.user_id.to_proto(),
1256                        peer_id: Some(answering_connection.into()),
1257                        projects: Default::default(),
1258                        location: Some(proto::ParticipantLocation { variant: location }),
1259                        participant_index: participant_index as u32,
1260                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1261                    },
1262                );
1263            } else {
1264                pending_participants.push(proto::PendingParticipant {
1265                    user_id: db_participant.user_id.to_proto(),
1266                    calling_user_id: db_participant.calling_user_id.to_proto(),
1267                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1268                });
1269            }
1270        }
1271        drop(db_participants);
1272
1273        let mut db_projects = db_room
1274            .find_related(project::Entity)
1275            .find_with_related(worktree::Entity)
1276            .stream(tx)
1277            .await?;
1278
1279        while let Some(row) = db_projects.next().await {
1280            let (db_project, db_worktree) = row?;
1281            let host_connection = db_project.host_connection()?;
1282            if let Some(participant) = participants.get_mut(&host_connection) {
1283                let project = if let Some(project) = participant
1284                    .projects
1285                    .iter_mut()
1286                    .find(|project| project.id == db_project.id.to_proto())
1287                {
1288                    project
1289                } else {
1290                    participant.projects.push(proto::ParticipantProject {
1291                        id: db_project.id.to_proto(),
1292                        worktree_root_names: Default::default(),
1293                    });
1294                    participant.projects.last_mut().unwrap()
1295                };
1296
1297                if let Some(db_worktree) = db_worktree {
1298                    if db_worktree.visible {
1299                        project.worktree_root_names.push(db_worktree.root_name);
1300                    }
1301                }
1302            }
1303        }
1304        drop(db_projects);
1305
1306        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1307        let mut followers = Vec::new();
1308        while let Some(db_follower) = db_followers.next().await {
1309            let db_follower = db_follower?;
1310            followers.push(proto::Follower {
1311                leader_id: Some(db_follower.leader_connection().into()),
1312                follower_id: Some(db_follower.follower_connection().into()),
1313                project_id: db_follower.project_id.to_proto(),
1314            });
1315        }
1316        drop(db_followers);
1317
1318        let channel = if let Some(channel_id) = db_room.channel_id {
1319            Some(self.get_channel_internal(channel_id, &*tx).await?)
1320        } else {
1321            None
1322        };
1323
1324        Ok((
1325            channel,
1326            proto::Room {
1327                id: db_room.id.to_proto(),
1328                live_kit_room: db_room.live_kit_room,
1329                participants: participants.into_values().collect(),
1330                pending_participants,
1331                followers,
1332            },
1333        ))
1334    }
1335}