rooms.rs

   1use super::*;
   2
   3impl Database {
   4    /// Clears all room participants in rooms attached to a stale server.
   5    pub async fn clear_stale_room_participants(
   6        &self,
   7        room_id: RoomId,
   8        new_server_id: ServerId,
   9    ) -> Result<RoomGuard<RefreshedRoom>> {
  10        self.room_transaction(room_id, |tx| async move {
  11            let stale_participant_filter = Condition::all()
  12                .add(room_participant::Column::RoomId.eq(room_id))
  13                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  14                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  15
  16            let stale_participant_user_ids = room_participant::Entity::find()
  17                .filter(stale_participant_filter.clone())
  18                .all(&*tx)
  19                .await?
  20                .into_iter()
  21                .map(|participant| participant.user_id)
  22                .collect::<Vec<_>>();
  23
  24            // Delete participants who failed to reconnect and cancel their calls.
  25            let mut canceled_calls_to_user_ids = Vec::new();
  26            room_participant::Entity::delete_many()
  27                .filter(stale_participant_filter)
  28                .exec(&*tx)
  29                .await?;
  30            let called_participants = room_participant::Entity::find()
  31                .filter(
  32                    Condition::all()
  33                        .add(
  34                            room_participant::Column::CallingUserId
  35                                .is_in(stale_participant_user_ids.iter().copied()),
  36                        )
  37                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  38                )
  39                .all(&*tx)
  40                .await?;
  41            room_participant::Entity::delete_many()
  42                .filter(
  43                    room_participant::Column::Id
  44                        .is_in(called_participants.iter().map(|participant| participant.id)),
  45                )
  46                .exec(&*tx)
  47                .await?;
  48            canceled_calls_to_user_ids.extend(
  49                called_participants
  50                    .into_iter()
  51                    .map(|participant| participant.user_id),
  52            );
  53
  54            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  55            let channel_members;
  56            if let Some(channel) = &channel {
  57                channel_members = self.get_channel_participants(channel, &tx).await?;
  58            } else {
  59                channel_members = Vec::new();
  60
  61                // Delete the room if it becomes empty.
  62                if room.participants.is_empty() {
  63                    project::Entity::delete_many()
  64                        .filter(project::Column::RoomId.eq(room_id))
  65                        .exec(&*tx)
  66                        .await?;
  67                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  68                }
  69            };
  70
  71            Ok(RefreshedRoom {
  72                room,
  73                channel_id: channel.map(|channel| channel.id),
  74                channel_members,
  75                stale_participant_user_ids,
  76                canceled_calls_to_user_ids,
  77            })
  78        })
  79        .await
  80    }
  81
  82    /// Returns the incoming calls for user with the given ID.
  83    pub async fn incoming_call_for_user(
  84        &self,
  85        user_id: UserId,
  86    ) -> Result<Option<proto::IncomingCall>> {
  87        self.transaction(|tx| async move {
  88            let pending_participant = room_participant::Entity::find()
  89                .filter(
  90                    room_participant::Column::UserId
  91                        .eq(user_id)
  92                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  93                )
  94                .one(&*tx)
  95                .await?;
  96
  97            if let Some(pending_participant) = pending_participant {
  98                let room = self.get_room(pending_participant.room_id, &tx).await?;
  99                Ok(Self::build_incoming_call(&room, user_id))
 100            } else {
 101                Ok(None)
 102            }
 103        })
 104        .await
 105    }
 106
 107    /// Creates a new room.
 108    pub async fn create_room(
 109        &self,
 110        user_id: UserId,
 111        connection: ConnectionId,
 112        live_kit_room: &str,
 113    ) -> Result<proto::Room> {
 114        self.transaction(|tx| async move {
 115            let room = room::ActiveModel {
 116                live_kit_room: ActiveValue::set(live_kit_room.into()),
 117                ..Default::default()
 118            }
 119            .insert(&*tx)
 120            .await?;
 121            room_participant::ActiveModel {
 122                room_id: ActiveValue::set(room.id),
 123                user_id: ActiveValue::set(user_id),
 124                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 125                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 126                    connection.owner_id as i32,
 127                ))),
 128                answering_connection_lost: ActiveValue::set(false),
 129                calling_user_id: ActiveValue::set(user_id),
 130                calling_connection_id: ActiveValue::set(connection.id as i32),
 131                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 132                    connection.owner_id as i32,
 133                ))),
 134                participant_index: ActiveValue::set(Some(0)),
 135                role: ActiveValue::set(Some(ChannelRole::Admin)),
 136
 137                id: ActiveValue::NotSet,
 138                location_kind: ActiveValue::NotSet,
 139                location_project_id: ActiveValue::NotSet,
 140                initial_project_id: ActiveValue::NotSet,
 141            }
 142            .insert(&*tx)
 143            .await?;
 144
 145            let room = self.get_room(room.id, &tx).await?;
 146            Ok(room)
 147        })
 148        .await
 149    }
 150
 151    pub async fn call(
 152        &self,
 153        room_id: RoomId,
 154        calling_user_id: UserId,
 155        calling_connection: ConnectionId,
 156        called_user_id: UserId,
 157        initial_project_id: Option<ProjectId>,
 158    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 159        self.room_transaction(room_id, |tx| async move {
 160            let caller = room_participant::Entity::find()
 161                .filter(
 162                    room_participant::Column::UserId
 163                        .eq(calling_user_id)
 164                        .and(room_participant::Column::RoomId.eq(room_id)),
 165                )
 166                .one(&*tx)
 167                .await?
 168                .ok_or_else(|| anyhow!("user is not in the room"))?;
 169
 170            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 171                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 172                ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
 173                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 174            };
 175
 176            room_participant::ActiveModel {
 177                room_id: ActiveValue::set(room_id),
 178                user_id: ActiveValue::set(called_user_id),
 179                answering_connection_lost: ActiveValue::set(false),
 180                participant_index: ActiveValue::NotSet,
 181                calling_user_id: ActiveValue::set(calling_user_id),
 182                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 183                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 184                    calling_connection.owner_id as i32,
 185                ))),
 186                initial_project_id: ActiveValue::set(initial_project_id),
 187                role: ActiveValue::set(Some(called_user_role)),
 188
 189                id: ActiveValue::NotSet,
 190                answering_connection_id: ActiveValue::NotSet,
 191                answering_connection_server_id: ActiveValue::NotSet,
 192                location_kind: ActiveValue::NotSet,
 193                location_project_id: ActiveValue::NotSet,
 194            }
 195            .insert(&*tx)
 196            .await?;
 197
 198            let room = self.get_room(room_id, &tx).await?;
 199            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 200                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 201            Ok((room, incoming_call))
 202        })
 203        .await
 204    }
 205
 206    pub async fn call_failed(
 207        &self,
 208        room_id: RoomId,
 209        called_user_id: UserId,
 210    ) -> Result<RoomGuard<proto::Room>> {
 211        self.room_transaction(room_id, |tx| async move {
 212            room_participant::Entity::delete_many()
 213                .filter(
 214                    room_participant::Column::RoomId
 215                        .eq(room_id)
 216                        .and(room_participant::Column::UserId.eq(called_user_id)),
 217                )
 218                .exec(&*tx)
 219                .await?;
 220            let room = self.get_room(room_id, &tx).await?;
 221            Ok(room)
 222        })
 223        .await
 224    }
 225
 226    pub async fn decline_call(
 227        &self,
 228        expected_room_id: Option<RoomId>,
 229        user_id: UserId,
 230    ) -> Result<Option<RoomGuard<proto::Room>>> {
 231        self.optional_room_transaction(|tx| async move {
 232            let mut filter = Condition::all()
 233                .add(room_participant::Column::UserId.eq(user_id))
 234                .add(room_participant::Column::AnsweringConnectionId.is_null());
 235            if let Some(room_id) = expected_room_id {
 236                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 237            }
 238            let participant = room_participant::Entity::find()
 239                .filter(filter)
 240                .one(&*tx)
 241                .await?;
 242
 243            let participant = if let Some(participant) = participant {
 244                participant
 245            } else if expected_room_id.is_some() {
 246                return Err(anyhow!("could not find call to decline"))?;
 247            } else {
 248                return Ok(None);
 249            };
 250
 251            let room_id = participant.room_id;
 252            room_participant::Entity::delete(participant.into_active_model())
 253                .exec(&*tx)
 254                .await?;
 255
 256            let room = self.get_room(room_id, &tx).await?;
 257            Ok(Some((room_id, room)))
 258        })
 259        .await
 260    }
 261
 262    pub async fn cancel_call(
 263        &self,
 264        room_id: RoomId,
 265        calling_connection: ConnectionId,
 266        called_user_id: UserId,
 267    ) -> Result<RoomGuard<proto::Room>> {
 268        self.room_transaction(room_id, |tx| async move {
 269            let participant = room_participant::Entity::find()
 270                .filter(
 271                    Condition::all()
 272                        .add(room_participant::Column::UserId.eq(called_user_id))
 273                        .add(room_participant::Column::RoomId.eq(room_id))
 274                        .add(
 275                            room_participant::Column::CallingConnectionId
 276                                .eq(calling_connection.id as i32),
 277                        )
 278                        .add(
 279                            room_participant::Column::CallingConnectionServerId
 280                                .eq(calling_connection.owner_id as i32),
 281                        )
 282                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 283                )
 284                .one(&*tx)
 285                .await?
 286                .ok_or_else(|| anyhow!("no call to cancel"))?;
 287
 288            room_participant::Entity::delete(participant.into_active_model())
 289                .exec(&*tx)
 290                .await?;
 291
 292            let room = self.get_room(room_id, &tx).await?;
 293            Ok(room)
 294        })
 295        .await
 296    }
 297
 298    pub async fn join_room(
 299        &self,
 300        room_id: RoomId,
 301        user_id: UserId,
 302        connection: ConnectionId,
 303    ) -> Result<RoomGuard<JoinRoom>> {
 304        self.room_transaction(room_id, |tx| async move {
 305            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 306            enum QueryChannelId {
 307                ChannelId,
 308            }
 309
 310            let channel_id: Option<ChannelId> = room::Entity::find()
 311                .select_only()
 312                .column(room::Column::ChannelId)
 313                .filter(room::Column::Id.eq(room_id))
 314                .into_values::<_, QueryChannelId>()
 315                .one(&*tx)
 316                .await?
 317                .ok_or_else(|| anyhow!("no such room"))?;
 318
 319            if channel_id.is_some() {
 320                Err(anyhow!("tried to join channel call directly"))?
 321            }
 322
 323            let participant_index = self
 324                .get_next_participant_index_internal(room_id, &tx)
 325                .await?;
 326
 327            let result = room_participant::Entity::update_many()
 328                .filter(
 329                    Condition::all()
 330                        .add(room_participant::Column::RoomId.eq(room_id))
 331                        .add(room_participant::Column::UserId.eq(user_id))
 332                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 333                )
 334                .set(room_participant::ActiveModel {
 335                    participant_index: ActiveValue::Set(Some(participant_index)),
 336                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 337                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 338                        connection.owner_id as i32,
 339                    ))),
 340                    answering_connection_lost: ActiveValue::set(false),
 341                    ..Default::default()
 342                })
 343                .exec(&*tx)
 344                .await?;
 345            if result.rows_affected == 0 {
 346                Err(anyhow!("room does not exist or was already joined"))?;
 347            }
 348
 349            let room = self.get_room(room_id, &tx).await?;
 350            Ok(JoinRoom {
 351                room,
 352                channel_id: None,
 353                channel_members: vec![],
 354            })
 355        })
 356        .await
 357    }
 358
 359    async fn get_next_participant_index_internal(
 360        &self,
 361        room_id: RoomId,
 362        tx: &DatabaseTransaction,
 363    ) -> Result<i32> {
 364        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 365        enum QueryParticipantIndices {
 366            ParticipantIndex,
 367        }
 368        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 369            .filter(
 370                room_participant::Column::RoomId
 371                    .eq(room_id)
 372                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 373            )
 374            .select_only()
 375            .column(room_participant::Column::ParticipantIndex)
 376            .into_values::<_, QueryParticipantIndices>()
 377            .all(tx)
 378            .await?;
 379
 380        let mut participant_index = 0;
 381        while existing_participant_indices.contains(&participant_index) {
 382            participant_index += 1;
 383        }
 384
 385        Ok(participant_index)
 386    }
 387
 388    /// Returns the channel ID for the given room, if it has one.
 389    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 390        self.transaction(|tx| async move {
 391            let room: Option<room::Model> = room::Entity::find()
 392                .filter(room::Column::Id.eq(room_id))
 393                .one(&*tx)
 394                .await?;
 395
 396            Ok(room.and_then(|room| room.channel_id))
 397        })
 398        .await
 399    }
 400
 401    pub(crate) async fn join_channel_room_internal(
 402        &self,
 403        room_id: RoomId,
 404        user_id: UserId,
 405        connection: ConnectionId,
 406        role: ChannelRole,
 407        tx: &DatabaseTransaction,
 408    ) -> Result<JoinRoom> {
 409        let participant_index = self
 410            .get_next_participant_index_internal(room_id, tx)
 411            .await?;
 412
 413        room_participant::Entity::insert_many([room_participant::ActiveModel {
 414            room_id: ActiveValue::set(room_id),
 415            user_id: ActiveValue::set(user_id),
 416            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 417            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 418                connection.owner_id as i32,
 419            ))),
 420            answering_connection_lost: ActiveValue::set(false),
 421            calling_user_id: ActiveValue::set(user_id),
 422            calling_connection_id: ActiveValue::set(connection.id as i32),
 423            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 424                connection.owner_id as i32,
 425            ))),
 426            participant_index: ActiveValue::Set(Some(participant_index)),
 427            role: ActiveValue::set(Some(role)),
 428            id: ActiveValue::NotSet,
 429            location_kind: ActiveValue::NotSet,
 430            location_project_id: ActiveValue::NotSet,
 431            initial_project_id: ActiveValue::NotSet,
 432        }])
 433        .on_conflict(
 434            OnConflict::columns([room_participant::Column::UserId])
 435                .update_columns([
 436                    room_participant::Column::AnsweringConnectionId,
 437                    room_participant::Column::AnsweringConnectionServerId,
 438                    room_participant::Column::AnsweringConnectionLost,
 439                    room_participant::Column::ParticipantIndex,
 440                    room_participant::Column::Role,
 441                ])
 442                .to_owned(),
 443        )
 444        .exec(tx)
 445        .await?;
 446
 447        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 448        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 449        let channel_members = self.get_channel_participants(&channel, tx).await?;
 450        Ok(JoinRoom {
 451            room,
 452            channel_id: Some(channel.id),
 453            channel_members,
 454        })
 455    }
 456
 457    pub async fn rejoin_room(
 458        &self,
 459        rejoin_room: proto::RejoinRoom,
 460        user_id: UserId,
 461        connection: ConnectionId,
 462    ) -> Result<RoomGuard<RejoinedRoom>> {
 463        let room_id = RoomId::from_proto(rejoin_room.id);
 464        self.room_transaction(room_id, |tx| async {
 465            let tx = tx;
 466            let participant_update = room_participant::Entity::update_many()
 467                .filter(
 468                    Condition::all()
 469                        .add(room_participant::Column::RoomId.eq(room_id))
 470                        .add(room_participant::Column::UserId.eq(user_id))
 471                        .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
 472                )
 473                .set(room_participant::ActiveModel {
 474                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 475                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 476                        connection.owner_id as i32,
 477                    ))),
 478                    answering_connection_lost: ActiveValue::set(false),
 479                    ..Default::default()
 480                })
 481                .exec(&*tx)
 482                .await?;
 483            if participant_update.rows_affected == 0 {
 484                return Err(anyhow!("room does not exist or was already joined"))?;
 485            }
 486
 487            let mut reshared_projects = Vec::new();
 488            for reshared_project in &rejoin_room.reshared_projects {
 489                let project_id = ProjectId::from_proto(reshared_project.project_id);
 490                let project = project::Entity::find_by_id(project_id)
 491                    .one(&*tx)
 492                    .await?
 493                    .ok_or_else(|| anyhow!("project does not exist"))?;
 494                if project.host_user_id != Some(user_id) {
 495                    return Err(anyhow!("no such project"))?;
 496                }
 497
 498                let mut collaborators = project
 499                    .find_related(project_collaborator::Entity)
 500                    .all(&*tx)
 501                    .await?;
 502                let host_ix = collaborators
 503                    .iter()
 504                    .position(|collaborator| {
 505                        collaborator.user_id == user_id && collaborator.is_host
 506                    })
 507                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 508                let host = collaborators.swap_remove(host_ix);
 509                let old_connection_id = host.connection();
 510
 511                project::Entity::update(project::ActiveModel {
 512                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 513                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 514                        connection.owner_id as i32,
 515                    ))),
 516                    ..project.into_active_model()
 517                })
 518                .exec(&*tx)
 519                .await?;
 520                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 521                    connection_id: ActiveValue::set(connection.id as i32),
 522                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 523                    ..host.into_active_model()
 524                })
 525                .exec(&*tx)
 526                .await?;
 527
 528                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 529                    .await?;
 530
 531                reshared_projects.push(ResharedProject {
 532                    id: project_id,
 533                    old_connection_id,
 534                    collaborators: collaborators
 535                        .iter()
 536                        .map(|collaborator| ProjectCollaborator {
 537                            connection_id: collaborator.connection(),
 538                            user_id: collaborator.user_id,
 539                            replica_id: collaborator.replica_id,
 540                            is_host: collaborator.is_host,
 541                        })
 542                        .collect(),
 543                    worktrees: reshared_project.worktrees.clone(),
 544                });
 545            }
 546
 547            project::Entity::delete_many()
 548                .filter(
 549                    Condition::all()
 550                        .add(project::Column::RoomId.eq(room_id))
 551                        .add(project::Column::HostUserId.eq(user_id))
 552                        .add(
 553                            project::Column::Id
 554                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 555                        ),
 556                )
 557                .exec(&*tx)
 558                .await?;
 559
 560            let mut rejoined_projects = Vec::new();
 561            for rejoined_project in &rejoin_room.rejoined_projects {
 562                let project_id = ProjectId::from_proto(rejoined_project.id);
 563                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 564                    continue;
 565                };
 566
 567                let mut worktrees = Vec::new();
 568                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 569                for db_worktree in db_worktrees {
 570                    let mut worktree = RejoinedWorktree {
 571                        id: db_worktree.id as u64,
 572                        abs_path: db_worktree.abs_path,
 573                        root_name: db_worktree.root_name,
 574                        visible: db_worktree.visible,
 575                        updated_entries: Default::default(),
 576                        removed_entries: Default::default(),
 577                        updated_repositories: Default::default(),
 578                        removed_repositories: Default::default(),
 579                        diagnostic_summaries: Default::default(),
 580                        settings_files: Default::default(),
 581                        scan_id: db_worktree.scan_id as u64,
 582                        completed_scan_id: db_worktree.completed_scan_id as u64,
 583                    };
 584
 585                    let rejoined_worktree = rejoined_project
 586                        .worktrees
 587                        .iter()
 588                        .find(|worktree| worktree.id == db_worktree.id as u64);
 589
 590                    // File entries
 591                    {
 592                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 593                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 594                        } else {
 595                            worktree_entry::Column::IsDeleted.eq(false)
 596                        };
 597
 598                        let mut db_entries = worktree_entry::Entity::find()
 599                            .filter(
 600                                Condition::all()
 601                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 602                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 603                                    .add(entry_filter),
 604                            )
 605                            .stream(&*tx)
 606                            .await?;
 607
 608                        while let Some(db_entry) = db_entries.next().await {
 609                            let db_entry = db_entry?;
 610                            if db_entry.is_deleted {
 611                                worktree.removed_entries.push(db_entry.id as u64);
 612                            } else {
 613                                worktree.updated_entries.push(proto::Entry {
 614                                    id: db_entry.id as u64,
 615                                    is_dir: db_entry.is_dir,
 616                                    path: db_entry.path,
 617                                    inode: db_entry.inode as u64,
 618                                    mtime: Some(proto::Timestamp {
 619                                        seconds: db_entry.mtime_seconds as u64,
 620                                        nanos: db_entry.mtime_nanos as u32,
 621                                    }),
 622                                    is_symlink: db_entry.is_symlink,
 623                                    is_ignored: db_entry.is_ignored,
 624                                    is_external: db_entry.is_external,
 625                                    git_status: db_entry.git_status.map(|status| status as i32),
 626                                });
 627                            }
 628                        }
 629                    }
 630
 631                    // Repository Entries
 632                    {
 633                        let repository_entry_filter =
 634                            if let Some(rejoined_worktree) = rejoined_worktree {
 635                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 636                            } else {
 637                                worktree_repository::Column::IsDeleted.eq(false)
 638                            };
 639
 640                        let mut db_repositories = worktree_repository::Entity::find()
 641                            .filter(
 642                                Condition::all()
 643                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 644                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 645                                    .add(repository_entry_filter),
 646                            )
 647                            .stream(&*tx)
 648                            .await?;
 649
 650                        while let Some(db_repository) = db_repositories.next().await {
 651                            let db_repository = db_repository?;
 652                            if db_repository.is_deleted {
 653                                worktree
 654                                    .removed_repositories
 655                                    .push(db_repository.work_directory_id as u64);
 656                            } else {
 657                                worktree.updated_repositories.push(proto::RepositoryEntry {
 658                                    work_directory_id: db_repository.work_directory_id as u64,
 659                                    branch: db_repository.branch,
 660                                });
 661                            }
 662                        }
 663                    }
 664
 665                    worktrees.push(worktree);
 666                }
 667
 668                let language_servers = project
 669                    .find_related(language_server::Entity)
 670                    .all(&*tx)
 671                    .await?
 672                    .into_iter()
 673                    .map(|language_server| proto::LanguageServer {
 674                        id: language_server.id as u64,
 675                        name: language_server.name,
 676                    })
 677                    .collect::<Vec<_>>();
 678
 679                {
 680                    let mut db_settings_files = worktree_settings_file::Entity::find()
 681                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 682                        .stream(&*tx)
 683                        .await?;
 684                    while let Some(db_settings_file) = db_settings_files.next().await {
 685                        let db_settings_file = db_settings_file?;
 686                        if let Some(worktree) = worktrees
 687                            .iter_mut()
 688                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 689                        {
 690                            worktree.settings_files.push(WorktreeSettingsFile {
 691                                path: db_settings_file.path,
 692                                content: db_settings_file.content,
 693                            });
 694                        }
 695                    }
 696                }
 697
 698                let mut collaborators = project
 699                    .find_related(project_collaborator::Entity)
 700                    .all(&*tx)
 701                    .await?;
 702                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 703                    .iter()
 704                    .position(|collaborator| collaborator.user_id == user_id)
 705                {
 706                    collaborators.swap_remove(self_collaborator_ix)
 707                } else {
 708                    continue;
 709                };
 710                let old_connection_id = self_collaborator.connection();
 711                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 712                    connection_id: ActiveValue::set(connection.id as i32),
 713                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 714                    ..self_collaborator.into_active_model()
 715                })
 716                .exec(&*tx)
 717                .await?;
 718
 719                let collaborators = collaborators
 720                    .into_iter()
 721                    .map(|collaborator| ProjectCollaborator {
 722                        connection_id: collaborator.connection(),
 723                        user_id: collaborator.user_id,
 724                        replica_id: collaborator.replica_id,
 725                        is_host: collaborator.is_host,
 726                    })
 727                    .collect::<Vec<_>>();
 728
 729                rejoined_projects.push(RejoinedProject {
 730                    id: project_id,
 731                    old_connection_id,
 732                    collaborators,
 733                    worktrees,
 734                    language_servers,
 735                });
 736            }
 737
 738            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 739            let channel_members = if let Some(channel) = &channel {
 740                self.get_channel_participants(&channel, &tx).await?
 741            } else {
 742                Vec::new()
 743            };
 744
 745            Ok(RejoinedRoom {
 746                room,
 747                channel_id: channel.map(|channel| channel.id),
 748                channel_members,
 749                rejoined_projects,
 750                reshared_projects,
 751            })
 752        })
 753        .await
 754    }
 755
 756    pub async fn leave_room(
 757        &self,
 758        connection: ConnectionId,
 759    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 760        self.optional_room_transaction(|tx| async move {
 761            let leaving_participant = room_participant::Entity::find()
 762                .filter(
 763                    Condition::all()
 764                        .add(
 765                            room_participant::Column::AnsweringConnectionId
 766                                .eq(connection.id as i32),
 767                        )
 768                        .add(
 769                            room_participant::Column::AnsweringConnectionServerId
 770                                .eq(connection.owner_id as i32),
 771                        ),
 772                )
 773                .one(&*tx)
 774                .await?;
 775
 776            if let Some(leaving_participant) = leaving_participant {
 777                // Leave room.
 778                let room_id = leaving_participant.room_id;
 779                room_participant::Entity::delete_by_id(leaving_participant.id)
 780                    .exec(&*tx)
 781                    .await?;
 782
 783                // Cancel pending calls initiated by the leaving user.
 784                let called_participants = room_participant::Entity::find()
 785                    .filter(
 786                        Condition::all()
 787                            .add(
 788                                room_participant::Column::CallingUserId
 789                                    .eq(leaving_participant.user_id),
 790                            )
 791                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 792                    )
 793                    .all(&*tx)
 794                    .await?;
 795                room_participant::Entity::delete_many()
 796                    .filter(
 797                        room_participant::Column::Id
 798                            .is_in(called_participants.iter().map(|participant| participant.id)),
 799                    )
 800                    .exec(&*tx)
 801                    .await?;
 802                let canceled_calls_to_user_ids = called_participants
 803                    .into_iter()
 804                    .map(|participant| participant.user_id)
 805                    .collect();
 806
 807                // Detect left projects.
 808                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 809                enum QueryProjectIds {
 810                    ProjectId,
 811                }
 812                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 813                    .select_only()
 814                    .column_as(
 815                        project_collaborator::Column::ProjectId,
 816                        QueryProjectIds::ProjectId,
 817                    )
 818                    .filter(
 819                        Condition::all()
 820                            .add(
 821                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 822                            )
 823                            .add(
 824                                project_collaborator::Column::ConnectionServerId
 825                                    .eq(connection.owner_id as i32),
 826                            ),
 827                    )
 828                    .into_values::<_, QueryProjectIds>()
 829                    .all(&*tx)
 830                    .await?;
 831                let mut left_projects = HashMap::default();
 832                let mut collaborators = project_collaborator::Entity::find()
 833                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 834                    .stream(&*tx)
 835                    .await?;
 836                while let Some(collaborator) = collaborators.next().await {
 837                    let collaborator = collaborator?;
 838                    let left_project =
 839                        left_projects
 840                            .entry(collaborator.project_id)
 841                            .or_insert(LeftProject {
 842                                id: collaborator.project_id,
 843                                host_user_id: Default::default(),
 844                                connection_ids: Default::default(),
 845                                host_connection_id: None,
 846                            });
 847
 848                    let collaborator_connection_id = collaborator.connection();
 849                    if collaborator_connection_id != connection {
 850                        left_project.connection_ids.push(collaborator_connection_id);
 851                    }
 852
 853                    if collaborator.is_host {
 854                        left_project.host_user_id = Some(collaborator.user_id);
 855                        left_project.host_connection_id = Some(collaborator_connection_id);
 856                    }
 857                }
 858                drop(collaborators);
 859
 860                // Leave projects.
 861                project_collaborator::Entity::delete_many()
 862                    .filter(
 863                        Condition::all()
 864                            .add(
 865                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 866                            )
 867                            .add(
 868                                project_collaborator::Column::ConnectionServerId
 869                                    .eq(connection.owner_id as i32),
 870                            ),
 871                    )
 872                    .exec(&*tx)
 873                    .await?;
 874
 875                follower::Entity::delete_many()
 876                    .filter(
 877                        Condition::all()
 878                            .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
 879                    )
 880                    .exec(&*tx)
 881                    .await?;
 882
 883                // Unshare projects.
 884                project::Entity::delete_many()
 885                    .filter(
 886                        Condition::all()
 887                            .add(project::Column::RoomId.eq(room_id))
 888                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 889                            .add(
 890                                project::Column::HostConnectionServerId
 891                                    .eq(connection.owner_id as i32),
 892                            ),
 893                    )
 894                    .exec(&*tx)
 895                    .await?;
 896
 897                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 898                let deleted = if room.participants.is_empty() {
 899                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 900                    result.rows_affected > 0
 901                } else {
 902                    false
 903                };
 904
 905                let channel_members = if let Some(channel) = &channel {
 906                    self.get_channel_participants(channel, &tx).await?
 907                } else {
 908                    Vec::new()
 909                };
 910                let left_room = LeftRoom {
 911                    room,
 912                    channel_id: channel.map(|channel| channel.id),
 913                    channel_members,
 914                    left_projects,
 915                    canceled_calls_to_user_ids,
 916                    deleted,
 917                };
 918
 919                if left_room.room.participants.is_empty() {
 920                    self.rooms.remove(&room_id);
 921                }
 922
 923                Ok(Some((room_id, left_room)))
 924            } else {
 925                Ok(None)
 926            }
 927        })
 928        .await
 929    }
 930
 931    /// Updates the location of a participant in the given room.
 932    pub async fn update_room_participant_location(
 933        &self,
 934        room_id: RoomId,
 935        connection: ConnectionId,
 936        location: proto::ParticipantLocation,
 937    ) -> Result<RoomGuard<proto::Room>> {
 938        self.room_transaction(room_id, |tx| async {
 939            let tx = tx;
 940            let location_kind;
 941            let location_project_id;
 942            match location
 943                .variant
 944                .as_ref()
 945                .ok_or_else(|| anyhow!("invalid location"))?
 946            {
 947                proto::participant_location::Variant::SharedProject(project) => {
 948                    location_kind = 0;
 949                    location_project_id = Some(ProjectId::from_proto(project.id));
 950                }
 951                proto::participant_location::Variant::UnsharedProject(_) => {
 952                    location_kind = 1;
 953                    location_project_id = None;
 954                }
 955                proto::participant_location::Variant::External(_) => {
 956                    location_kind = 2;
 957                    location_project_id = None;
 958                }
 959            }
 960
 961            let result = room_participant::Entity::update_many()
 962                .filter(
 963                    Condition::all()
 964                        .add(room_participant::Column::RoomId.eq(room_id))
 965                        .add(
 966                            room_participant::Column::AnsweringConnectionId
 967                                .eq(connection.id as i32),
 968                        )
 969                        .add(
 970                            room_participant::Column::AnsweringConnectionServerId
 971                                .eq(connection.owner_id as i32),
 972                        ),
 973                )
 974                .set(room_participant::ActiveModel {
 975                    location_kind: ActiveValue::set(Some(location_kind)),
 976                    location_project_id: ActiveValue::set(location_project_id),
 977                    ..Default::default()
 978                })
 979                .exec(&*tx)
 980                .await?;
 981
 982            if result.rows_affected == 1 {
 983                let room = self.get_room(room_id, &tx).await?;
 984                Ok(room)
 985            } else {
 986                Err(anyhow!("could not update room participant location"))?
 987            }
 988        })
 989        .await
 990    }
 991
 992    /// Sets the role of a participant in the given room.
 993    pub async fn set_room_participant_role(
 994        &self,
 995        admin_id: UserId,
 996        room_id: RoomId,
 997        user_id: UserId,
 998        role: ChannelRole,
 999    ) -> Result<RoomGuard<proto::Room>> {
1000        self.room_transaction(room_id, |tx| async move {
1001            room_participant::Entity::find()
1002                .filter(
1003                    Condition::all()
1004                        .add(room_participant::Column::RoomId.eq(room_id))
1005                        .add(room_participant::Column::UserId.eq(admin_id))
1006                        .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1007                )
1008                .one(&*tx)
1009                .await?
1010                .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1011
1012            if role.requires_cla() {
1013                self.check_user_has_signed_cla(user_id, room_id, &tx)
1014                    .await?;
1015            }
1016
1017            let result = room_participant::Entity::update_many()
1018                .filter(
1019                    Condition::all()
1020                        .add(room_participant::Column::RoomId.eq(room_id))
1021                        .add(room_participant::Column::UserId.eq(user_id)),
1022                )
1023                .set(room_participant::ActiveModel {
1024                    role: ActiveValue::set(Some(role)),
1025                    ..Default::default()
1026                })
1027                .exec(&*tx)
1028                .await?;
1029
1030            if result.rows_affected != 1 {
1031                Err(anyhow!("could not update room participant role"))?;
1032            }
1033            self.get_room(room_id, &tx).await
1034        })
1035        .await
1036    }
1037
1038    async fn check_user_has_signed_cla(
1039        &self,
1040        user_id: UserId,
1041        room_id: RoomId,
1042        tx: &DatabaseTransaction,
1043    ) -> Result<()> {
1044        let channel = room::Entity::find_by_id(room_id)
1045            .one(tx)
1046            .await?
1047            .ok_or_else(|| anyhow!("could not find room"))?
1048            .find_related(channel::Entity)
1049            .one(tx)
1050            .await?;
1051
1052        if let Some(channel) = channel {
1053            let requires_zed_cla = channel.requires_zed_cla
1054                || channel::Entity::find()
1055                    .filter(
1056                        channel::Column::Id
1057                            .is_in(channel.ancestors())
1058                            .and(channel::Column::RequiresZedCla.eq(true)),
1059                    )
1060                    .count(tx)
1061                    .await?
1062                    > 0;
1063            if requires_zed_cla {
1064                if contributor::Entity::find()
1065                    .filter(contributor::Column::UserId.eq(user_id))
1066                    .one(tx)
1067                    .await?
1068                    .is_none()
1069                {
1070                    Err(anyhow!("user has not signed the Zed CLA"))?;
1071                }
1072            }
1073        }
1074        Ok(())
1075    }
1076
1077    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1078        self.transaction(|tx| async move {
1079            self.room_connection_lost(connection, &tx).await?;
1080            self.channel_buffer_connection_lost(connection, &tx).await?;
1081            self.channel_chat_connection_lost(connection, &tx).await?;
1082            Ok(())
1083        })
1084        .await
1085    }
1086
1087    pub async fn room_connection_lost(
1088        &self,
1089        connection: ConnectionId,
1090        tx: &DatabaseTransaction,
1091    ) -> Result<()> {
1092        let participant = room_participant::Entity::find()
1093            .filter(
1094                Condition::all()
1095                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1096                    .add(
1097                        room_participant::Column::AnsweringConnectionServerId
1098                            .eq(connection.owner_id as i32),
1099                    ),
1100            )
1101            .one(tx)
1102            .await?;
1103
1104        if let Some(participant) = participant {
1105            room_participant::Entity::update(room_participant::ActiveModel {
1106                answering_connection_lost: ActiveValue::set(true),
1107                ..participant.into_active_model()
1108            })
1109            .exec(tx)
1110            .await?;
1111        }
1112        Ok(())
1113    }
1114
1115    fn build_incoming_call(
1116        room: &proto::Room,
1117        called_user_id: UserId,
1118    ) -> Option<proto::IncomingCall> {
1119        let pending_participant = room
1120            .pending_participants
1121            .iter()
1122            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1123
1124        Some(proto::IncomingCall {
1125            room_id: room.id,
1126            calling_user_id: pending_participant.calling_user_id,
1127            participant_user_ids: room
1128                .participants
1129                .iter()
1130                .map(|participant| participant.user_id)
1131                .collect(),
1132            initial_project: room.participants.iter().find_map(|participant| {
1133                let initial_project_id = pending_participant.initial_project_id?;
1134                participant
1135                    .projects
1136                    .iter()
1137                    .find(|project| project.id == initial_project_id)
1138                    .cloned()
1139            }),
1140        })
1141    }
1142
1143    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1144        let (_, room) = self.get_channel_room(room_id, tx).await?;
1145        Ok(room)
1146    }
1147
1148    pub async fn room_connection_ids(
1149        &self,
1150        room_id: RoomId,
1151        connection_id: ConnectionId,
1152    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1153        self.room_transaction(room_id, |tx| async move {
1154            let mut participants = room_participant::Entity::find()
1155                .filter(room_participant::Column::RoomId.eq(room_id))
1156                .stream(&*tx)
1157                .await?;
1158
1159            let mut is_participant = false;
1160            let mut connection_ids = HashSet::default();
1161            while let Some(participant) = participants.next().await {
1162                let participant = participant?;
1163                if let Some(answering_connection) = participant.answering_connection() {
1164                    if answering_connection == connection_id {
1165                        is_participant = true;
1166                    } else {
1167                        connection_ids.insert(answering_connection);
1168                    }
1169                }
1170            }
1171
1172            if !is_participant {
1173                Err(anyhow!("not a room participant"))?;
1174            }
1175
1176            Ok(connection_ids)
1177        })
1178        .await
1179    }
1180
1181    async fn get_channel_room(
1182        &self,
1183        room_id: RoomId,
1184        tx: &DatabaseTransaction,
1185    ) -> Result<(Option<channel::Model>, proto::Room)> {
1186        let db_room = room::Entity::find_by_id(room_id)
1187            .one(tx)
1188            .await?
1189            .ok_or_else(|| anyhow!("could not find room"))?;
1190
1191        let mut db_participants = db_room
1192            .find_related(room_participant::Entity)
1193            .stream(tx)
1194            .await?;
1195        let mut participants = HashMap::default();
1196        let mut pending_participants = Vec::new();
1197        while let Some(db_participant) = db_participants.next().await {
1198            let db_participant = db_participant?;
1199            if let (
1200                Some(answering_connection_id),
1201                Some(answering_connection_server_id),
1202                Some(participant_index),
1203            ) = (
1204                db_participant.answering_connection_id,
1205                db_participant.answering_connection_server_id,
1206                db_participant.participant_index,
1207            ) {
1208                let location = match (
1209                    db_participant.location_kind,
1210                    db_participant.location_project_id,
1211                ) {
1212                    (Some(0), Some(project_id)) => {
1213                        Some(proto::participant_location::Variant::SharedProject(
1214                            proto::participant_location::SharedProject {
1215                                id: project_id.to_proto(),
1216                            },
1217                        ))
1218                    }
1219                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1220                        Default::default(),
1221                    )),
1222                    _ => Some(proto::participant_location::Variant::External(
1223                        Default::default(),
1224                    )),
1225                };
1226
1227                let answering_connection = ConnectionId {
1228                    owner_id: answering_connection_server_id.0 as u32,
1229                    id: answering_connection_id as u32,
1230                };
1231                participants.insert(
1232                    answering_connection,
1233                    proto::Participant {
1234                        user_id: db_participant.user_id.to_proto(),
1235                        peer_id: Some(answering_connection.into()),
1236                        projects: Default::default(),
1237                        location: Some(proto::ParticipantLocation { variant: location }),
1238                        participant_index: participant_index as u32,
1239                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1240                    },
1241                );
1242            } else {
1243                pending_participants.push(proto::PendingParticipant {
1244                    user_id: db_participant.user_id.to_proto(),
1245                    calling_user_id: db_participant.calling_user_id.to_proto(),
1246                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1247                });
1248            }
1249        }
1250        drop(db_participants);
1251
1252        let mut db_projects = db_room
1253            .find_related(project::Entity)
1254            .find_with_related(worktree::Entity)
1255            .stream(tx)
1256            .await?;
1257
1258        while let Some(row) = db_projects.next().await {
1259            let (db_project, db_worktree) = row?;
1260            let host_connection = db_project.host_connection()?;
1261            if let Some(participant) = participants.get_mut(&host_connection) {
1262                let project = if let Some(project) = participant
1263                    .projects
1264                    .iter_mut()
1265                    .find(|project| project.id == db_project.id.to_proto())
1266                {
1267                    project
1268                } else {
1269                    participant.projects.push(proto::ParticipantProject {
1270                        id: db_project.id.to_proto(),
1271                        worktree_root_names: Default::default(),
1272                    });
1273                    participant.projects.last_mut().unwrap()
1274                };
1275
1276                if let Some(db_worktree) = db_worktree {
1277                    if db_worktree.visible {
1278                        project.worktree_root_names.push(db_worktree.root_name);
1279                    }
1280                }
1281            }
1282        }
1283        drop(db_projects);
1284
1285        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1286        let mut followers = Vec::new();
1287        while let Some(db_follower) = db_followers.next().await {
1288            let db_follower = db_follower?;
1289            followers.push(proto::Follower {
1290                leader_id: Some(db_follower.leader_connection().into()),
1291                follower_id: Some(db_follower.follower_connection().into()),
1292                project_id: db_follower.project_id.to_proto(),
1293            });
1294        }
1295        drop(db_followers);
1296
1297        let channel = if let Some(channel_id) = db_room.channel_id {
1298            Some(self.get_channel_internal(channel_id, tx).await?)
1299        } else {
1300            None
1301        };
1302
1303        Ok((
1304            channel,
1305            proto::Room {
1306                id: db_room.id.to_proto(),
1307                live_kit_room: db_room.live_kit_room,
1308                participants: participants.into_values().collect(),
1309                pending_participants,
1310                followers,
1311            },
1312        ))
1313    }
1314}