rooms.rs

   1use super::*;
   2
   3impl Database {
   4    /// Clears all room participants in rooms attached to a stale server.
   5    pub async fn clear_stale_room_participants(
   6        &self,
   7        room_id: RoomId,
   8        new_server_id: ServerId,
   9    ) -> Result<RoomGuard<RefreshedRoom>> {
  10        self.room_transaction(room_id, |tx| async move {
  11            let stale_participant_filter = Condition::all()
  12                .add(room_participant::Column::RoomId.eq(room_id))
  13                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  14                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  15
  16            let stale_participant_user_ids = room_participant::Entity::find()
  17                .filter(stale_participant_filter.clone())
  18                .all(&*tx)
  19                .await?
  20                .into_iter()
  21                .map(|participant| participant.user_id)
  22                .collect::<Vec<_>>();
  23
  24            // Delete participants who failed to reconnect and cancel their calls.
  25            let mut canceled_calls_to_user_ids = Vec::new();
  26            room_participant::Entity::delete_many()
  27                .filter(stale_participant_filter)
  28                .exec(&*tx)
  29                .await?;
  30            let called_participants = room_participant::Entity::find()
  31                .filter(
  32                    Condition::all()
  33                        .add(
  34                            room_participant::Column::CallingUserId
  35                                .is_in(stale_participant_user_ids.iter().copied()),
  36                        )
  37                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  38                )
  39                .all(&*tx)
  40                .await?;
  41            room_participant::Entity::delete_many()
  42                .filter(
  43                    room_participant::Column::Id
  44                        .is_in(called_participants.iter().map(|participant| participant.id)),
  45                )
  46                .exec(&*tx)
  47                .await?;
  48            canceled_calls_to_user_ids.extend(
  49                called_participants
  50                    .into_iter()
  51                    .map(|participant| participant.user_id),
  52            );
  53
  54            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  55            let channel_members;
  56            if let Some(channel) = &channel {
  57                channel_members = self.get_channel_participants(channel, &tx).await?;
  58            } else {
  59                channel_members = Vec::new();
  60
  61                // Delete the room if it becomes empty.
  62                if room.participants.is_empty() {
  63                    project::Entity::delete_many()
  64                        .filter(project::Column::RoomId.eq(room_id))
  65                        .exec(&*tx)
  66                        .await?;
  67                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  68                }
  69            };
  70
  71            Ok(RefreshedRoom {
  72                room,
  73                channel_id: channel.map(|channel| channel.id),
  74                channel_members,
  75                stale_participant_user_ids,
  76                canceled_calls_to_user_ids,
  77            })
  78        })
  79        .await
  80    }
  81
  82    /// Returns the incoming calls for user with the given ID.
  83    pub async fn incoming_call_for_user(
  84        &self,
  85        user_id: UserId,
  86    ) -> Result<Option<proto::IncomingCall>> {
  87        self.transaction(|tx| async move {
  88            let pending_participant = room_participant::Entity::find()
  89                .filter(
  90                    room_participant::Column::UserId
  91                        .eq(user_id)
  92                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  93                )
  94                .one(&*tx)
  95                .await?;
  96
  97            if let Some(pending_participant) = pending_participant {
  98                let room = self.get_room(pending_participant.room_id, &tx).await?;
  99                Ok(Self::build_incoming_call(&room, user_id))
 100            } else {
 101                Ok(None)
 102            }
 103        })
 104        .await
 105    }
 106
 107    /// Creates a new room.
 108    pub async fn create_room(
 109        &self,
 110        user_id: UserId,
 111        connection: ConnectionId,
 112        live_kit_room: &str,
 113    ) -> Result<proto::Room> {
 114        self.transaction(|tx| async move {
 115            let room = room::ActiveModel {
 116                live_kit_room: ActiveValue::set(live_kit_room.into()),
 117                ..Default::default()
 118            }
 119            .insert(&*tx)
 120            .await?;
 121            room_participant::ActiveModel {
 122                room_id: ActiveValue::set(room.id),
 123                user_id: ActiveValue::set(user_id),
 124                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 125                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 126                    connection.owner_id as i32,
 127                ))),
 128                answering_connection_lost: ActiveValue::set(false),
 129                calling_user_id: ActiveValue::set(user_id),
 130                calling_connection_id: ActiveValue::set(connection.id as i32),
 131                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 132                    connection.owner_id as i32,
 133                ))),
 134                participant_index: ActiveValue::set(Some(0)),
 135                role: ActiveValue::set(Some(ChannelRole::Admin)),
 136
 137                id: ActiveValue::NotSet,
 138                location_kind: ActiveValue::NotSet,
 139                location_project_id: ActiveValue::NotSet,
 140                initial_project_id: ActiveValue::NotSet,
 141            }
 142            .insert(&*tx)
 143            .await?;
 144
 145            let room = self.get_room(room.id, &tx).await?;
 146            Ok(room)
 147        })
 148        .await
 149    }
 150
 151    pub async fn call(
 152        &self,
 153        room_id: RoomId,
 154        calling_user_id: UserId,
 155        calling_connection: ConnectionId,
 156        called_user_id: UserId,
 157        initial_project_id: Option<ProjectId>,
 158    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 159        self.room_transaction(room_id, |tx| async move {
 160            let caller = room_participant::Entity::find()
 161                .filter(
 162                    room_participant::Column::UserId
 163                        .eq(calling_user_id)
 164                        .and(room_participant::Column::RoomId.eq(room_id)),
 165                )
 166                .one(&*tx)
 167                .await?
 168                .ok_or_else(|| anyhow!("user is not in the room"))?;
 169
 170            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 171                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 172                ChannelRole::Guest => ChannelRole::Guest,
 173                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 174            };
 175
 176            room_participant::ActiveModel {
 177                room_id: ActiveValue::set(room_id),
 178                user_id: ActiveValue::set(called_user_id),
 179                answering_connection_lost: ActiveValue::set(false),
 180                participant_index: ActiveValue::NotSet,
 181                calling_user_id: ActiveValue::set(calling_user_id),
 182                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 183                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 184                    calling_connection.owner_id as i32,
 185                ))),
 186                initial_project_id: ActiveValue::set(initial_project_id),
 187                role: ActiveValue::set(Some(called_user_role)),
 188
 189                id: ActiveValue::NotSet,
 190                answering_connection_id: ActiveValue::NotSet,
 191                answering_connection_server_id: ActiveValue::NotSet,
 192                location_kind: ActiveValue::NotSet,
 193                location_project_id: ActiveValue::NotSet,
 194            }
 195            .insert(&*tx)
 196            .await?;
 197
 198            let room = self.get_room(room_id, &tx).await?;
 199            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 200                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 201            Ok((room, incoming_call))
 202        })
 203        .await
 204    }
 205
 206    pub async fn call_failed(
 207        &self,
 208        room_id: RoomId,
 209        called_user_id: UserId,
 210    ) -> Result<RoomGuard<proto::Room>> {
 211        self.room_transaction(room_id, |tx| async move {
 212            room_participant::Entity::delete_many()
 213                .filter(
 214                    room_participant::Column::RoomId
 215                        .eq(room_id)
 216                        .and(room_participant::Column::UserId.eq(called_user_id)),
 217                )
 218                .exec(&*tx)
 219                .await?;
 220            let room = self.get_room(room_id, &tx).await?;
 221            Ok(room)
 222        })
 223        .await
 224    }
 225
 226    pub async fn decline_call(
 227        &self,
 228        expected_room_id: Option<RoomId>,
 229        user_id: UserId,
 230    ) -> Result<Option<RoomGuard<proto::Room>>> {
 231        self.optional_room_transaction(|tx| async move {
 232            let mut filter = Condition::all()
 233                .add(room_participant::Column::UserId.eq(user_id))
 234                .add(room_participant::Column::AnsweringConnectionId.is_null());
 235            if let Some(room_id) = expected_room_id {
 236                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 237            }
 238            let participant = room_participant::Entity::find()
 239                .filter(filter)
 240                .one(&*tx)
 241                .await?;
 242
 243            let participant = if let Some(participant) = participant {
 244                participant
 245            } else if expected_room_id.is_some() {
 246                return Err(anyhow!("could not find call to decline"))?;
 247            } else {
 248                return Ok(None);
 249            };
 250
 251            let room_id = participant.room_id;
 252            room_participant::Entity::delete(participant.into_active_model())
 253                .exec(&*tx)
 254                .await?;
 255
 256            let room = self.get_room(room_id, &tx).await?;
 257            Ok(Some((room_id, room)))
 258        })
 259        .await
 260    }
 261
 262    pub async fn cancel_call(
 263        &self,
 264        room_id: RoomId,
 265        calling_connection: ConnectionId,
 266        called_user_id: UserId,
 267    ) -> Result<RoomGuard<proto::Room>> {
 268        self.room_transaction(room_id, |tx| async move {
 269            let participant = room_participant::Entity::find()
 270                .filter(
 271                    Condition::all()
 272                        .add(room_participant::Column::UserId.eq(called_user_id))
 273                        .add(room_participant::Column::RoomId.eq(room_id))
 274                        .add(
 275                            room_participant::Column::CallingConnectionId
 276                                .eq(calling_connection.id as i32),
 277                        )
 278                        .add(
 279                            room_participant::Column::CallingConnectionServerId
 280                                .eq(calling_connection.owner_id as i32),
 281                        )
 282                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 283                )
 284                .one(&*tx)
 285                .await?
 286                .ok_or_else(|| anyhow!("no call to cancel"))?;
 287
 288            room_participant::Entity::delete(participant.into_active_model())
 289                .exec(&*tx)
 290                .await?;
 291
 292            let room = self.get_room(room_id, &tx).await?;
 293            Ok(room)
 294        })
 295        .await
 296    }
 297
 298    pub async fn join_room(
 299        &self,
 300        room_id: RoomId,
 301        user_id: UserId,
 302        connection: ConnectionId,
 303    ) -> Result<RoomGuard<JoinRoom>> {
 304        self.room_transaction(room_id, |tx| async move {
 305            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 306            enum QueryChannelId {
 307                ChannelId,
 308            }
 309
 310            let channel_id: Option<ChannelId> = room::Entity::find()
 311                .select_only()
 312                .column(room::Column::ChannelId)
 313                .filter(room::Column::Id.eq(room_id))
 314                .into_values::<_, QueryChannelId>()
 315                .one(&*tx)
 316                .await?
 317                .ok_or_else(|| anyhow!("no such room"))?;
 318
 319            if channel_id.is_some() {
 320                Err(anyhow!("tried to join channel call directly"))?
 321            }
 322
 323            let participant_index = self
 324                .get_next_participant_index_internal(room_id, &*tx)
 325                .await?;
 326
 327            let result = room_participant::Entity::update_many()
 328                .filter(
 329                    Condition::all()
 330                        .add(room_participant::Column::RoomId.eq(room_id))
 331                        .add(room_participant::Column::UserId.eq(user_id))
 332                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 333                )
 334                .set(room_participant::ActiveModel {
 335                    participant_index: ActiveValue::Set(Some(participant_index)),
 336                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 337                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 338                        connection.owner_id as i32,
 339                    ))),
 340                    answering_connection_lost: ActiveValue::set(false),
 341                    ..Default::default()
 342                })
 343                .exec(&*tx)
 344                .await?;
 345            if result.rows_affected == 0 {
 346                Err(anyhow!("room does not exist or was already joined"))?;
 347            }
 348
 349            let room = self.get_room(room_id, &tx).await?;
 350            Ok(JoinRoom {
 351                room,
 352                channel_id: None,
 353                channel_members: vec![],
 354            })
 355        })
 356        .await
 357    }
 358
 359    async fn get_next_participant_index_internal(
 360        &self,
 361        room_id: RoomId,
 362        tx: &DatabaseTransaction,
 363    ) -> Result<i32> {
 364        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 365        enum QueryParticipantIndices {
 366            ParticipantIndex,
 367        }
 368        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 369            .filter(
 370                room_participant::Column::RoomId
 371                    .eq(room_id)
 372                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 373            )
 374            .select_only()
 375            .column(room_participant::Column::ParticipantIndex)
 376            .into_values::<_, QueryParticipantIndices>()
 377            .all(&*tx)
 378            .await?;
 379
 380        let mut participant_index = 0;
 381        while existing_participant_indices.contains(&participant_index) {
 382            participant_index += 1;
 383        }
 384
 385        Ok(participant_index)
 386    }
 387
 388    /// Returns the channel ID for the given room, if it has one.
 389    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 390        self.transaction(|tx| async move {
 391            let room: Option<room::Model> = room::Entity::find()
 392                .filter(room::Column::Id.eq(room_id))
 393                .one(&*tx)
 394                .await?;
 395
 396            Ok(room.and_then(|room| room.channel_id))
 397        })
 398        .await
 399    }
 400
 401    pub(crate) async fn join_channel_room_internal(
 402        &self,
 403        room_id: RoomId,
 404        user_id: UserId,
 405        connection: ConnectionId,
 406        role: ChannelRole,
 407        tx: &DatabaseTransaction,
 408    ) -> Result<JoinRoom> {
 409        let participant_index = self
 410            .get_next_participant_index_internal(room_id, &*tx)
 411            .await?;
 412
 413        room_participant::Entity::insert_many([room_participant::ActiveModel {
 414            room_id: ActiveValue::set(room_id),
 415            user_id: ActiveValue::set(user_id),
 416            answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 417            answering_connection_server_id: ActiveValue::set(Some(ServerId(
 418                connection.owner_id as i32,
 419            ))),
 420            answering_connection_lost: ActiveValue::set(false),
 421            calling_user_id: ActiveValue::set(user_id),
 422            calling_connection_id: ActiveValue::set(connection.id as i32),
 423            calling_connection_server_id: ActiveValue::set(Some(ServerId(
 424                connection.owner_id as i32,
 425            ))),
 426            participant_index: ActiveValue::Set(Some(participant_index)),
 427            role: ActiveValue::set(Some(role)),
 428            id: ActiveValue::NotSet,
 429            location_kind: ActiveValue::NotSet,
 430            location_project_id: ActiveValue::NotSet,
 431            initial_project_id: ActiveValue::NotSet,
 432        }])
 433        .on_conflict(
 434            OnConflict::columns([room_participant::Column::UserId])
 435                .update_columns([
 436                    room_participant::Column::AnsweringConnectionId,
 437                    room_participant::Column::AnsweringConnectionServerId,
 438                    room_participant::Column::AnsweringConnectionLost,
 439                    room_participant::Column::ParticipantIndex,
 440                    room_participant::Column::Role,
 441                ])
 442                .to_owned(),
 443        )
 444        .exec(&*tx)
 445        .await?;
 446
 447        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 448        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 449        let channel_members = self.get_channel_participants(&channel, &*tx).await?;
 450        Ok(JoinRoom {
 451            room,
 452            channel_id: Some(channel.id),
 453            channel_members,
 454        })
 455    }
 456
 457    pub async fn rejoin_room(
 458        &self,
 459        rejoin_room: proto::RejoinRoom,
 460        user_id: UserId,
 461        connection: ConnectionId,
 462    ) -> Result<RoomGuard<RejoinedRoom>> {
 463        let room_id = RoomId::from_proto(rejoin_room.id);
 464        self.room_transaction(room_id, |tx| async {
 465            let tx = tx;
 466            let participant_update = room_participant::Entity::update_many()
 467                .filter(
 468                    Condition::all()
 469                        .add(room_participant::Column::RoomId.eq(room_id))
 470                        .add(room_participant::Column::UserId.eq(user_id))
 471                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 472                        .add(
 473                            Condition::any()
 474                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
 475                                .add(
 476                                    room_participant::Column::AnsweringConnectionServerId
 477                                        .ne(connection.owner_id as i32),
 478                                ),
 479                        ),
 480                )
 481                .set(room_participant::ActiveModel {
 482                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 483                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 484                        connection.owner_id as i32,
 485                    ))),
 486                    answering_connection_lost: ActiveValue::set(false),
 487                    ..Default::default()
 488                })
 489                .exec(&*tx)
 490                .await?;
 491            if participant_update.rows_affected == 0 {
 492                return Err(anyhow!("room does not exist or was already joined"))?;
 493            }
 494
 495            let mut reshared_projects = Vec::new();
 496            for reshared_project in &rejoin_room.reshared_projects {
 497                let project_id = ProjectId::from_proto(reshared_project.project_id);
 498                let project = project::Entity::find_by_id(project_id)
 499                    .one(&*tx)
 500                    .await?
 501                    .ok_or_else(|| anyhow!("project does not exist"))?;
 502                if project.host_user_id != user_id {
 503                    return Err(anyhow!("no such project"))?;
 504                }
 505
 506                let mut collaborators = project
 507                    .find_related(project_collaborator::Entity)
 508                    .all(&*tx)
 509                    .await?;
 510                let host_ix = collaborators
 511                    .iter()
 512                    .position(|collaborator| {
 513                        collaborator.user_id == user_id && collaborator.is_host
 514                    })
 515                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
 516                let host = collaborators.swap_remove(host_ix);
 517                let old_connection_id = host.connection();
 518
 519                project::Entity::update(project::ActiveModel {
 520                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 521                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 522                        connection.owner_id as i32,
 523                    ))),
 524                    ..project.into_active_model()
 525                })
 526                .exec(&*tx)
 527                .await?;
 528                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 529                    connection_id: ActiveValue::set(connection.id as i32),
 530                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 531                    ..host.into_active_model()
 532                })
 533                .exec(&*tx)
 534                .await?;
 535
 536                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 537                    .await?;
 538
 539                reshared_projects.push(ResharedProject {
 540                    id: project_id,
 541                    old_connection_id,
 542                    collaborators: collaborators
 543                        .iter()
 544                        .map(|collaborator| ProjectCollaborator {
 545                            connection_id: collaborator.connection(),
 546                            user_id: collaborator.user_id,
 547                            replica_id: collaborator.replica_id,
 548                            is_host: collaborator.is_host,
 549                        })
 550                        .collect(),
 551                    worktrees: reshared_project.worktrees.clone(),
 552                });
 553            }
 554
 555            project::Entity::delete_many()
 556                .filter(
 557                    Condition::all()
 558                        .add(project::Column::RoomId.eq(room_id))
 559                        .add(project::Column::HostUserId.eq(user_id))
 560                        .add(
 561                            project::Column::Id
 562                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 563                        ),
 564                )
 565                .exec(&*tx)
 566                .await?;
 567
 568            let mut rejoined_projects = Vec::new();
 569            for rejoined_project in &rejoin_room.rejoined_projects {
 570                let project_id = ProjectId::from_proto(rejoined_project.id);
 571                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
 572                    continue;
 573                };
 574
 575                let mut worktrees = Vec::new();
 576                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
 577                for db_worktree in db_worktrees {
 578                    let mut worktree = RejoinedWorktree {
 579                        id: db_worktree.id as u64,
 580                        abs_path: db_worktree.abs_path,
 581                        root_name: db_worktree.root_name,
 582                        visible: db_worktree.visible,
 583                        updated_entries: Default::default(),
 584                        removed_entries: Default::default(),
 585                        updated_repositories: Default::default(),
 586                        removed_repositories: Default::default(),
 587                        diagnostic_summaries: Default::default(),
 588                        settings_files: Default::default(),
 589                        scan_id: db_worktree.scan_id as u64,
 590                        completed_scan_id: db_worktree.completed_scan_id as u64,
 591                    };
 592
 593                    let rejoined_worktree = rejoined_project
 594                        .worktrees
 595                        .iter()
 596                        .find(|worktree| worktree.id == db_worktree.id as u64);
 597
 598                    // File entries
 599                    {
 600                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 601                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 602                        } else {
 603                            worktree_entry::Column::IsDeleted.eq(false)
 604                        };
 605
 606                        let mut db_entries = worktree_entry::Entity::find()
 607                            .filter(
 608                                Condition::all()
 609                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
 610                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 611                                    .add(entry_filter),
 612                            )
 613                            .stream(&*tx)
 614                            .await?;
 615
 616                        while let Some(db_entry) = db_entries.next().await {
 617                            let db_entry = db_entry?;
 618                            if db_entry.is_deleted {
 619                                worktree.removed_entries.push(db_entry.id as u64);
 620                            } else {
 621                                worktree.updated_entries.push(proto::Entry {
 622                                    id: db_entry.id as u64,
 623                                    is_dir: db_entry.is_dir,
 624                                    path: db_entry.path,
 625                                    inode: db_entry.inode as u64,
 626                                    mtime: Some(proto::Timestamp {
 627                                        seconds: db_entry.mtime_seconds as u64,
 628                                        nanos: db_entry.mtime_nanos as u32,
 629                                    }),
 630                                    is_symlink: db_entry.is_symlink,
 631                                    is_ignored: db_entry.is_ignored,
 632                                    is_external: db_entry.is_external,
 633                                    git_status: db_entry.git_status.map(|status| status as i32),
 634                                });
 635                            }
 636                        }
 637                    }
 638
 639                    // Repository Entries
 640                    {
 641                        let repository_entry_filter =
 642                            if let Some(rejoined_worktree) = rejoined_worktree {
 643                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 644                            } else {
 645                                worktree_repository::Column::IsDeleted.eq(false)
 646                            };
 647
 648                        let mut db_repositories = worktree_repository::Entity::find()
 649                            .filter(
 650                                Condition::all()
 651                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
 652                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 653                                    .add(repository_entry_filter),
 654                            )
 655                            .stream(&*tx)
 656                            .await?;
 657
 658                        while let Some(db_repository) = db_repositories.next().await {
 659                            let db_repository = db_repository?;
 660                            if db_repository.is_deleted {
 661                                worktree
 662                                    .removed_repositories
 663                                    .push(db_repository.work_directory_id as u64);
 664                            } else {
 665                                worktree.updated_repositories.push(proto::RepositoryEntry {
 666                                    work_directory_id: db_repository.work_directory_id as u64,
 667                                    branch: db_repository.branch,
 668                                });
 669                            }
 670                        }
 671                    }
 672
 673                    worktrees.push(worktree);
 674                }
 675
 676                let language_servers = project
 677                    .find_related(language_server::Entity)
 678                    .all(&*tx)
 679                    .await?
 680                    .into_iter()
 681                    .map(|language_server| proto::LanguageServer {
 682                        id: language_server.id as u64,
 683                        name: language_server.name,
 684                    })
 685                    .collect::<Vec<_>>();
 686
 687                {
 688                    let mut db_settings_files = worktree_settings_file::Entity::find()
 689                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 690                        .stream(&*tx)
 691                        .await?;
 692                    while let Some(db_settings_file) = db_settings_files.next().await {
 693                        let db_settings_file = db_settings_file?;
 694                        if let Some(worktree) = worktrees
 695                            .iter_mut()
 696                            .find(|w| w.id == db_settings_file.worktree_id as u64)
 697                        {
 698                            worktree.settings_files.push(WorktreeSettingsFile {
 699                                path: db_settings_file.path,
 700                                content: db_settings_file.content,
 701                            });
 702                        }
 703                    }
 704                }
 705
 706                let mut collaborators = project
 707                    .find_related(project_collaborator::Entity)
 708                    .all(&*tx)
 709                    .await?;
 710                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 711                    .iter()
 712                    .position(|collaborator| collaborator.user_id == user_id)
 713                {
 714                    collaborators.swap_remove(self_collaborator_ix)
 715                } else {
 716                    continue;
 717                };
 718                let old_connection_id = self_collaborator.connection();
 719                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 720                    connection_id: ActiveValue::set(connection.id as i32),
 721                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 722                    ..self_collaborator.into_active_model()
 723                })
 724                .exec(&*tx)
 725                .await?;
 726
 727                let collaborators = collaborators
 728                    .into_iter()
 729                    .map(|collaborator| ProjectCollaborator {
 730                        connection_id: collaborator.connection(),
 731                        user_id: collaborator.user_id,
 732                        replica_id: collaborator.replica_id,
 733                        is_host: collaborator.is_host,
 734                    })
 735                    .collect::<Vec<_>>();
 736
 737                rejoined_projects.push(RejoinedProject {
 738                    id: project_id,
 739                    old_connection_id,
 740                    collaborators,
 741                    worktrees,
 742                    language_servers,
 743                });
 744            }
 745
 746            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 747            let channel_members = if let Some(channel) = &channel {
 748                self.get_channel_participants(&channel, &tx).await?
 749            } else {
 750                Vec::new()
 751            };
 752
 753            Ok(RejoinedRoom {
 754                room,
 755                channel_id: channel.map(|channel| channel.id),
 756                channel_members,
 757                rejoined_projects,
 758                reshared_projects,
 759            })
 760        })
 761        .await
 762    }
 763
 764    pub async fn leave_room(
 765        &self,
 766        connection: ConnectionId,
 767    ) -> Result<Option<RoomGuard<LeftRoom>>> {
 768        self.optional_room_transaction(|tx| async move {
 769            let leaving_participant = room_participant::Entity::find()
 770                .filter(
 771                    Condition::all()
 772                        .add(
 773                            room_participant::Column::AnsweringConnectionId
 774                                .eq(connection.id as i32),
 775                        )
 776                        .add(
 777                            room_participant::Column::AnsweringConnectionServerId
 778                                .eq(connection.owner_id as i32),
 779                        ),
 780                )
 781                .one(&*tx)
 782                .await?;
 783
 784            if let Some(leaving_participant) = leaving_participant {
 785                // Leave room.
 786                let room_id = leaving_participant.room_id;
 787                room_participant::Entity::delete_by_id(leaving_participant.id)
 788                    .exec(&*tx)
 789                    .await?;
 790
 791                // Cancel pending calls initiated by the leaving user.
 792                let called_participants = room_participant::Entity::find()
 793                    .filter(
 794                        Condition::all()
 795                            .add(
 796                                room_participant::Column::CallingUserId
 797                                    .eq(leaving_participant.user_id),
 798                            )
 799                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 800                    )
 801                    .all(&*tx)
 802                    .await?;
 803                room_participant::Entity::delete_many()
 804                    .filter(
 805                        room_participant::Column::Id
 806                            .is_in(called_participants.iter().map(|participant| participant.id)),
 807                    )
 808                    .exec(&*tx)
 809                    .await?;
 810                let canceled_calls_to_user_ids = called_participants
 811                    .into_iter()
 812                    .map(|participant| participant.user_id)
 813                    .collect();
 814
 815                // Detect left projects.
 816                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 817                enum QueryProjectIds {
 818                    ProjectId,
 819                }
 820                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 821                    .select_only()
 822                    .column_as(
 823                        project_collaborator::Column::ProjectId,
 824                        QueryProjectIds::ProjectId,
 825                    )
 826                    .filter(
 827                        Condition::all()
 828                            .add(
 829                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 830                            )
 831                            .add(
 832                                project_collaborator::Column::ConnectionServerId
 833                                    .eq(connection.owner_id as i32),
 834                            ),
 835                    )
 836                    .into_values::<_, QueryProjectIds>()
 837                    .all(&*tx)
 838                    .await?;
 839                let mut left_projects = HashMap::default();
 840                let mut collaborators = project_collaborator::Entity::find()
 841                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 842                    .stream(&*tx)
 843                    .await?;
 844                while let Some(collaborator) = collaborators.next().await {
 845                    let collaborator = collaborator?;
 846                    let left_project =
 847                        left_projects
 848                            .entry(collaborator.project_id)
 849                            .or_insert(LeftProject {
 850                                id: collaborator.project_id,
 851                                host_user_id: Default::default(),
 852                                connection_ids: Default::default(),
 853                                host_connection_id: None,
 854                            });
 855
 856                    let collaborator_connection_id = collaborator.connection();
 857                    if collaborator_connection_id != connection {
 858                        left_project.connection_ids.push(collaborator_connection_id);
 859                    }
 860
 861                    if collaborator.is_host {
 862                        left_project.host_user_id = collaborator.user_id;
 863                        left_project.host_connection_id = Some(collaborator_connection_id);
 864                    }
 865                }
 866                drop(collaborators);
 867
 868                // Leave projects.
 869                project_collaborator::Entity::delete_many()
 870                    .filter(
 871                        Condition::all()
 872                            .add(
 873                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 874                            )
 875                            .add(
 876                                project_collaborator::Column::ConnectionServerId
 877                                    .eq(connection.owner_id as i32),
 878                            ),
 879                    )
 880                    .exec(&*tx)
 881                    .await?;
 882
 883                follower::Entity::delete_many()
 884                    .filter(
 885                        Condition::all()
 886                            .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
 887                    )
 888                    .exec(&*tx)
 889                    .await?;
 890
 891                // Unshare projects.
 892                project::Entity::delete_many()
 893                    .filter(
 894                        Condition::all()
 895                            .add(project::Column::RoomId.eq(room_id))
 896                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 897                            .add(
 898                                project::Column::HostConnectionServerId
 899                                    .eq(connection.owner_id as i32),
 900                            ),
 901                    )
 902                    .exec(&*tx)
 903                    .await?;
 904
 905                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 906                let deleted = if room.participants.is_empty() {
 907                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 908                    result.rows_affected > 0
 909                } else {
 910                    false
 911                };
 912
 913                let channel_members = if let Some(channel) = &channel {
 914                    self.get_channel_participants(channel, &tx).await?
 915                } else {
 916                    Vec::new()
 917                };
 918                let left_room = LeftRoom {
 919                    room,
 920                    channel_id: channel.map(|channel| channel.id),
 921                    channel_members,
 922                    left_projects,
 923                    canceled_calls_to_user_ids,
 924                    deleted,
 925                };
 926
 927                if left_room.room.participants.is_empty() {
 928                    self.rooms.remove(&room_id);
 929                }
 930
 931                Ok(Some((room_id, left_room)))
 932            } else {
 933                Ok(None)
 934            }
 935        })
 936        .await
 937    }
 938
 939    /// Updates the location of a participant in the given room.
 940    pub async fn update_room_participant_location(
 941        &self,
 942        room_id: RoomId,
 943        connection: ConnectionId,
 944        location: proto::ParticipantLocation,
 945    ) -> Result<RoomGuard<proto::Room>> {
 946        self.room_transaction(room_id, |tx| async {
 947            let tx = tx;
 948            let location_kind;
 949            let location_project_id;
 950            match location
 951                .variant
 952                .as_ref()
 953                .ok_or_else(|| anyhow!("invalid location"))?
 954            {
 955                proto::participant_location::Variant::SharedProject(project) => {
 956                    location_kind = 0;
 957                    location_project_id = Some(ProjectId::from_proto(project.id));
 958                }
 959                proto::participant_location::Variant::UnsharedProject(_) => {
 960                    location_kind = 1;
 961                    location_project_id = None;
 962                }
 963                proto::participant_location::Variant::External(_) => {
 964                    location_kind = 2;
 965                    location_project_id = None;
 966                }
 967            }
 968
 969            let result = room_participant::Entity::update_many()
 970                .filter(
 971                    Condition::all()
 972                        .add(room_participant::Column::RoomId.eq(room_id))
 973                        .add(
 974                            room_participant::Column::AnsweringConnectionId
 975                                .eq(connection.id as i32),
 976                        )
 977                        .add(
 978                            room_participant::Column::AnsweringConnectionServerId
 979                                .eq(connection.owner_id as i32),
 980                        ),
 981                )
 982                .set(room_participant::ActiveModel {
 983                    location_kind: ActiveValue::set(Some(location_kind)),
 984                    location_project_id: ActiveValue::set(location_project_id),
 985                    ..Default::default()
 986                })
 987                .exec(&*tx)
 988                .await?;
 989
 990            if result.rows_affected == 1 {
 991                let room = self.get_room(room_id, &tx).await?;
 992                Ok(room)
 993            } else {
 994                Err(anyhow!("could not update room participant location"))?
 995            }
 996        })
 997        .await
 998    }
 999
1000    /// Sets the role of a participant in the given room.
1001    pub async fn set_room_participant_role(
1002        &self,
1003        admin_id: UserId,
1004        room_id: RoomId,
1005        user_id: UserId,
1006        role: ChannelRole,
1007    ) -> Result<RoomGuard<proto::Room>> {
1008        self.room_transaction(room_id, |tx| async move {
1009            room_participant::Entity::find()
1010                .filter(
1011                    Condition::all()
1012                        .add(room_participant::Column::RoomId.eq(room_id))
1013                        .add(room_participant::Column::UserId.eq(admin_id))
1014                        .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1015                )
1016                .one(&*tx)
1017                .await?
1018                .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1019
1020            if role.requires_cla() {
1021                self.check_user_has_signed_cla(user_id, room_id, &*tx)
1022                    .await?;
1023            }
1024
1025            let result = room_participant::Entity::update_many()
1026                .filter(
1027                    Condition::all()
1028                        .add(room_participant::Column::RoomId.eq(room_id))
1029                        .add(room_participant::Column::UserId.eq(user_id)),
1030                )
1031                .set(room_participant::ActiveModel {
1032                    role: ActiveValue::set(Some(ChannelRole::from(role))),
1033                    ..Default::default()
1034                })
1035                .exec(&*tx)
1036                .await?;
1037
1038            if result.rows_affected != 1 {
1039                Err(anyhow!("could not update room participant role"))?;
1040            }
1041            Ok(self.get_room(room_id, &tx).await?)
1042        })
1043        .await
1044    }
1045
1046    async fn check_user_has_signed_cla(
1047        &self,
1048        user_id: UserId,
1049        room_id: RoomId,
1050        tx: &DatabaseTransaction,
1051    ) -> Result<()> {
1052        let channel = room::Entity::find_by_id(room_id)
1053            .one(&*tx)
1054            .await?
1055            .ok_or_else(|| anyhow!("could not find room"))?
1056            .find_related(channel::Entity)
1057            .one(&*tx)
1058            .await?;
1059
1060        if let Some(channel) = channel {
1061            let requires_zed_cla = channel.requires_zed_cla
1062                || channel::Entity::find()
1063                    .filter(
1064                        channel::Column::Id
1065                            .is_in(channel.ancestors())
1066                            .and(channel::Column::RequiresZedCla.eq(true)),
1067                    )
1068                    .count(&*tx)
1069                    .await?
1070                    > 0;
1071            if requires_zed_cla {
1072                if contributor::Entity::find()
1073                    .filter(contributor::Column::UserId.eq(user_id))
1074                    .one(&*tx)
1075                    .await?
1076                    .is_none()
1077                {
1078                    Err(anyhow!("user has not signed the Zed CLA"))?;
1079                }
1080            }
1081        }
1082        Ok(())
1083    }
1084
1085    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1086        self.transaction(|tx| async move {
1087            self.room_connection_lost(connection, &*tx).await?;
1088            self.channel_buffer_connection_lost(connection, &*tx)
1089                .await?;
1090            self.channel_chat_connection_lost(connection, &*tx).await?;
1091            Ok(())
1092        })
1093        .await
1094    }
1095
1096    pub async fn room_connection_lost(
1097        &self,
1098        connection: ConnectionId,
1099        tx: &DatabaseTransaction,
1100    ) -> Result<()> {
1101        let participant = room_participant::Entity::find()
1102            .filter(
1103                Condition::all()
1104                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1105                    .add(
1106                        room_participant::Column::AnsweringConnectionServerId
1107                            .eq(connection.owner_id as i32),
1108                    ),
1109            )
1110            .one(&*tx)
1111            .await?;
1112
1113        if let Some(participant) = participant {
1114            room_participant::Entity::update(room_participant::ActiveModel {
1115                answering_connection_lost: ActiveValue::set(true),
1116                ..participant.into_active_model()
1117            })
1118            .exec(&*tx)
1119            .await?;
1120        }
1121        Ok(())
1122    }
1123
1124    fn build_incoming_call(
1125        room: &proto::Room,
1126        called_user_id: UserId,
1127    ) -> Option<proto::IncomingCall> {
1128        let pending_participant = room
1129            .pending_participants
1130            .iter()
1131            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1132
1133        Some(proto::IncomingCall {
1134            room_id: room.id,
1135            calling_user_id: pending_participant.calling_user_id,
1136            participant_user_ids: room
1137                .participants
1138                .iter()
1139                .map(|participant| participant.user_id)
1140                .collect(),
1141            initial_project: room.participants.iter().find_map(|participant| {
1142                let initial_project_id = pending_participant.initial_project_id?;
1143                participant
1144                    .projects
1145                    .iter()
1146                    .find(|project| project.id == initial_project_id)
1147                    .cloned()
1148            }),
1149        })
1150    }
1151
1152    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1153        let (_, room) = self.get_channel_room(room_id, tx).await?;
1154        Ok(room)
1155    }
1156
1157    pub async fn room_connection_ids(
1158        &self,
1159        room_id: RoomId,
1160        connection_id: ConnectionId,
1161    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1162        self.room_transaction(room_id, |tx| async move {
1163            let mut participants = room_participant::Entity::find()
1164                .filter(room_participant::Column::RoomId.eq(room_id))
1165                .stream(&*tx)
1166                .await?;
1167
1168            let mut is_participant = false;
1169            let mut connection_ids = HashSet::default();
1170            while let Some(participant) = participants.next().await {
1171                let participant = participant?;
1172                if let Some(answering_connection) = participant.answering_connection() {
1173                    if answering_connection == connection_id {
1174                        is_participant = true;
1175                    } else {
1176                        connection_ids.insert(answering_connection);
1177                    }
1178                }
1179            }
1180
1181            if !is_participant {
1182                Err(anyhow!("not a room participant"))?;
1183            }
1184
1185            Ok(connection_ids)
1186        })
1187        .await
1188    }
1189
1190    async fn get_channel_room(
1191        &self,
1192        room_id: RoomId,
1193        tx: &DatabaseTransaction,
1194    ) -> Result<(Option<channel::Model>, proto::Room)> {
1195        let db_room = room::Entity::find_by_id(room_id)
1196            .one(tx)
1197            .await?
1198            .ok_or_else(|| anyhow!("could not find room"))?;
1199
1200        let mut db_participants = db_room
1201            .find_related(room_participant::Entity)
1202            .stream(tx)
1203            .await?;
1204        let mut participants = HashMap::default();
1205        let mut pending_participants = Vec::new();
1206        while let Some(db_participant) = db_participants.next().await {
1207            let db_participant = db_participant?;
1208            if let (
1209                Some(answering_connection_id),
1210                Some(answering_connection_server_id),
1211                Some(participant_index),
1212            ) = (
1213                db_participant.answering_connection_id,
1214                db_participant.answering_connection_server_id,
1215                db_participant.participant_index,
1216            ) {
1217                let location = match (
1218                    db_participant.location_kind,
1219                    db_participant.location_project_id,
1220                ) {
1221                    (Some(0), Some(project_id)) => {
1222                        Some(proto::participant_location::Variant::SharedProject(
1223                            proto::participant_location::SharedProject {
1224                                id: project_id.to_proto(),
1225                            },
1226                        ))
1227                    }
1228                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1229                        Default::default(),
1230                    )),
1231                    _ => Some(proto::participant_location::Variant::External(
1232                        Default::default(),
1233                    )),
1234                };
1235
1236                let answering_connection = ConnectionId {
1237                    owner_id: answering_connection_server_id.0 as u32,
1238                    id: answering_connection_id as u32,
1239                };
1240                participants.insert(
1241                    answering_connection,
1242                    proto::Participant {
1243                        user_id: db_participant.user_id.to_proto(),
1244                        peer_id: Some(answering_connection.into()),
1245                        projects: Default::default(),
1246                        location: Some(proto::ParticipantLocation { variant: location }),
1247                        participant_index: participant_index as u32,
1248                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1249                    },
1250                );
1251            } else {
1252                pending_participants.push(proto::PendingParticipant {
1253                    user_id: db_participant.user_id.to_proto(),
1254                    calling_user_id: db_participant.calling_user_id.to_proto(),
1255                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1256                });
1257            }
1258        }
1259        drop(db_participants);
1260
1261        let mut db_projects = db_room
1262            .find_related(project::Entity)
1263            .find_with_related(worktree::Entity)
1264            .stream(tx)
1265            .await?;
1266
1267        while let Some(row) = db_projects.next().await {
1268            let (db_project, db_worktree) = row?;
1269            let host_connection = db_project.host_connection()?;
1270            if let Some(participant) = participants.get_mut(&host_connection) {
1271                let project = if let Some(project) = participant
1272                    .projects
1273                    .iter_mut()
1274                    .find(|project| project.id == db_project.id.to_proto())
1275                {
1276                    project
1277                } else {
1278                    participant.projects.push(proto::ParticipantProject {
1279                        id: db_project.id.to_proto(),
1280                        worktree_root_names: Default::default(),
1281                    });
1282                    participant.projects.last_mut().unwrap()
1283                };
1284
1285                if let Some(db_worktree) = db_worktree {
1286                    if db_worktree.visible {
1287                        project.worktree_root_names.push(db_worktree.root_name);
1288                    }
1289                }
1290            }
1291        }
1292        drop(db_projects);
1293
1294        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1295        let mut followers = Vec::new();
1296        while let Some(db_follower) = db_followers.next().await {
1297            let db_follower = db_follower?;
1298            followers.push(proto::Follower {
1299                leader_id: Some(db_follower.leader_connection().into()),
1300                follower_id: Some(db_follower.follower_connection().into()),
1301                project_id: db_follower.project_id.to_proto(),
1302            });
1303        }
1304        drop(db_followers);
1305
1306        let channel = if let Some(channel_id) = db_room.channel_id {
1307            Some(self.get_channel_internal(channel_id, &*tx).await?)
1308        } else {
1309            None
1310        };
1311
1312        Ok((
1313            channel,
1314            proto::Room {
1315                id: db_room.id.to_proto(),
1316                live_kit_room: db_room.live_kit_room,
1317                participants: participants.into_values().collect(),
1318                pending_participants,
1319                followers,
1320            },
1321        ))
1322    }
1323}