rooms.rs

   1use super::*;
   2
   3impl Database {
   4    /// Clears all room participants in rooms attached to a stale server.
   5    pub async fn clear_stale_room_participants(
   6        &self,
   7        room_id: RoomId,
   8        new_server_id: ServerId,
   9    ) -> Result<TransactionGuard<RefreshedRoom>> {
  10        self.room_transaction(room_id, |tx| async move {
  11            let stale_participant_filter = Condition::all()
  12                .add(room_participant::Column::RoomId.eq(room_id))
  13                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  14                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  15
  16            let stale_participant_user_ids = room_participant::Entity::find()
  17                .filter(stale_participant_filter.clone())
  18                .all(&*tx)
  19                .await?
  20                .into_iter()
  21                .map(|participant| participant.user_id)
  22                .collect::<Vec<_>>();
  23
  24            // Delete participants who failed to reconnect and cancel their calls.
  25            let mut canceled_calls_to_user_ids = Vec::new();
  26            room_participant::Entity::delete_many()
  27                .filter(stale_participant_filter)
  28                .exec(&*tx)
  29                .await?;
  30            let called_participants = room_participant::Entity::find()
  31                .filter(
  32                    Condition::all()
  33                        .add(
  34                            room_participant::Column::CallingUserId
  35                                .is_in(stale_participant_user_ids.iter().copied()),
  36                        )
  37                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  38                )
  39                .all(&*tx)
  40                .await?;
  41            room_participant::Entity::delete_many()
  42                .filter(
  43                    room_participant::Column::Id
  44                        .is_in(called_participants.iter().map(|participant| participant.id)),
  45                )
  46                .exec(&*tx)
  47                .await?;
  48            canceled_calls_to_user_ids.extend(
  49                called_participants
  50                    .into_iter()
  51                    .map(|participant| participant.user_id),
  52            );
  53
  54            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  55            if channel.is_none() {
  56                // Delete the room if it becomes empty.
  57                if room.participants.is_empty() {
  58                    project::Entity::delete_many()
  59                        .filter(project::Column::RoomId.eq(room_id))
  60                        .exec(&*tx)
  61                        .await?;
  62                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  63                }
  64            };
  65
  66            Ok(RefreshedRoom {
  67                room,
  68                channel,
  69                stale_participant_user_ids,
  70                canceled_calls_to_user_ids,
  71            })
  72        })
  73        .await
  74    }
  75
  76    /// Returns the incoming calls for user with the given ID.
  77    pub async fn incoming_call_for_user(
  78        &self,
  79        user_id: UserId,
  80    ) -> Result<Option<proto::IncomingCall>> {
  81        self.transaction(|tx| async move {
  82            let pending_participant = room_participant::Entity::find()
  83                .filter(
  84                    room_participant::Column::UserId
  85                        .eq(user_id)
  86                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  87                )
  88                .one(&*tx)
  89                .await?;
  90
  91            if let Some(pending_participant) = pending_participant {
  92                let room = self.get_room(pending_participant.room_id, &tx).await?;
  93                Ok(Self::build_incoming_call(&room, user_id))
  94            } else {
  95                Ok(None)
  96            }
  97        })
  98        .await
  99    }
 100
 101    /// Creates a new room.
 102    pub async fn create_room(
 103        &self,
 104        user_id: UserId,
 105        connection: ConnectionId,
 106        live_kit_room: &str,
 107    ) -> Result<proto::Room> {
 108        self.transaction(|tx| async move {
 109            let room = room::ActiveModel {
 110                live_kit_room: ActiveValue::set(live_kit_room.into()),
 111                ..Default::default()
 112            }
 113            .insert(&*tx)
 114            .await?;
 115            room_participant::ActiveModel {
 116                room_id: ActiveValue::set(room.id),
 117                user_id: ActiveValue::set(user_id),
 118                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 119                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 120                    connection.owner_id as i32,
 121                ))),
 122                answering_connection_lost: ActiveValue::set(false),
 123                calling_user_id: ActiveValue::set(user_id),
 124                calling_connection_id: ActiveValue::set(connection.id as i32),
 125                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 126                    connection.owner_id as i32,
 127                ))),
 128                participant_index: ActiveValue::set(Some(0)),
 129                role: ActiveValue::set(Some(ChannelRole::Admin)),
 130
 131                id: ActiveValue::NotSet,
 132                location_kind: ActiveValue::NotSet,
 133                location_project_id: ActiveValue::NotSet,
 134                initial_project_id: ActiveValue::NotSet,
 135            }
 136            .insert(&*tx)
 137            .await?;
 138
 139            let room = self.get_room(room.id, &tx).await?;
 140            Ok(room)
 141        })
 142        .await
 143    }
 144
 145    pub async fn call(
 146        &self,
 147        room_id: RoomId,
 148        calling_user_id: UserId,
 149        calling_connection: ConnectionId,
 150        called_user_id: UserId,
 151        initial_project_id: Option<ProjectId>,
 152    ) -> Result<TransactionGuard<(proto::Room, proto::IncomingCall)>> {
 153        self.room_transaction(room_id, |tx| async move {
 154            let caller = room_participant::Entity::find()
 155                .filter(
 156                    room_participant::Column::UserId
 157                        .eq(calling_user_id)
 158                        .and(room_participant::Column::RoomId.eq(room_id)),
 159                )
 160                .one(&*tx)
 161                .await?
 162                .ok_or_else(|| anyhow!("user is not in the room"))?;
 163
 164            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 165                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 166                ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
 167                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 168            };
 169
 170            room_participant::ActiveModel {
 171                room_id: ActiveValue::set(room_id),
 172                user_id: ActiveValue::set(called_user_id),
 173                answering_connection_lost: ActiveValue::set(false),
 174                participant_index: ActiveValue::NotSet,
 175                calling_user_id: ActiveValue::set(calling_user_id),
 176                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 177                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 178                    calling_connection.owner_id as i32,
 179                ))),
 180                initial_project_id: ActiveValue::set(initial_project_id),
 181                role: ActiveValue::set(Some(called_user_role)),
 182
 183                id: ActiveValue::NotSet,
 184                answering_connection_id: ActiveValue::NotSet,
 185                answering_connection_server_id: ActiveValue::NotSet,
 186                location_kind: ActiveValue::NotSet,
 187                location_project_id: ActiveValue::NotSet,
 188            }
 189            .insert(&*tx)
 190            .await?;
 191
 192            let room = self.get_room(room_id, &tx).await?;
 193            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 194                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 195            Ok((room, incoming_call))
 196        })
 197        .await
 198    }
 199
 200    pub async fn call_failed(
 201        &self,
 202        room_id: RoomId,
 203        called_user_id: UserId,
 204    ) -> Result<TransactionGuard<proto::Room>> {
 205        self.room_transaction(room_id, |tx| async move {
 206            room_participant::Entity::delete_many()
 207                .filter(
 208                    room_participant::Column::RoomId
 209                        .eq(room_id)
 210                        .and(room_participant::Column::UserId.eq(called_user_id)),
 211                )
 212                .exec(&*tx)
 213                .await?;
 214            let room = self.get_room(room_id, &tx).await?;
 215            Ok(room)
 216        })
 217        .await
 218    }
 219
 220    pub async fn decline_call(
 221        &self,
 222        expected_room_id: Option<RoomId>,
 223        user_id: UserId,
 224    ) -> Result<Option<TransactionGuard<proto::Room>>> {
 225        self.optional_room_transaction(|tx| async move {
 226            let mut filter = Condition::all()
 227                .add(room_participant::Column::UserId.eq(user_id))
 228                .add(room_participant::Column::AnsweringConnectionId.is_null());
 229            if let Some(room_id) = expected_room_id {
 230                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 231            }
 232            let participant = room_participant::Entity::find()
 233                .filter(filter)
 234                .one(&*tx)
 235                .await?;
 236
 237            let participant = if let Some(participant) = participant {
 238                participant
 239            } else if expected_room_id.is_some() {
 240                return Err(anyhow!("could not find call to decline"))?;
 241            } else {
 242                return Ok(None);
 243            };
 244
 245            let room_id = participant.room_id;
 246            room_participant::Entity::delete(participant.into_active_model())
 247                .exec(&*tx)
 248                .await?;
 249
 250            let room = self.get_room(room_id, &tx).await?;
 251            Ok(Some((room_id, room)))
 252        })
 253        .await
 254    }
 255
 256    pub async fn cancel_call(
 257        &self,
 258        room_id: RoomId,
 259        calling_connection: ConnectionId,
 260        called_user_id: UserId,
 261    ) -> Result<TransactionGuard<proto::Room>> {
 262        self.room_transaction(room_id, |tx| async move {
 263            let participant = room_participant::Entity::find()
 264                .filter(
 265                    Condition::all()
 266                        .add(room_participant::Column::UserId.eq(called_user_id))
 267                        .add(room_participant::Column::RoomId.eq(room_id))
 268                        .add(
 269                            room_participant::Column::CallingConnectionId
 270                                .eq(calling_connection.id as i32),
 271                        )
 272                        .add(
 273                            room_participant::Column::CallingConnectionServerId
 274                                .eq(calling_connection.owner_id as i32),
 275                        )
 276                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 277                )
 278                .one(&*tx)
 279                .await?
 280                .ok_or_else(|| anyhow!("no call to cancel"))?;
 281
 282            room_participant::Entity::delete(participant.into_active_model())
 283                .exec(&*tx)
 284                .await?;
 285
 286            let room = self.get_room(room_id, &tx).await?;
 287            Ok(room)
 288        })
 289        .await
 290    }
 291
 292    pub async fn join_room(
 293        &self,
 294        room_id: RoomId,
 295        user_id: UserId,
 296        connection: ConnectionId,
 297    ) -> Result<TransactionGuard<JoinRoom>> {
 298        self.room_transaction(room_id, |tx| async move {
 299            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 300            enum QueryChannelId {
 301                ChannelId,
 302            }
 303
 304            let channel_id: Option<ChannelId> = room::Entity::find()
 305                .select_only()
 306                .column(room::Column::ChannelId)
 307                .filter(room::Column::Id.eq(room_id))
 308                .into_values::<_, QueryChannelId>()
 309                .one(&*tx)
 310                .await?
 311                .ok_or_else(|| anyhow!("no such room"))?;
 312
 313            if channel_id.is_some() {
 314                Err(anyhow!("tried to join channel call directly"))?
 315            }
 316
 317            let participant_index = self
 318                .get_next_participant_index_internal(room_id, &tx)
 319                .await?;
 320
 321            let result = room_participant::Entity::update_many()
 322                .filter(
 323                    Condition::all()
 324                        .add(room_participant::Column::RoomId.eq(room_id))
 325                        .add(room_participant::Column::UserId.eq(user_id))
 326                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 327                )
 328                .set(room_participant::ActiveModel {
 329                    participant_index: ActiveValue::Set(Some(participant_index)),
 330                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 331                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 332                        connection.owner_id as i32,
 333                    ))),
 334                    answering_connection_lost: ActiveValue::set(false),
 335                    ..Default::default()
 336                })
 337                .exec(&*tx)
 338                .await?;
 339            if result.rows_affected == 0 {
 340                Err(anyhow!("room does not exist or was already joined"))?;
 341            }
 342
 343            let room = self.get_room(room_id, &tx).await?;
 344            Ok(JoinRoom {
 345                room,
 346                channel: None,
 347            })
 348        })
 349        .await
 350    }
 351
 352    pub async fn stale_room_connection(&self, user_id: UserId) -> Result<Option<ConnectionId>> {
 353        self.transaction(|tx| async move {
 354            let participant = room_participant::Entity::find()
 355                .filter(room_participant::Column::UserId.eq(user_id))
 356                .one(&*tx)
 357                .await?;
 358            Ok(participant.and_then(|p| p.answering_connection()))
 359        })
 360        .await
 361    }
 362
 363    async fn get_next_participant_index_internal(
 364        &self,
 365        room_id: RoomId,
 366        tx: &DatabaseTransaction,
 367    ) -> Result<i32> {
 368        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 369        enum QueryParticipantIndices {
 370            ParticipantIndex,
 371        }
 372        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 373            .filter(
 374                room_participant::Column::RoomId
 375                    .eq(room_id)
 376                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 377            )
 378            .select_only()
 379            .column(room_participant::Column::ParticipantIndex)
 380            .into_values::<_, QueryParticipantIndices>()
 381            .all(tx)
 382            .await?;
 383
 384        let mut participant_index = 0;
 385        while existing_participant_indices.contains(&participant_index) {
 386            participant_index += 1;
 387        }
 388
 389        Ok(participant_index)
 390    }
 391
 392    /// Returns the channel ID for the given room, if it has one.
 393    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 394        self.transaction(|tx| async move {
 395            let room: Option<room::Model> = room::Entity::find()
 396                .filter(room::Column::Id.eq(room_id))
 397                .one(&*tx)
 398                .await?;
 399
 400            Ok(room.and_then(|room| room.channel_id))
 401        })
 402        .await
 403    }
 404
 405    pub(crate) async fn join_channel_room_internal(
 406        &self,
 407        room_id: RoomId,
 408        user_id: UserId,
 409        connection: ConnectionId,
 410        role: ChannelRole,
 411        tx: &DatabaseTransaction,
 412    ) -> Result<JoinRoom> {
 413        let participant_index = self
 414            .get_next_participant_index_internal(room_id, tx)
 415            .await?;
 416
 417        // If someone has been invited into the room, accept the invite instead of inserting
 418        let result = room_participant::Entity::update_many()
 419            .filter(
 420                Condition::all()
 421                    .add(room_participant::Column::RoomId.eq(room_id))
 422                    .add(room_participant::Column::UserId.eq(user_id))
 423                    .add(room_participant::Column::AnsweringConnectionId.is_null()),
 424            )
 425            .set(room_participant::ActiveModel {
 426                participant_index: ActiveValue::Set(Some(participant_index)),
 427                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 428                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 429                    connection.owner_id as i32,
 430                ))),
 431                answering_connection_lost: ActiveValue::set(false),
 432                ..Default::default()
 433            })
 434            .exec(tx)
 435            .await?;
 436
 437        if result.rows_affected == 0 {
 438            room_participant::Entity::insert(room_participant::ActiveModel {
 439                room_id: ActiveValue::set(room_id),
 440                user_id: ActiveValue::set(user_id),
 441                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 442                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 443                    connection.owner_id as i32,
 444                ))),
 445                answering_connection_lost: ActiveValue::set(false),
 446                calling_user_id: ActiveValue::set(user_id),
 447                calling_connection_id: ActiveValue::set(connection.id as i32),
 448                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 449                    connection.owner_id as i32,
 450                ))),
 451                participant_index: ActiveValue::Set(Some(participant_index)),
 452                role: ActiveValue::set(Some(role)),
 453                id: ActiveValue::NotSet,
 454                location_kind: ActiveValue::NotSet,
 455                location_project_id: ActiveValue::NotSet,
 456                initial_project_id: ActiveValue::NotSet,
 457            })
 458            .exec(tx)
 459            .await?;
 460        }
 461
 462        let (channel, room) = self.get_channel_room(room_id, tx).await?;
 463        let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
 464        Ok(JoinRoom {
 465            room,
 466            channel: Some(channel),
 467        })
 468    }
 469
    /// Re-attaches `user_id` to the room named in `rejoin_room` using `connection`.
    ///
    /// Inside the room transaction this, in order:
    /// 1. points the user's already-answered participant row at `connection` and
    ///    clears its `answering_connection_lost` flag,
    /// 2. moves each project in `rejoin_room.reshared_projects` (which the user must
    ///    host) onto `connection` and updates its worktrees,
    /// 3. deletes the user's other hosted projects in this room,
    /// 4. calls `rejoin_project_internal` for each entry in
    ///    `rejoin_room.rejoined_projects`, keeping those that return `Some`.
    ///
    /// # Errors
    ///
    /// Fails if no participant row was updated, if a reshared project does not exist
    /// or is not hosted by `user_id`, if the host is missing from a reshared project's
    /// collaborators, or if any underlying query fails.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<TransactionGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        self.room_transaction(room_id, |tx| async {
            // NOTE(review): this block is `async`, not `async move`; the rebinding
            // presumably forces `tx` to be moved in while `rejoin_room` and the other
            // captures stay borrowed — confirm before simplifying.
            let tx = tx;
            // Only a row that has already been answered can be rejoined.
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Re-host every project the client says it is still sharing.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                // Only the project's host may reshare it.
                if project.host_user_id != Some(user_id) {
                    return Err(anyhow!("no such project"))?;
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                // Take the host out so that `collaborators` holds only the other
                // collaborators reported back below.
                let host = collaborators.swap_remove(host_ix);
                // Captured before the host row is rewritten with the new connection.
                let old_connection_id = host.connection();

                // Point both the project and the host's collaborator row at `connection`.
                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Delete the projects this user hosts in the room that were not reshared above.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Rejoin the listed projects; entries for which the helper returns `None`
            // are skipped.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                if let Some(rejoined_project) = self
                    .rejoin_project_internal(&tx, rejoined_project, user_id, connection)
                    .await?
                {
                    rejoined_projects.push(rejoined_project);
                }
            }

            // Read the room and channel last so they reflect all the changes above.
            let (channel, room) = self.get_channel_room(room_id, &tx).await?;

            Ok(RejoinedRoom {
                room,
                channel,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
 594
 595    pub async fn rejoin_project_internal(
 596        &self,
 597        tx: &DatabaseTransaction,
 598        rejoined_project: &proto::RejoinProject,
 599        user_id: UserId,
 600        connection: ConnectionId,
 601    ) -> Result<Option<RejoinedProject>> {
 602        let project_id = ProjectId::from_proto(rejoined_project.id);
 603        let Some(project) = project::Entity::find_by_id(project_id).one(tx).await? else {
 604            return Ok(None);
 605        };
 606
 607        let mut worktrees = Vec::new();
 608        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 609        for db_worktree in db_worktrees {
 610            let mut worktree = RejoinedWorktree {
 611                id: db_worktree.id as u64,
 612                abs_path: db_worktree.abs_path,
 613                root_name: db_worktree.root_name,
 614                visible: db_worktree.visible,
 615                updated_entries: Default::default(),
 616                removed_entries: Default::default(),
 617                updated_repositories: Default::default(),
 618                removed_repositories: Default::default(),
 619                diagnostic_summaries: Default::default(),
 620                settings_files: Default::default(),
 621                scan_id: db_worktree.scan_id as u64,
 622                completed_scan_id: db_worktree.completed_scan_id as u64,
 623            };
 624
 625            let rejoined_worktree = rejoined_project
 626                .worktrees
 627                .iter()
 628                .find(|worktree| worktree.id == db_worktree.id as u64);
 629
 630            // File entries
 631            {
 632                let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 633                    worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 634                } else {
 635                    worktree_entry::Column::IsDeleted.eq(false)
 636                };
 637
 638                let mut db_entries = worktree_entry::Entity::find()
 639                    .filter(
 640                        Condition::all()
 641                            .add(worktree_entry::Column::ProjectId.eq(project.id))
 642                            .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 643                            .add(entry_filter),
 644                    )
 645                    .stream(tx)
 646                    .await?;
 647
 648                while let Some(db_entry) = db_entries.next().await {
 649                    let db_entry = db_entry?;
 650                    if db_entry.is_deleted {
 651                        worktree.removed_entries.push(db_entry.id as u64);
 652                    } else {
 653                        worktree.updated_entries.push(proto::Entry {
 654                            id: db_entry.id as u64,
 655                            is_dir: db_entry.is_dir,
 656                            path: db_entry.path,
 657                            inode: db_entry.inode as u64,
 658                            mtime: Some(proto::Timestamp {
 659                                seconds: db_entry.mtime_seconds as u64,
 660                                nanos: db_entry.mtime_nanos as u32,
 661                            }),
 662                            is_symlink: db_entry.is_symlink,
 663                            is_ignored: db_entry.is_ignored,
 664                            is_external: db_entry.is_external,
 665                            git_status: db_entry.git_status.map(|status| status as i32),
 666                            // This is only used in the summarization backlog, so if it's None,
 667                            // that just means we won't be able to detect when to resummarize
 668                            // based on total number of backlogged bytes - instead, we'd go
 669                            // on number of files only. That shouldn't be a huge deal in practice.
 670                            size: None,
 671                            is_fifo: db_entry.is_fifo,
 672                        });
 673                    }
 674                }
 675            }
 676
 677            // Repository Entries
 678            {
 679                let repository_entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 680                    worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
 681                } else {
 682                    worktree_repository::Column::IsDeleted.eq(false)
 683                };
 684
 685                let mut db_repositories = worktree_repository::Entity::find()
 686                    .filter(
 687                        Condition::all()
 688                            .add(worktree_repository::Column::ProjectId.eq(project.id))
 689                            .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
 690                            .add(repository_entry_filter),
 691                    )
 692                    .stream(tx)
 693                    .await?;
 694
 695                while let Some(db_repository) = db_repositories.next().await {
 696                    let db_repository = db_repository?;
 697                    if db_repository.is_deleted {
 698                        worktree
 699                            .removed_repositories
 700                            .push(db_repository.work_directory_id as u64);
 701                    } else {
 702                        worktree.updated_repositories.push(proto::RepositoryEntry {
 703                            work_directory_id: db_repository.work_directory_id as u64,
 704                            branch: db_repository.branch,
 705                        });
 706                    }
 707                }
 708            }
 709
 710            worktrees.push(worktree);
 711        }
 712
 713        let language_servers = project
 714            .find_related(language_server::Entity)
 715            .all(tx)
 716            .await?
 717            .into_iter()
 718            .map(|language_server| proto::LanguageServer {
 719                id: language_server.id as u64,
 720                name: language_server.name,
 721                worktree_id: None,
 722            })
 723            .collect::<Vec<_>>();
 724
 725        {
 726            let mut db_settings_files = worktree_settings_file::Entity::find()
 727                .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 728                .stream(tx)
 729                .await?;
 730            while let Some(db_settings_file) = db_settings_files.next().await {
 731                let db_settings_file = db_settings_file?;
 732                if let Some(worktree) = worktrees
 733                    .iter_mut()
 734                    .find(|w| w.id == db_settings_file.worktree_id as u64)
 735                {
 736                    worktree.settings_files.push(WorktreeSettingsFile {
 737                        path: db_settings_file.path,
 738                        content: db_settings_file.content,
 739                        kind: db_settings_file.kind,
 740                    });
 741                }
 742            }
 743        }
 744
 745        let mut collaborators = project
 746            .find_related(project_collaborator::Entity)
 747            .all(tx)
 748            .await?;
 749        let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 750            .iter()
 751            .position(|collaborator| collaborator.user_id == user_id)
 752        {
 753            collaborators.swap_remove(self_collaborator_ix)
 754        } else {
 755            return Ok(None);
 756        };
 757        let old_connection_id = self_collaborator.connection();
 758        project_collaborator::Entity::update(project_collaborator::ActiveModel {
 759            connection_id: ActiveValue::set(connection.id as i32),
 760            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 761            ..self_collaborator.into_active_model()
 762        })
 763        .exec(tx)
 764        .await?;
 765
 766        let collaborators = collaborators
 767            .into_iter()
 768            .map(|collaborator| ProjectCollaborator {
 769                connection_id: collaborator.connection(),
 770                user_id: collaborator.user_id,
 771                replica_id: collaborator.replica_id,
 772                is_host: collaborator.is_host,
 773            })
 774            .collect::<Vec<_>>();
 775
 776        Ok(Some(RejoinedProject {
 777            id: project_id,
 778            old_connection_id,
 779            collaborators,
 780            worktrees,
 781            language_servers,
 782        }))
 783    }
 784
 785    pub async fn leave_room(
 786        &self,
 787        connection: ConnectionId,
 788    ) -> Result<Option<TransactionGuard<LeftRoom>>> {
 789        self.optional_room_transaction(|tx| async move {
 790            let leaving_participant = room_participant::Entity::find()
 791                .filter(
 792                    Condition::all()
 793                        .add(
 794                            room_participant::Column::AnsweringConnectionId
 795                                .eq(connection.id as i32),
 796                        )
 797                        .add(
 798                            room_participant::Column::AnsweringConnectionServerId
 799                                .eq(connection.owner_id as i32),
 800                        ),
 801                )
 802                .one(&*tx)
 803                .await?;
 804
 805            if let Some(leaving_participant) = leaving_participant {
 806                // Leave room.
 807                let room_id = leaving_participant.room_id;
 808                room_participant::Entity::delete_by_id(leaving_participant.id)
 809                    .exec(&*tx)
 810                    .await?;
 811
 812                // Cancel pending calls initiated by the leaving user.
 813                let called_participants = room_participant::Entity::find()
 814                    .filter(
 815                        Condition::all()
 816                            .add(
 817                                room_participant::Column::CallingUserId
 818                                    .eq(leaving_participant.user_id),
 819                            )
 820                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 821                    )
 822                    .all(&*tx)
 823                    .await?;
 824                room_participant::Entity::delete_many()
 825                    .filter(
 826                        room_participant::Column::Id
 827                            .is_in(called_participants.iter().map(|participant| participant.id)),
 828                    )
 829                    .exec(&*tx)
 830                    .await?;
 831                let canceled_calls_to_user_ids = called_participants
 832                    .into_iter()
 833                    .map(|participant| participant.user_id)
 834                    .collect();
 835
 836                // Detect left projects.
 837                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 838                enum QueryProjectIds {
 839                    ProjectId,
 840                }
 841                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 842                    .select_only()
 843                    .column_as(
 844                        project_collaborator::Column::ProjectId,
 845                        QueryProjectIds::ProjectId,
 846                    )
 847                    .filter(
 848                        Condition::all()
 849                            .add(
 850                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 851                            )
 852                            .add(
 853                                project_collaborator::Column::ConnectionServerId
 854                                    .eq(connection.owner_id as i32),
 855                            ),
 856                    )
 857                    .into_values::<_, QueryProjectIds>()
 858                    .all(&*tx)
 859                    .await?;
 860
 861                // if any project in the room has a remote-project-id that belongs to a dev server that this user owns.
 862                let dev_server_projects_for_user = self
 863                    .dev_server_project_ids_for_user(leaving_participant.user_id, &tx)
 864                    .await?;
 865
 866                let dev_server_projects_to_unshare = project::Entity::find()
 867                    .filter(
 868                        Condition::all()
 869                            .add(project::Column::RoomId.eq(room_id))
 870                            .add(
 871                                project::Column::DevServerProjectId
 872                                    .is_in(dev_server_projects_for_user.clone()),
 873                            ),
 874                    )
 875                    .all(&*tx)
 876                    .await?
 877                    .into_iter()
 878                    .map(|project| project.id)
 879                    .collect::<HashSet<_>>();
 880                let mut left_projects = HashMap::default();
 881                let mut collaborators = project_collaborator::Entity::find()
 882                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 883                    .stream(&*tx)
 884                    .await?;
 885
 886                while let Some(collaborator) = collaborators.next().await {
 887                    let collaborator = collaborator?;
 888                    let left_project =
 889                        left_projects
 890                            .entry(collaborator.project_id)
 891                            .or_insert(LeftProject {
 892                                id: collaborator.project_id,
 893                                connection_ids: Default::default(),
 894                                should_unshare: false,
 895                            });
 896
 897                    let collaborator_connection_id = collaborator.connection();
 898                    if collaborator_connection_id != connection {
 899                        left_project.connection_ids.push(collaborator_connection_id);
 900                    }
 901
 902                    if (collaborator.is_host && collaborator.connection() == connection)
 903                        || dev_server_projects_to_unshare.contains(&collaborator.project_id)
 904                    {
 905                        left_project.should_unshare = true;
 906                    }
 907                }
 908                drop(collaborators);
 909
 910                // Leave projects.
 911                project_collaborator::Entity::delete_many()
 912                    .filter(
 913                        Condition::all()
 914                            .add(
 915                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 916                            )
 917                            .add(
 918                                project_collaborator::Column::ConnectionServerId
 919                                    .eq(connection.owner_id as i32),
 920                            ),
 921                    )
 922                    .exec(&*tx)
 923                    .await?;
 924
 925                follower::Entity::delete_many()
 926                    .filter(
 927                        Condition::all()
 928                            .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
 929                    )
 930                    .exec(&*tx)
 931                    .await?;
 932
 933                // Unshare projects.
 934                project::Entity::delete_many()
 935                    .filter(
 936                        Condition::all()
 937                            .add(project::Column::RoomId.eq(room_id))
 938                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
 939                            .add(
 940                                project::Column::HostConnectionServerId
 941                                    .eq(connection.owner_id as i32),
 942                            ),
 943                    )
 944                    .exec(&*tx)
 945                    .await?;
 946
 947                if !dev_server_projects_to_unshare.is_empty() {
 948                    project::Entity::update_many()
 949                        .filter(project::Column::Id.is_in(dev_server_projects_to_unshare))
 950                        .set(project::ActiveModel {
 951                            room_id: ActiveValue::Set(None),
 952                            ..Default::default()
 953                        })
 954                        .exec(&*tx)
 955                        .await?;
 956                }
 957
 958                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 959                let deleted = if room.participants.is_empty() {
 960                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 961                    result.rows_affected > 0
 962                } else {
 963                    false
 964                };
 965
 966                let left_room = LeftRoom {
 967                    room,
 968                    channel,
 969                    left_projects,
 970                    canceled_calls_to_user_ids,
 971                    deleted,
 972                };
 973
 974                if left_room.room.participants.is_empty() {
 975                    self.rooms.remove(&room_id);
 976                }
 977
 978                Ok(Some((room_id, left_room)))
 979            } else {
 980                Ok(None)
 981            }
 982        })
 983        .await
 984    }
 985
 986    /// Updates the location of a participant in the given room.
 987    pub async fn update_room_participant_location(
 988        &self,
 989        room_id: RoomId,
 990        connection: ConnectionId,
 991        location: proto::ParticipantLocation,
 992    ) -> Result<TransactionGuard<proto::Room>> {
 993        self.room_transaction(room_id, |tx| async {
 994            let tx = tx;
 995            let location_kind;
 996            let location_project_id;
 997            match location
 998                .variant
 999                .as_ref()
1000                .ok_or_else(|| anyhow!("invalid location"))?
1001            {
1002                proto::participant_location::Variant::SharedProject(project) => {
1003                    location_kind = 0;
1004                    location_project_id = Some(ProjectId::from_proto(project.id));
1005                }
1006                proto::participant_location::Variant::UnsharedProject(_) => {
1007                    location_kind = 1;
1008                    location_project_id = None;
1009                }
1010                proto::participant_location::Variant::External(_) => {
1011                    location_kind = 2;
1012                    location_project_id = None;
1013                }
1014            }
1015
1016            let result = room_participant::Entity::update_many()
1017                .filter(
1018                    Condition::all()
1019                        .add(room_participant::Column::RoomId.eq(room_id))
1020                        .add(
1021                            room_participant::Column::AnsweringConnectionId
1022                                .eq(connection.id as i32),
1023                        )
1024                        .add(
1025                            room_participant::Column::AnsweringConnectionServerId
1026                                .eq(connection.owner_id as i32),
1027                        ),
1028                )
1029                .set(room_participant::ActiveModel {
1030                    location_kind: ActiveValue::set(Some(location_kind)),
1031                    location_project_id: ActiveValue::set(location_project_id),
1032                    ..Default::default()
1033                })
1034                .exec(&*tx)
1035                .await?;
1036
1037            if result.rows_affected == 1 {
1038                let room = self.get_room(room_id, &tx).await?;
1039                Ok(room)
1040            } else {
1041                Err(anyhow!("could not update room participant location"))?
1042            }
1043        })
1044        .await
1045    }
1046
1047    /// Sets the role of a participant in the given room.
1048    pub async fn set_room_participant_role(
1049        &self,
1050        admin_id: UserId,
1051        room_id: RoomId,
1052        user_id: UserId,
1053        role: ChannelRole,
1054    ) -> Result<TransactionGuard<proto::Room>> {
1055        self.room_transaction(room_id, |tx| async move {
1056            room_participant::Entity::find()
1057                .filter(
1058                    Condition::all()
1059                        .add(room_participant::Column::RoomId.eq(room_id))
1060                        .add(room_participant::Column::UserId.eq(admin_id))
1061                        .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1062                )
1063                .one(&*tx)
1064                .await?
1065                .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1066
1067            if role.requires_cla() {
1068                self.check_user_has_signed_cla(user_id, room_id, &tx)
1069                    .await?;
1070            }
1071
1072            let result = room_participant::Entity::update_many()
1073                .filter(
1074                    Condition::all()
1075                        .add(room_participant::Column::RoomId.eq(room_id))
1076                        .add(room_participant::Column::UserId.eq(user_id)),
1077                )
1078                .set(room_participant::ActiveModel {
1079                    role: ActiveValue::set(Some(role)),
1080                    ..Default::default()
1081                })
1082                .exec(&*tx)
1083                .await?;
1084
1085            if result.rows_affected != 1 {
1086                Err(anyhow!("could not update room participant role"))?;
1087            }
1088            self.get_room(room_id, &tx).await
1089        })
1090        .await
1091    }
1092
1093    async fn check_user_has_signed_cla(
1094        &self,
1095        user_id: UserId,
1096        room_id: RoomId,
1097        tx: &DatabaseTransaction,
1098    ) -> Result<()> {
1099        let channel = room::Entity::find_by_id(room_id)
1100            .one(tx)
1101            .await?
1102            .ok_or_else(|| anyhow!("could not find room"))?
1103            .find_related(channel::Entity)
1104            .one(tx)
1105            .await?;
1106
1107        if let Some(channel) = channel {
1108            let requires_zed_cla = channel.requires_zed_cla
1109                || channel::Entity::find()
1110                    .filter(
1111                        channel::Column::Id
1112                            .is_in(channel.ancestors())
1113                            .and(channel::Column::RequiresZedCla.eq(true)),
1114                    )
1115                    .count(tx)
1116                    .await?
1117                    > 0;
1118            if requires_zed_cla
1119                && contributor::Entity::find()
1120                    .filter(contributor::Column::UserId.eq(user_id))
1121                    .one(tx)
1122                    .await?
1123                    .is_none()
1124            {
1125                Err(anyhow!("user has not signed the Zed CLA"))?;
1126            }
1127        }
1128        Ok(())
1129    }
1130
1131    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1132        self.transaction(|tx| async move {
1133            self.room_connection_lost(connection, &tx).await?;
1134            self.channel_buffer_connection_lost(connection, &tx).await?;
1135            self.channel_chat_connection_lost(connection, &tx).await?;
1136            Ok(())
1137        })
1138        .await
1139    }
1140
1141    pub async fn room_connection_lost(
1142        &self,
1143        connection: ConnectionId,
1144        tx: &DatabaseTransaction,
1145    ) -> Result<()> {
1146        let participant = room_participant::Entity::find()
1147            .filter(
1148                Condition::all()
1149                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1150                    .add(
1151                        room_participant::Column::AnsweringConnectionServerId
1152                            .eq(connection.owner_id as i32),
1153                    ),
1154            )
1155            .one(tx)
1156            .await?;
1157
1158        if let Some(participant) = participant {
1159            room_participant::Entity::update(room_participant::ActiveModel {
1160                answering_connection_lost: ActiveValue::set(true),
1161                ..participant.into_active_model()
1162            })
1163            .exec(tx)
1164            .await?;
1165        }
1166        Ok(())
1167    }
1168
1169    fn build_incoming_call(
1170        room: &proto::Room,
1171        called_user_id: UserId,
1172    ) -> Option<proto::IncomingCall> {
1173        let pending_participant = room
1174            .pending_participants
1175            .iter()
1176            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1177
1178        Some(proto::IncomingCall {
1179            room_id: room.id,
1180            calling_user_id: pending_participant.calling_user_id,
1181            participant_user_ids: room
1182                .participants
1183                .iter()
1184                .map(|participant| participant.user_id)
1185                .collect(),
1186            initial_project: room.participants.iter().find_map(|participant| {
1187                let initial_project_id = pending_participant.initial_project_id?;
1188                participant
1189                    .projects
1190                    .iter()
1191                    .find(|project| project.id == initial_project_id)
1192                    .cloned()
1193            }),
1194        })
1195    }
1196
1197    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1198        let (_, room) = self.get_channel_room(room_id, tx).await?;
1199        Ok(room)
1200    }
1201
1202    pub async fn room_connection_ids(
1203        &self,
1204        room_id: RoomId,
1205        connection_id: ConnectionId,
1206    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1207        self.room_transaction(room_id, |tx| async move {
1208            let mut participants = room_participant::Entity::find()
1209                .filter(room_participant::Column::RoomId.eq(room_id))
1210                .stream(&*tx)
1211                .await?;
1212
1213            let mut is_participant = false;
1214            let mut connection_ids = HashSet::default();
1215            while let Some(participant) = participants.next().await {
1216                let participant = participant?;
1217                if let Some(answering_connection) = participant.answering_connection() {
1218                    if answering_connection == connection_id {
1219                        is_participant = true;
1220                    } else {
1221                        connection_ids.insert(answering_connection);
1222                    }
1223                }
1224            }
1225
1226            if !is_participant {
1227                Err(anyhow!("not a room participant"))?;
1228            }
1229
1230            Ok(connection_ids)
1231        })
1232        .await
1233    }
1234
1235    async fn get_channel_room(
1236        &self,
1237        room_id: RoomId,
1238        tx: &DatabaseTransaction,
1239    ) -> Result<(Option<channel::Model>, proto::Room)> {
1240        let db_room = room::Entity::find_by_id(room_id)
1241            .one(tx)
1242            .await?
1243            .ok_or_else(|| anyhow!("could not find room"))?;
1244
1245        let mut db_participants = db_room
1246            .find_related(room_participant::Entity)
1247            .stream(tx)
1248            .await?;
1249        let mut participants = HashMap::default();
1250        let mut pending_participants = Vec::new();
1251        while let Some(db_participant) = db_participants.next().await {
1252            let db_participant = db_participant?;
1253            if let (
1254                Some(answering_connection_id),
1255                Some(answering_connection_server_id),
1256                Some(participant_index),
1257            ) = (
1258                db_participant.answering_connection_id,
1259                db_participant.answering_connection_server_id,
1260                db_participant.participant_index,
1261            ) {
1262                let location = match (
1263                    db_participant.location_kind,
1264                    db_participant.location_project_id,
1265                ) {
1266                    (Some(0), Some(project_id)) => {
1267                        Some(proto::participant_location::Variant::SharedProject(
1268                            proto::participant_location::SharedProject {
1269                                id: project_id.to_proto(),
1270                            },
1271                        ))
1272                    }
1273                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1274                        Default::default(),
1275                    )),
1276                    _ => Some(proto::participant_location::Variant::External(
1277                        Default::default(),
1278                    )),
1279                };
1280
1281                let answering_connection = ConnectionId {
1282                    owner_id: answering_connection_server_id.0 as u32,
1283                    id: answering_connection_id as u32,
1284                };
1285                participants.insert(
1286                    answering_connection,
1287                    proto::Participant {
1288                        user_id: db_participant.user_id.to_proto(),
1289                        peer_id: Some(answering_connection.into()),
1290                        projects: Default::default(),
1291                        location: Some(proto::ParticipantLocation { variant: location }),
1292                        participant_index: participant_index as u32,
1293                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1294                    },
1295                );
1296            } else {
1297                pending_participants.push(proto::PendingParticipant {
1298                    user_id: db_participant.user_id.to_proto(),
1299                    calling_user_id: db_participant.calling_user_id.to_proto(),
1300                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1301                });
1302            }
1303        }
1304        drop(db_participants);
1305
1306        let db_projects = db_room
1307            .find_related(project::Entity)
1308            .find_with_related(worktree::Entity)
1309            .all(tx)
1310            .await?;
1311
1312        for (db_project, db_worktrees) in db_projects {
1313            let host_connection = db_project.host_connection()?;
1314            if let Some(participant) = participants.get_mut(&host_connection) {
1315                participant.projects.push(proto::ParticipantProject {
1316                    id: db_project.id.to_proto(),
1317                    worktree_root_names: Default::default(),
1318                });
1319                let project = participant.projects.last_mut().unwrap();
1320
1321                for db_worktree in db_worktrees {
1322                    if db_worktree.visible {
1323                        project.worktree_root_names.push(db_worktree.root_name);
1324                    }
1325                }
1326            } else if let Some(dev_server_project_id) = db_project.dev_server_project_id {
1327                let host = self
1328                    .owner_for_dev_server_project(dev_server_project_id, tx)
1329                    .await?;
1330                if let Some((_, participant)) = participants
1331                    .iter_mut()
1332                    .find(|(_, v)| v.user_id == host.to_proto())
1333                {
1334                    participant.projects.push(proto::ParticipantProject {
1335                        id: db_project.id.to_proto(),
1336                        worktree_root_names: Default::default(),
1337                    });
1338                    let project = participant.projects.last_mut().unwrap();
1339
1340                    for db_worktree in db_worktrees {
1341                        if db_worktree.visible {
1342                            project.worktree_root_names.push(db_worktree.root_name);
1343                        }
1344                    }
1345                }
1346            }
1347        }
1348
1349        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1350        let mut followers = Vec::new();
1351        while let Some(db_follower) = db_followers.next().await {
1352            let db_follower = db_follower?;
1353            followers.push(proto::Follower {
1354                leader_id: Some(db_follower.leader_connection().into()),
1355                follower_id: Some(db_follower.follower_connection().into()),
1356                project_id: db_follower.project_id.to_proto(),
1357            });
1358        }
1359        drop(db_followers);
1360
1361        let channel = if let Some(channel_id) = db_room.channel_id {
1362            Some(self.get_channel_internal(channel_id, tx).await?)
1363        } else {
1364            None
1365        };
1366
1367        Ok((
1368            channel,
1369            proto::Room {
1370                id: db_room.id.to_proto(),
1371                live_kit_room: db_room.live_kit_room,
1372                participants: participants.into_values().collect(),
1373                pending_participants,
1374                followers,
1375            },
1376        ))
1377    }
1378}