rooms.rs

   1use super::*;
   2
   3impl Database {
   4    /// Clears all room participants in rooms attached to a stale server.
   5    pub async fn clear_stale_room_participants(
   6        &self,
   7        room_id: RoomId,
   8        new_server_id: ServerId,
   9    ) -> Result<RoomGuard<RefreshedRoom>> {
  10        self.room_transaction(room_id, |tx| async move {
  11            let stale_participant_filter = Condition::all()
  12                .add(room_participant::Column::RoomId.eq(room_id))
  13                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  14                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  15
  16            let stale_participant_user_ids = room_participant::Entity::find()
  17                .filter(stale_participant_filter.clone())
  18                .all(&*tx)
  19                .await?
  20                .into_iter()
  21                .map(|participant| participant.user_id)
  22                .collect::<Vec<_>>();
  23
  24            // Delete participants who failed to reconnect and cancel their calls.
  25            let mut canceled_calls_to_user_ids = Vec::new();
  26            room_participant::Entity::delete_many()
  27                .filter(stale_participant_filter)
  28                .exec(&*tx)
  29                .await?;
  30            let called_participants = room_participant::Entity::find()
  31                .filter(
  32                    Condition::all()
  33                        .add(
  34                            room_participant::Column::CallingUserId
  35                                .is_in(stale_participant_user_ids.iter().copied()),
  36                        )
  37                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  38                )
  39                .all(&*tx)
  40                .await?;
  41            room_participant::Entity::delete_many()
  42                .filter(
  43                    room_participant::Column::Id
  44                        .is_in(called_participants.iter().map(|participant| participant.id)),
  45                )
  46                .exec(&*tx)
  47                .await?;
  48            canceled_calls_to_user_ids.extend(
  49                called_participants
  50                    .into_iter()
  51                    .map(|participant| participant.user_id),
  52            );
  53
  54            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  55            if channel.is_none() {
  56                // Delete the room if it becomes empty.
  57                if room.participants.is_empty() {
  58                    project::Entity::delete_many()
  59                        .filter(project::Column::RoomId.eq(room_id))
  60                        .exec(&*tx)
  61                        .await?;
  62                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  63                }
  64            };
  65
  66            Ok(RefreshedRoom {
  67                room,
  68                channel,
  69                stale_participant_user_ids,
  70                canceled_calls_to_user_ids,
  71            })
  72        })
  73        .await
  74    }
  75
  76    /// Returns the incoming calls for user with the given ID.
  77    pub async fn incoming_call_for_user(
  78        &self,
  79        user_id: UserId,
  80    ) -> Result<Option<proto::IncomingCall>> {
  81        self.transaction(|tx| async move {
  82            let pending_participant = room_participant::Entity::find()
  83                .filter(
  84                    room_participant::Column::UserId
  85                        .eq(user_id)
  86                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  87                )
  88                .one(&*tx)
  89                .await?;
  90
  91            if let Some(pending_participant) = pending_participant {
  92                let room = self.get_room(pending_participant.room_id, &tx).await?;
  93                Ok(Self::build_incoming_call(&room, user_id))
  94            } else {
  95                Ok(None)
  96            }
  97        })
  98        .await
  99    }
 100
 101    /// Creates a new room.
 102    pub async fn create_room(
 103        &self,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106        live_kit_room: &str,
 107    ) -> Result<proto::Room> {
 108        self.transaction(|tx| async move {
 109            let room = room::ActiveModel {
 110                live_kit_room: ActiveValue::set(live_kit_room.into()),
 111                ..Default::default()
 112            }
 113            .insert(&*tx)
 114            .await?;
 115            room_participant::ActiveModel {
 116                room_id: ActiveValue::set(room.id),
 117                user_id: ActiveValue::set(user_id),
 118                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 119                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 120                    connection.owner_id as i32,
 121                ))),
 122                answering_connection_lost: ActiveValue::set(false),
 123                calling_user_id: ActiveValue::set(user_id),
 124                calling_connection_id: ActiveValue::set(connection.id as i32),
 125                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 126                    connection.owner_id as i32,
 127                ))),
 128                participant_index: ActiveValue::set(Some(0)),
 129                role: ActiveValue::set(Some(ChannelRole::Admin)),
 130
 131                id: ActiveValue::NotSet,
 132                location_kind: ActiveValue::NotSet,
 133                location_project_id: ActiveValue::NotSet,
 134                initial_project_id: ActiveValue::NotSet,
 135            }
 136            .insert(&*tx)
 137            .await?;
 138
 139            let room = self.get_room(room.id, &tx).await?;
 140            Ok(room)
 141        })
 142        .await
 143    }
 144
 145    pub async fn call(
 146        &self,
 147        room_id: RoomId,
 148        calling_user_id: UserId,
 149        calling_connection: ConnectionId,
 150        called_user_id: UserId,
 151        initial_project_id: Option<ProjectId>,
 152    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 153        self.room_transaction(room_id, |tx| async move {
 154            let caller = room_participant::Entity::find()
 155                .filter(
 156                    room_participant::Column::UserId
 157                        .eq(calling_user_id)
 158                        .and(room_participant::Column::RoomId.eq(room_id)),
 159                )
 160                .one(&*tx)
 161                .await?
 162                .ok_or_else(|| anyhow!("user is not in the room"))?;
 163
 164            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 165                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 166                ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
 167                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 168            };
 169
 170            room_participant::ActiveModel {
 171                room_id: ActiveValue::set(room_id),
 172                user_id: ActiveValue::set(called_user_id),
 173                answering_connection_lost: ActiveValue::set(false),
 174                participant_index: ActiveValue::NotSet,
 175                calling_user_id: ActiveValue::set(calling_user_id),
 176                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 177                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 178                    calling_connection.owner_id as i32,
 179                ))),
 180                initial_project_id: ActiveValue::set(initial_project_id),
 181                role: ActiveValue::set(Some(called_user_role)),
 182
 183                id: ActiveValue::NotSet,
 184                answering_connection_id: ActiveValue::NotSet,
 185                answering_connection_server_id: ActiveValue::NotSet,
 186                location_kind: ActiveValue::NotSet,
 187                location_project_id: ActiveValue::NotSet,
 188            }
 189            .insert(&*tx)
 190            .await?;
 191
 192            let room = self.get_room(room_id, &tx).await?;
 193            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 194                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 195            Ok((room, incoming_call))
 196        })
 197        .await
 198    }
 199
 200    pub async fn call_failed(
 201        &self,
 202        room_id: RoomId,
 203        called_user_id: UserId,
 204    ) -> Result<RoomGuard<proto::Room>> {
 205        self.room_transaction(room_id, |tx| async move {
 206            room_participant::Entity::delete_many()
 207                .filter(
 208                    room_participant::Column::RoomId
 209                        .eq(room_id)
 210                        .and(room_participant::Column::UserId.eq(called_user_id)),
 211                )
 212                .exec(&*tx)
 213                .await?;
 214            let room = self.get_room(room_id, &tx).await?;
 215            Ok(room)
 216        })
 217        .await
 218    }
 219
 220    pub async fn decline_call(
 221        &self,
 222        expected_room_id: Option<RoomId>,
 223        user_id: UserId,
 224    ) -> Result<Option<RoomGuard<proto::Room>>> {
 225        self.optional_room_transaction(|tx| async move {
 226            let mut filter = Condition::all()
 227                .add(room_participant::Column::UserId.eq(user_id))
 228                .add(room_participant::Column::AnsweringConnectionId.is_null());
 229            if let Some(room_id) = expected_room_id {
 230                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 231            }
 232            let participant = room_participant::Entity::find()
 233                .filter(filter)
 234                .one(&*tx)
 235                .await?;
 236
 237            let participant = if let Some(participant) = participant {
 238                participant
 239            } else if expected_room_id.is_some() {
 240                return Err(anyhow!("could not find call to decline"))?;
 241            } else {
 242                return Ok(None);
 243            };
 244
 245            let room_id = participant.room_id;
 246            room_participant::Entity::delete(participant.into_active_model())
 247                .exec(&*tx)
 248                .await?;
 249
 250            let room = self.get_room(room_id, &tx).await?;
 251            Ok(Some((room_id, room)))
 252        })
 253        .await
 254    }
 255
 256    pub async fn cancel_call(
 257        &self,
 258        room_id: RoomId,
 259        calling_connection: ConnectionId,
 260        called_user_id: UserId,
 261    ) -> Result<RoomGuard<proto::Room>> {
 262        self.room_transaction(room_id, |tx| async move {
 263            let participant = room_participant::Entity::find()
 264                .filter(
 265                    Condition::all()
 266                        .add(room_participant::Column::UserId.eq(called_user_id))
 267                        .add(room_participant::Column::RoomId.eq(room_id))
 268                        .add(
 269                            room_participant::Column::CallingConnectionId
 270                                .eq(calling_connection.id as i32),
 271                        )
 272                        .add(
 273                            room_participant::Column::CallingConnectionServerId
 274                                .eq(calling_connection.owner_id as i32),
 275                        )
 276                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 277                )
 278                .one(&*tx)
 279                .await?
 280                .ok_or_else(|| anyhow!("no call to cancel"))?;
 281
 282            room_participant::Entity::delete(participant.into_active_model())
 283                .exec(&*tx)
 284                .await?;
 285
 286            let room = self.get_room(room_id, &tx).await?;
 287            Ok(room)
 288        })
 289        .await
 290    }
 291
 292    pub async fn join_room(
 293        &self,
 294        room_id: RoomId,
 295        user_id: UserId,
 296        connection: ConnectionId,
 297    ) -> Result<RoomGuard<JoinRoom>> {
 298        self.room_transaction(room_id, |tx| async move {
 299            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 300            enum QueryChannelId {
 301                ChannelId,
 302            }
 303
 304            let channel_id: Option<ChannelId> = room::Entity::find()
 305                .select_only()
 306                .column(room::Column::ChannelId)
 307                .filter(room::Column::Id.eq(room_id))
 308                .into_values::<_, QueryChannelId>()
 309                .one(&*tx)
 310                .await?
 311                .ok_or_else(|| anyhow!("no such room"))?;
 312
 313            if channel_id.is_some() {
 314                Err(anyhow!("tried to join channel call directly"))?
 315            }
 316
 317            let participant_index = self
 318                .get_next_participant_index_internal(room_id, &tx)
 319                .await?;
 320
 321            let result = room_participant::Entity::update_many()
 322                .filter(
 323                    Condition::all()
 324                        .add(room_participant::Column::RoomId.eq(room_id))
 325                        .add(room_participant::Column::UserId.eq(user_id))
 326                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 327                )
 328                .set(room_participant::ActiveModel {
 329                    participant_index: ActiveValue::Set(Some(participant_index)),
 330                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 331                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 332                        connection.owner_id as i32,
 333                    ))),
 334                    answering_connection_lost: ActiveValue::set(false),
 335                    ..Default::default()
 336                })
 337                .exec(&*tx)
 338                .await?;
 339            if result.rows_affected == 0 {
 340                Err(anyhow!("room does not exist or was already joined"))?;
 341            }
 342
 343            let room = self.get_room(room_id, &tx).await?;
 344            Ok(JoinRoom {
 345                room,
 346                channel: None,
 347            })
 348        })
 349        .await
 350    }
 351
 352    async fn get_next_participant_index_internal(
 353        &self,
 354        room_id: RoomId,
 355        tx: &DatabaseTransaction,
 356    ) -> Result<i32> {
 357        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 358        enum QueryParticipantIndices {
 359            ParticipantIndex,
 360        }
 361        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 362            .filter(
 363                room_participant::Column::RoomId
 364                    .eq(room_id)
 365                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 366            )
 367            .select_only()
 368            .column(room_participant::Column::ParticipantIndex)
 369            .into_values::<_, QueryParticipantIndices>()
 370            .all(tx)
 371            .await?;
 372
 373        let mut participant_index = 0;
 374        while existing_participant_indices.contains(&participant_index) {
 375            participant_index += 1;
 376        }
 377
 378        Ok(participant_index)
 379    }
 380
 381    /// Returns the channel ID for the given room, if it has one.
 382    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 383        self.transaction(|tx| async move {
 384            let room: Option<room::Model> = room::Entity::find()
 385                .filter(room::Column::Id.eq(room_id))
 386                .one(&*tx)
 387                .await?;
 388
 389            Ok(room.and_then(|room| room.channel_id))
 390        })
 391        .await
 392    }
 393
 394    pub(crate) async fn join_channel_room_internal(
 395        &self,
 396        room_id: RoomId,
 397        user_id: UserId,
 398        connection: ConnectionId,
 399        role: ChannelRole,
 400        tx: &DatabaseTransaction,
 401    ) -> Result<JoinRoom> {
 402        let participant_index = self
 403            .get_next_participant_index_internal(room_id, tx)
 404            .await?;
 405
 406        room_participant::Entity::insert_many([room_participant::ActiveModel {
 407            room_id: ActiveValue::set(room_id),
 408            user_id: ActiveValue::set(user_id),
 409            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 410            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 411                connection.owner_id as i32,
 412            ))),
 413            answering_connection_lost: ActiveValue::set(false),
 414            calling_user_id: ActiveValue::set(user_id),
 415            calling_connection_id: ActiveValue::set(connection.id as i32),
 416            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 417                connection.owner_id as i32,
 418            ))),
 419            participant_index: ActiveValue::Set(Some(participant_index)),
 420            role: ActiveValue::set(Some(role)),
 421            id: ActiveValue::NotSet,
 422            location_kind: ActiveValue::NotSet,
 423            location_project_id: ActiveValue::NotSet,
 424            initial_project_id: ActiveValue::NotSet,
 425        }])
 426        .on_conflict(
 427            OnConflict::columns([room_participant::Column::UserId])
 428                .update_columns([
 429                    room_participant::Column::AnsweringConnectionId,
 430                    room_participant::Column::AnsweringConnectionServerId,
 431                    room_participant::Column::AnsweringConnectionLost,
 432                    room_participant::Column::ParticipantIndex,
 433                    room_participant::Column::Role,
 434                ])
 435                .to_owned(),
 436        )
 437        .exec(tx)
 438        .await?;
 439
 440        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 441        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 442        Ok(JoinRoom {
 443            room,
 444            channel: Some(channel),
 445        })
 446    }
 447
 448    pub async fn rejoin_room(
 449        &self,
 450        rejoin_room: proto::RejoinRoom,
 451        user_id: UserId,
 452        connection: ConnectionId,
 453    ) -> Result<RoomGuard<RejoinedRoom>> {
 454        let room_id = RoomId::from_proto(rejoin_room.id);
 455        self.room_transaction(room_id, |tx| async {
 456            let tx = tx;
 457            let participant_update = room_participant::Entity::update_many()
 458                .filter(
 459                    Condition::all()
 460                        .add(room_participant::Column::RoomId.eq(room_id))
 461                        .add(room_participant::Column::UserId.eq(user_id))
 462                        .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
 463                )
 464                .set(room_participant::ActiveModel {
 465                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 466                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 467                        connection.owner_id as i32,
 468                    ))),
 469                    answering_connection_lost: ActiveValue::set(false),
 470                    ..Default::default()
 471                })
 472                .exec(&*tx)
 473                .await?;
 474            if participant_update.rows_affected == 0 {
 475                return Err(anyhow!("room does not exist or was already joined"))?;
 476            }
 477
 478            let mut reshared_projects = Vec::new();
 479            for reshared_project in &rejoin_room.reshared_projects {
 480                let project_id = ProjectId::from_proto(reshared_project.project_id);
 481                let project = project::Entity::find_by_id(project_id)
 482                    .one(&*tx)
 483                    .await?
 484                    .ok_or_else(|| anyhow!("project does not exist"))?;
 485                if project.host_user_id != Some(user_id) {
 486                    return Err(anyhow!("no such project"))?;
 487                }
 488
 489                let mut collaborators = project
 490                    .find_related(project_collaborator::Entity)
 491                    .all(&*tx)
 492                    .await?;
 493                let host_ix = collaborators
 494                    .iter()
 495                    .position(|collaborator| {
 496                        collaborator.user_id == user_id && collaborator.is_host
 497                    })
 498                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 499                let host = collaborators.swap_remove(host_ix);
 500                let old_connection_id = host.connection();
 501
 502                project::Entity::update(project::ActiveModel {
 503                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 504                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 505                        connection.owner_id as i32,
 506                    ))),
 507                    ..project.into_active_model()
 508                })
 509                .exec(&*tx)
 510                .await?;
 511                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 512                    connection_id: ActiveValue::set(connection.id as i32),
 513                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 514                    ..host.into_active_model()
 515                })
 516                .exec(&*tx)
 517                .await?;
 518
 519                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 520                    .await?;
 521
 522                reshared_projects.push(ResharedProject {
 523                    id: project_id,
 524                    old_connection_id,
 525                    collaborators: collaborators
 526                        .iter()
 527                        .map(|collaborator| ProjectCollaborator {
 528                            connection_id: collaborator.connection(),
 529                            user_id: collaborator.user_id,
 530                            replica_id: collaborator.replica_id,
 531                            is_host: collaborator.is_host,
 532                        })
 533                        .collect(),
 534                    worktrees: reshared_project.worktrees.clone(),
 535                });
 536            }
 537
 538            project::Entity::delete_many()
 539                .filter(
 540                    Condition::all()
 541                        .add(project::Column::RoomId.eq(room_id))
 542                        .add(project::Column::HostUserId.eq(user_id))
 543                        .add(
 544                            project::Column::Id
 545                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 546                        ),
 547                )
 548                .exec(&*tx)
 549                .await?;
 550
 551            let mut rejoined_projects = Vec::new();
 552            for rejoined_project in &rejoin_room.rejoined_projects {
 553                let project_id = ProjectId::from_proto(rejoined_project.id);
 554                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 555                    continue;
 556                };
 557
 558                let mut worktrees = Vec::new();
 559                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 560                for db_worktree in db_worktrees {
 561                    let mut worktree = RejoinedWorktree {
 562                        id: db_worktree.id as u64,
 563                        abs_path: db_worktree.abs_path,
 564                        root_name: db_worktree.root_name,
 565                        visible: db_worktree.visible,
 566                        updated_entries: Default::default(),
 567                        removed_entries: Default::default(),
 568                        updated_repositories: Default::default(),
 569                        removed_repositories: Default::default(),
 570                        diagnostic_summaries: Default::default(),
 571                        settings_files: Default::default(),
 572                        scan_id: db_worktree.scan_id as u64,
 573                        completed_scan_id: db_worktree.completed_scan_id as u64,
 574                    };
 575
 576                    let rejoined_worktree = rejoined_project
 577                        .worktrees
 578                        .iter()
 579                        .find(|worktree| worktree.id == db_worktree.id as u64);
 580
 581                    // File entries
 582                    {
 583                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 584                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 585                        } else {
 586                            worktree_entry::Column::IsDeleted.eq(false)
 587                        };
 588
 589                        let mut db_entries = worktree_entry::Entity::find()
 590                            .filter(
 591                                Condition::all()
 592                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 593                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 594                                    .add(entry_filter),
 595                            )
 596                            .stream(&*tx)
 597                            .await?;
 598
 599                        while let Some(db_entry) = db_entries.next().await {
 600                            let db_entry = db_entry?;
 601                            if db_entry.is_deleted {
 602                                worktree.removed_entries.push(db_entry.id as u64);
 603                            } else {
 604                                worktree.updated_entries.push(proto::Entry {
 605                                    id: db_entry.id as u64,
 606                                    is_dir: db_entry.is_dir,
 607                                    path: db_entry.path,
 608                                    inode: db_entry.inode as u64,
 609                                    mtime: Some(proto::Timestamp {
 610                                        seconds: db_entry.mtime_seconds as u64,
 611                                        nanos: db_entry.mtime_nanos as u32,
 612                                    }),
 613                                    is_symlink: db_entry.is_symlink,
 614                                    is_ignored: db_entry.is_ignored,
 615                                    is_external: db_entry.is_external,
 616                                    git_status: db_entry.git_status.map(|status| status as i32),
 617                                });
 618                            }
 619                        }
 620                    }
 621
 622                    // Repository Entries
 623                    {
 624                        let repository_entry_filter =
 625                            if let Some(rejoined_worktree) = rejoined_worktree {
 626                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 627                            } else {
 628                                worktree_repository::Column::IsDeleted.eq(false)
 629                            };
 630
 631                        let mut db_repositories = worktree_repository::Entity::find()
 632                            .filter(
 633                                Condition::all()
 634                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 635                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 636                                    .add(repository_entry_filter),
 637                            )
 638                            .stream(&*tx)
 639                            .await?;
 640
 641                        while let Some(db_repository) = db_repositories.next().await {
 642                            let db_repository = db_repository?;
 643                            if db_repository.is_deleted {
 644                                worktree
 645                                    .removed_repositories
 646                                    .push(db_repository.work_directory_id as u64);
 647                            } else {
 648                                worktree.updated_repositories.push(proto::RepositoryEntry {
 649                                    work_directory_id: db_repository.work_directory_id as u64,
 650                                    branch: db_repository.branch,
 651                                });
 652                            }
 653                        }
 654                    }
 655
 656                    worktrees.push(worktree);
 657                }
 658
 659                let language_servers = project
 660                    .find_related(language_server::Entity)
 661                    .all(&*tx)
 662                    .await?
 663                    .into_iter()
 664                    .map(|language_server| proto::LanguageServer {
 665                        id: language_server.id as u64,
 666                        name: language_server.name,
 667                    })
 668                    .collect::<Vec<_>>();
 669
 670                {
 671                    let mut db_settings_files = worktree_settings_file::Entity::find()
 672                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 673                        .stream(&*tx)
 674                        .await?;
 675                    while let Some(db_settings_file) = db_settings_files.next().await {
 676                        let db_settings_file = db_settings_file?;
 677                        if let Some(worktree) = worktrees
 678                            .iter_mut()
 679                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 680                        {
 681                            worktree.settings_files.push(WorktreeSettingsFile {
 682                                path: db_settings_file.path,
 683                                content: db_settings_file.content,
 684                            });
 685                        }
 686                    }
 687                }
 688
 689                let mut collaborators = project
 690                    .find_related(project_collaborator::Entity)
 691                    .all(&*tx)
 692                    .await?;
 693                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 694                    .iter()
 695                    .position(|collaborator| collaborator.user_id == user_id)
 696                {
 697                    collaborators.swap_remove(self_collaborator_ix)
 698                } else {
 699                    continue;
 700                };
 701                let old_connection_id = self_collaborator.connection();
 702                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 703                    connection_id: ActiveValue::set(connection.id as i32),
 704                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 705                    ..self_collaborator.into_active_model()
 706                })
 707                .exec(&*tx)
 708                .await?;
 709
 710                let collaborators = collaborators
 711                    .into_iter()
 712                    .map(|collaborator| ProjectCollaborator {
 713                        connection_id: collaborator.connection(),
 714                        user_id: collaborator.user_id,
 715                        replica_id: collaborator.replica_id,
 716                        is_host: collaborator.is_host,
 717                    })
 718                    .collect::<Vec<_>>();
 719
 720                rejoined_projects.push(RejoinedProject {
 721                    id: project_id,
 722                    old_connection_id,
 723                    collaborators,
 724                    worktrees,
 725                    language_servers,
 726                });
 727            }
 728
 729            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 730
 731            Ok(RejoinedRoom {
 732                room,
 733                channel,
 734                rejoined_projects,
 735                reshared_projects,
 736            })
 737        })
 738        .await
 739    }
 740
 741    pub async fn leave_room(
 742        &self,
 743        connection: ConnectionId,
 744    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 745        self.optional_room_transaction(|tx| async move {
 746            let leaving_participant = room_participant::Entity::find()
 747                .filter(
 748                    Condition::all()
 749                        .add(
 750                            room_participant::Column::AnsweringConnectionId
 751                                .eq(connection.id as i32),
 752                        )
 753                        .add(
 754                            room_participant::Column::AnsweringConnectionServerId
 755                                .eq(connection.owner_id as i32),
 756                        ),
 757                )
 758                .one(&*tx)
 759                .await?;
 760
 761            if let Some(leaving_participant) = leaving_participant {
 762                // Leave room.
 763                let room_id = leaving_participant.room_id;
 764                room_participant::Entity::delete_by_id(leaving_participant.id)
 765                    .exec(&*tx)
 766                    .await?;
 767
 768                // Cancel pending calls initiated by the leaving user.
 769                let called_participants = room_participant::Entity::find()
 770                    .filter(
 771                        Condition::all()
 772                            .add(
 773                                room_participant::Column::CallingUserId
 774                                    .eq(leaving_participant.user_id),
 775                            )
 776                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 777                    )
 778                    .all(&*tx)
 779                    .await?;
 780                room_participant::Entity::delete_many()
 781                    .filter(
 782                        room_participant::Column::Id
 783                            .is_in(called_participants.iter().map(|participant| participant.id)),
 784                    )
 785                    .exec(&*tx)
 786                    .await?;
 787                let canceled_calls_to_user_ids = called_participants
 788                    .into_iter()
 789                    .map(|participant| participant.user_id)
 790                    .collect();
 791
 792                // Detect left projects.
 793                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 794                enum QueryProjectIds {
 795                    ProjectId,
 796                }
 797                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 798                    .select_only()
 799                    .column_as(
 800                        project_collaborator::Column::ProjectId,
 801                        QueryProjectIds::ProjectId,
 802                    )
 803                    .filter(
 804                        Condition::all()
 805                            .add(
 806                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 807                            )
 808                            .add(
 809                                project_collaborator::Column::ConnectionServerId
 810                                    .eq(connection.owner_id as i32),
 811                            ),
 812                    )
 813                    .into_values::<_, QueryProjectIds>()
 814                    .all(&*tx)
 815                    .await?;
 816                let mut left_projects = HashMap::default();
 817                let mut collaborators = project_collaborator::Entity::find()
 818                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 819                    .stream(&*tx)
 820                    .await?;
 821                while let Some(collaborator) = collaborators.next().await {
 822                    let collaborator = collaborator?;
 823                    let left_project =
 824                        left_projects
 825                            .entry(collaborator.project_id)
 826                            .or_insert(LeftProject {
 827                                id: collaborator.project_id,
 828                                host_user_id: Default::default(),
 829                                connection_ids: Default::default(),
 830                                host_connection_id: None,
 831                            });
 832
 833                    let collaborator_connection_id = collaborator.connection();
 834                    if collaborator_connection_id != connection {
 835                        left_project.connection_ids.push(collaborator_connection_id);
 836                    }
 837
 838                    if collaborator.is_host {
 839                        left_project.host_user_id = Some(collaborator.user_id);
 840                        left_project.host_connection_id = Some(collaborator_connection_id);
 841                    }
 842                }
 843                drop(collaborators);
 844
 845                // Leave projects.
 846                project_collaborator::Entity::delete_many()
 847                    .filter(
 848                        Condition::all()
 849                            .add(
 850                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 851                            )
 852                            .add(
 853                                project_collaborator::Column::ConnectionServerId
 854                                    .eq(connection.owner_id as i32),
 855                            ),
 856                    )
 857                    .exec(&*tx)
 858                    .await?;
 859
 860                follower::Entity::delete_many()
 861                    .filter(
 862                        Condition::all()
 863                            .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
 864                    )
 865                    .exec(&*tx)
 866                    .await?;
 867
 868                // Unshare projects.
 869                project::Entity::delete_many()
 870                    .filter(
 871                        Condition::all()
 872                            .add(project::Column::RoomId.eq(room_id))
 873                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 874                            .add(
 875                                project::Column::HostConnectionServerId
 876                                    .eq(connection.owner_id as i32),
 877                            ),
 878                    )
 879                    .exec(&*tx)
 880                    .await?;
 881
 882                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 883                let deleted = if room.participants.is_empty() {
 884                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 885                    result.rows_affected > 0
 886                } else {
 887                    false
 888                };
 889
 890                let left_room = LeftRoom {
 891                    room,
 892                    channel,
 893                    left_projects,
 894                    canceled_calls_to_user_ids,
 895                    deleted,
 896                };
 897
 898                if left_room.room.participants.is_empty() {
 899                    self.rooms.remove(&room_id);
 900                }
 901
 902                Ok(Some((room_id, left_room)))
 903            } else {
 904                Ok(None)
 905            }
 906        })
 907        .await
 908    }
 909
 910    /// Updates the location of a participant in the given room.
 911    pub async fn update_room_participant_location(
 912        &self,
 913        room_id: RoomId,
 914        connection: ConnectionId,
 915        location: proto::ParticipantLocation,
 916    ) -> Result<RoomGuard<proto::Room>> {
 917        self.room_transaction(room_id, |tx| async {
 918            let tx = tx;
 919            let location_kind;
 920            let location_project_id;
 921            match location
 922                .variant
 923                .as_ref()
 924                .ok_or_else(|| anyhow!("invalid location"))?
 925            {
 926                proto::participant_location::Variant::SharedProject(project) => {
 927                    location_kind = 0;
 928                    location_project_id = Some(ProjectId::from_proto(project.id));
 929                }
 930                proto::participant_location::Variant::UnsharedProject(_) => {
 931                    location_kind = 1;
 932                    location_project_id = None;
 933                }
 934                proto::participant_location::Variant::External(_) => {
 935                    location_kind = 2;
 936                    location_project_id = None;
 937                }
 938            }
 939
 940            let result = room_participant::Entity::update_many()
 941                .filter(
 942                    Condition::all()
 943                        .add(room_participant::Column::RoomId.eq(room_id))
 944                        .add(
 945                            room_participant::Column::AnsweringConnectionId
 946                                .eq(connection.id as i32),
 947                        )
 948                        .add(
 949                            room_participant::Column::AnsweringConnectionServerId
 950                                .eq(connection.owner_id as i32),
 951                        ),
 952                )
 953                .set(room_participant::ActiveModel {
 954                    location_kind: ActiveValue::set(Some(location_kind)),
 955                    location_project_id: ActiveValue::set(location_project_id),
 956                    ..Default::default()
 957                })
 958                .exec(&*tx)
 959                .await?;
 960
 961            if result.rows_affected == 1 {
 962                let room = self.get_room(room_id, &tx).await?;
 963                Ok(room)
 964            } else {
 965                Err(anyhow!("could not update room participant location"))?
 966            }
 967        })
 968        .await
 969    }
 970
 971    /// Sets the role of a participant in the given room.
 972    pub async fn set_room_participant_role(
 973        &self,
 974        admin_id: UserId,
 975        room_id: RoomId,
 976        user_id: UserId,
 977        role: ChannelRole,
 978    ) -> Result<RoomGuard<proto::Room>> {
 979        self.room_transaction(room_id, |tx| async move {
 980            room_participant::Entity::find()
 981                .filter(
 982                    Condition::all()
 983                        .add(room_participant::Column::RoomId.eq(room_id))
 984                        .add(room_participant::Column::UserId.eq(admin_id))
 985                        .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
 986                )
 987                .one(&*tx)
 988                .await?
 989                .ok_or_else(|| anyhow!("only admins can set participant role"))?;
 990
 991            if role.requires_cla() {
 992                self.check_user_has_signed_cla(user_id, room_id, &tx)
 993                    .await?;
 994            }
 995
 996            let result = room_participant::Entity::update_many()
 997                .filter(
 998                    Condition::all()
 999                        .add(room_participant::Column::RoomId.eq(room_id))
1000                        .add(room_participant::Column::UserId.eq(user_id)),
1001                )
1002                .set(room_participant::ActiveModel {
1003                    role: ActiveValue::set(Some(role)),
1004                    ..Default::default()
1005                })
1006                .exec(&*tx)
1007                .await?;
1008
1009            if result.rows_affected != 1 {
1010                Err(anyhow!("could not update room participant role"))?;
1011            }
1012            self.get_room(room_id, &tx).await
1013        })
1014        .await
1015    }
1016
1017    async fn check_user_has_signed_cla(
1018        &self,
1019        user_id: UserId,
1020        room_id: RoomId,
1021        tx: &DatabaseTransaction,
1022    ) -> Result<()> {
1023        let channel = room::Entity::find_by_id(room_id)
1024            .one(tx)
1025            .await?
1026            .ok_or_else(|| anyhow!("could not find room"))?
1027            .find_related(channel::Entity)
1028            .one(tx)
1029            .await?;
1030
1031        if let Some(channel) = channel {
1032            let requires_zed_cla = channel.requires_zed_cla
1033                || channel::Entity::find()
1034                    .filter(
1035                        channel::Column::Id
1036                            .is_in(channel.ancestors())
1037                            .and(channel::Column::RequiresZedCla.eq(true)),
1038                    )
1039                    .count(tx)
1040                    .await?
1041                    > 0;
1042            if requires_zed_cla {
1043                if contributor::Entity::find()
1044                    .filter(contributor::Column::UserId.eq(user_id))
1045                    .one(tx)
1046                    .await?
1047                    .is_none()
1048                {
1049                    Err(anyhow!("user has not signed the Zed CLA"))?;
1050                }
1051            }
1052        }
1053        Ok(())
1054    }
1055
1056    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1057        self.transaction(|tx| async move {
1058            self.room_connection_lost(connection, &tx).await?;
1059            self.channel_buffer_connection_lost(connection, &tx).await?;
1060            self.channel_chat_connection_lost(connection, &tx).await?;
1061            Ok(())
1062        })
1063        .await
1064    }
1065
1066    pub async fn room_connection_lost(
1067        &self,
1068        connection: ConnectionId,
1069        tx: &DatabaseTransaction,
1070    ) -> Result<()> {
1071        let participant = room_participant::Entity::find()
1072            .filter(
1073                Condition::all()
1074                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1075                    .add(
1076                        room_participant::Column::AnsweringConnectionServerId
1077                            .eq(connection.owner_id as i32),
1078                    ),
1079            )
1080            .one(tx)
1081            .await?;
1082
1083        if let Some(participant) = participant {
1084            room_participant::Entity::update(room_participant::ActiveModel {
1085                answering_connection_lost: ActiveValue::set(true),
1086                ..participant.into_active_model()
1087            })
1088            .exec(tx)
1089            .await?;
1090        }
1091        Ok(())
1092    }
1093
1094    fn build_incoming_call(
1095        room: &proto::Room,
1096        called_user_id: UserId,
1097    ) -> Option<proto::IncomingCall> {
1098        let pending_participant = room
1099            .pending_participants
1100            .iter()
1101            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1102
1103        Some(proto::IncomingCall {
1104            room_id: room.id,
1105            calling_user_id: pending_participant.calling_user_id,
1106            participant_user_ids: room
1107                .participants
1108                .iter()
1109                .map(|participant| participant.user_id)
1110                .collect(),
1111            initial_project: room.participants.iter().find_map(|participant| {
1112                let initial_project_id = pending_participant.initial_project_id?;
1113                participant
1114                    .projects
1115                    .iter()
1116                    .find(|project| project.id == initial_project_id)
1117                    .cloned()
1118            }),
1119        })
1120    }
1121
1122    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1123        let (_, room) = self.get_channel_room(room_id, tx).await?;
1124        Ok(room)
1125    }
1126
1127    pub async fn room_connection_ids(
1128        &self,
1129        room_id: RoomId,
1130        connection_id: ConnectionId,
1131    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1132        self.room_transaction(room_id, |tx| async move {
1133            let mut participants = room_participant::Entity::find()
1134                .filter(room_participant::Column::RoomId.eq(room_id))
1135                .stream(&*tx)
1136                .await?;
1137
1138            let mut is_participant = false;
1139            let mut connection_ids = HashSet::default();
1140            while let Some(participant) = participants.next().await {
1141                let participant = participant?;
1142                if let Some(answering_connection) = participant.answering_connection() {
1143                    if answering_connection == connection_id {
1144                        is_participant = true;
1145                    } else {
1146                        connection_ids.insert(answering_connection);
1147                    }
1148                }
1149            }
1150
1151            if !is_participant {
1152                Err(anyhow!("not a room participant"))?;
1153            }
1154
1155            Ok(connection_ids)
1156        })
1157        .await
1158    }
1159
1160    async fn get_channel_room(
1161        &self,
1162        room_id: RoomId,
1163        tx: &DatabaseTransaction,
1164    ) -> Result<(Option<channel::Model>, proto::Room)> {
1165        let db_room = room::Entity::find_by_id(room_id)
1166            .one(tx)
1167            .await?
1168            .ok_or_else(|| anyhow!("could not find room"))?;
1169
1170        let mut db_participants = db_room
1171            .find_related(room_participant::Entity)
1172            .stream(tx)
1173            .await?;
1174        let mut participants = HashMap::default();
1175        let mut pending_participants = Vec::new();
1176        while let Some(db_participant) = db_participants.next().await {
1177            let db_participant = db_participant?;
1178            if let (
1179                Some(answering_connection_id),
1180                Some(answering_connection_server_id),
1181                Some(participant_index),
1182            ) = (
1183                db_participant.answering_connection_id,
1184                db_participant.answering_connection_server_id,
1185                db_participant.participant_index,
1186            ) {
1187                let location = match (
1188                    db_participant.location_kind,
1189                    db_participant.location_project_id,
1190                ) {
1191                    (Some(0), Some(project_id)) => {
1192                        Some(proto::participant_location::Variant::SharedProject(
1193                            proto::participant_location::SharedProject {
1194                                id: project_id.to_proto(),
1195                            },
1196                        ))
1197                    }
1198                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1199                        Default::default(),
1200                    )),
1201                    _ => Some(proto::participant_location::Variant::External(
1202                        Default::default(),
1203                    )),
1204                };
1205
1206                let answering_connection = ConnectionId {
1207                    owner_id: answering_connection_server_id.0 as u32,
1208                    id: answering_connection_id as u32,
1209                };
1210                participants.insert(
1211                    answering_connection,
1212                    proto::Participant {
1213                        user_id: db_participant.user_id.to_proto(),
1214                        peer_id: Some(answering_connection.into()),
1215                        projects: Default::default(),
1216                        location: Some(proto::ParticipantLocation { variant: location }),
1217                        participant_index: participant_index as u32,
1218                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1219                    },
1220                );
1221            } else {
1222                pending_participants.push(proto::PendingParticipant {
1223                    user_id: db_participant.user_id.to_proto(),
1224                    calling_user_id: db_participant.calling_user_id.to_proto(),
1225                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1226                });
1227            }
1228        }
1229        drop(db_participants);
1230
1231        let mut db_projects = db_room
1232            .find_related(project::Entity)
1233            .find_with_related(worktree::Entity)
1234            .stream(tx)
1235            .await?;
1236
1237        while let Some(row) = db_projects.next().await {
1238            let (db_project, db_worktree) = row?;
1239            let host_connection = db_project.host_connection()?;
1240            if let Some(participant) = participants.get_mut(&host_connection) {
1241                let project = if let Some(project) = participant
1242                    .projects
1243                    .iter_mut()
1244                    .find(|project| project.id == db_project.id.to_proto())
1245                {
1246                    project
1247                } else {
1248                    participant.projects.push(proto::ParticipantProject {
1249                        id: db_project.id.to_proto(),
1250                        worktree_root_names: Default::default(),
1251                    });
1252                    participant.projects.last_mut().unwrap()
1253                };
1254
1255                if let Some(db_worktree) = db_worktree {
1256                    if db_worktree.visible {
1257                        project.worktree_root_names.push(db_worktree.root_name);
1258                    }
1259                }
1260            }
1261        }
1262        drop(db_projects);
1263
1264        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1265        let mut followers = Vec::new();
1266        while let Some(db_follower) = db_followers.next().await {
1267            let db_follower = db_follower?;
1268            followers.push(proto::Follower {
1269                leader_id: Some(db_follower.leader_connection().into()),
1270                follower_id: Some(db_follower.follower_connection().into()),
1271                project_id: db_follower.project_id.to_proto(),
1272            });
1273        }
1274        drop(db_followers);
1275
1276        let channel = if let Some(channel_id) = db_room.channel_id {
1277            Some(self.get_channel_internal(channel_id, tx).await?)
1278        } else {
1279            None
1280        };
1281
1282        Ok((
1283            channel,
1284            proto::Room {
1285                id: db_room.id.to_proto(),
1286                live_kit_room: db_room.live_kit_room,
1287                participants: participants.into_values().collect(),
1288                pending_participants,
1289                followers,
1290            },
1291        ))
1292    }
1293}