rooms.rs

   1use anyhow::Context as _;
   2
   3use super::*;
   4
   5impl Database {
   6    /// Clears all room participants in rooms attached to a stale server.
   7    pub async fn clear_stale_room_participants(
   8        &self,
   9        room_id: RoomId,
  10        new_server_id: ServerId,
  11    ) -> Result<TransactionGuard<RefreshedRoom>> {
  12        self.room_transaction(room_id, |tx| async move {
  13            let stale_participant_filter = Condition::all()
  14                .add(room_participant::Column::RoomId.eq(room_id))
  15                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
  16                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
  17
  18            let stale_participant_user_ids = room_participant::Entity::find()
  19                .filter(stale_participant_filter.clone())
  20                .all(&*tx)
  21                .await?
  22                .into_iter()
  23                .map(|participant| participant.user_id)
  24                .collect::<Vec<_>>();
  25
  26            // Delete participants who failed to reconnect and cancel their calls.
  27            let mut canceled_calls_to_user_ids = Vec::new();
  28            room_participant::Entity::delete_many()
  29                .filter(stale_participant_filter)
  30                .exec(&*tx)
  31                .await?;
  32            let called_participants = room_participant::Entity::find()
  33                .filter(
  34                    Condition::all()
  35                        .add(
  36                            room_participant::Column::CallingUserId
  37                                .is_in(stale_participant_user_ids.iter().copied()),
  38                        )
  39                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
  40                )
  41                .all(&*tx)
  42                .await?;
  43            room_participant::Entity::delete_many()
  44                .filter(
  45                    room_participant::Column::Id
  46                        .is_in(called_participants.iter().map(|participant| participant.id)),
  47                )
  48                .exec(&*tx)
  49                .await?;
  50            canceled_calls_to_user_ids.extend(
  51                called_participants
  52                    .into_iter()
  53                    .map(|participant| participant.user_id),
  54            );
  55
  56            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
  57            if channel.is_none() {
  58                // Delete the room if it becomes empty.
  59                if room.participants.is_empty() {
  60                    project::Entity::delete_many()
  61                        .filter(project::Column::RoomId.eq(room_id))
  62                        .exec(&*tx)
  63                        .await?;
  64                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
  65                }
  66            };
  67
  68            Ok(RefreshedRoom {
  69                room,
  70                channel,
  71                stale_participant_user_ids,
  72                canceled_calls_to_user_ids,
  73            })
  74        })
  75        .await
  76    }
  77
  78    /// Returns the incoming calls for user with the given ID.
  79    pub async fn incoming_call_for_user(
  80        &self,
  81        user_id: UserId,
  82    ) -> Result<Option<proto::IncomingCall>> {
  83        self.transaction(|tx| async move {
  84            let pending_participant = room_participant::Entity::find()
  85                .filter(
  86                    room_participant::Column::UserId
  87                        .eq(user_id)
  88                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
  89                )
  90                .one(&*tx)
  91                .await?;
  92
  93            if let Some(pending_participant) = pending_participant {
  94                let room = self.get_room(pending_participant.room_id, &tx).await?;
  95                Ok(Self::build_incoming_call(&room, user_id))
  96            } else {
  97                Ok(None)
  98            }
  99        })
 100        .await
 101    }
 102
 103    /// Creates a new room.
 104    pub async fn create_room(
 105        &self,
 106        user_id: UserId,
 107        connection: ConnectionId,
 108        livekit_room: &str,
 109    ) -> Result<proto::Room> {
 110        self.transaction(|tx| async move {
 111            let room = room::ActiveModel {
 112                live_kit_room: ActiveValue::set(livekit_room.into()),
 113                ..Default::default()
 114            }
 115            .insert(&*tx)
 116            .await?;
 117            room_participant::ActiveModel {
 118                room_id: ActiveValue::set(room.id),
 119                user_id: ActiveValue::set(user_id),
 120                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 121                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 122                    connection.owner_id as i32,
 123                ))),
 124                answering_connection_lost: ActiveValue::set(false),
 125                calling_user_id: ActiveValue::set(user_id),
 126                calling_connection_id: ActiveValue::set(connection.id as i32),
 127                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 128                    connection.owner_id as i32,
 129                ))),
 130                participant_index: ActiveValue::set(Some(0)),
 131                role: ActiveValue::set(Some(ChannelRole::Admin)),
 132
 133                id: ActiveValue::NotSet,
 134                location_kind: ActiveValue::NotSet,
 135                location_project_id: ActiveValue::NotSet,
 136                initial_project_id: ActiveValue::NotSet,
 137            }
 138            .insert(&*tx)
 139            .await?;
 140
 141            let room = self.get_room(room.id, &tx).await?;
 142            Ok(room)
 143        })
 144        .await
 145    }
 146
 147    pub async fn call(
 148        &self,
 149        room_id: RoomId,
 150        calling_user_id: UserId,
 151        calling_connection: ConnectionId,
 152        called_user_id: UserId,
 153        initial_project_id: Option<ProjectId>,
 154    ) -> Result<TransactionGuard<(proto::Room, proto::IncomingCall)>> {
 155        self.room_transaction(room_id, |tx| async move {
 156            let caller = room_participant::Entity::find()
 157                .filter(
 158                    room_participant::Column::UserId
 159                        .eq(calling_user_id)
 160                        .and(room_participant::Column::RoomId.eq(room_id)),
 161                )
 162                .one(&*tx)
 163                .await?
 164                .context("user is not in the room")?;
 165
 166            let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
 167                ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
 168                ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
 169                ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
 170            };
 171
 172            room_participant::ActiveModel {
 173                room_id: ActiveValue::set(room_id),
 174                user_id: ActiveValue::set(called_user_id),
 175                answering_connection_lost: ActiveValue::set(false),
 176                participant_index: ActiveValue::NotSet,
 177                calling_user_id: ActiveValue::set(calling_user_id),
 178                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
 179                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 180                    calling_connection.owner_id as i32,
 181                ))),
 182                initial_project_id: ActiveValue::set(initial_project_id),
 183                role: ActiveValue::set(Some(called_user_role)),
 184
 185                id: ActiveValue::NotSet,
 186                answering_connection_id: ActiveValue::NotSet,
 187                answering_connection_server_id: ActiveValue::NotSet,
 188                location_kind: ActiveValue::NotSet,
 189                location_project_id: ActiveValue::NotSet,
 190            }
 191            .insert(&*tx)
 192            .await?;
 193
 194            let room = self.get_room(room_id, &tx).await?;
 195            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 196                .context("failed to build incoming call")?;
 197            Ok((room, incoming_call))
 198        })
 199        .await
 200    }
 201
 202    pub async fn call_failed(
 203        &self,
 204        room_id: RoomId,
 205        called_user_id: UserId,
 206    ) -> Result<TransactionGuard<proto::Room>> {
 207        self.room_transaction(room_id, |tx| async move {
 208            room_participant::Entity::delete_many()
 209                .filter(
 210                    room_participant::Column::RoomId
 211                        .eq(room_id)
 212                        .and(room_participant::Column::UserId.eq(called_user_id)),
 213                )
 214                .exec(&*tx)
 215                .await?;
 216            let room = self.get_room(room_id, &tx).await?;
 217            Ok(room)
 218        })
 219        .await
 220    }
 221
 222    pub async fn decline_call(
 223        &self,
 224        expected_room_id: Option<RoomId>,
 225        user_id: UserId,
 226    ) -> Result<Option<TransactionGuard<proto::Room>>> {
 227        self.optional_room_transaction(|tx| async move {
 228            let mut filter = Condition::all()
 229                .add(room_participant::Column::UserId.eq(user_id))
 230                .add(room_participant::Column::AnsweringConnectionId.is_null());
 231            if let Some(room_id) = expected_room_id {
 232                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
 233            }
 234            let participant = room_participant::Entity::find()
 235                .filter(filter)
 236                .one(&*tx)
 237                .await?;
 238
 239            let participant = if let Some(participant) = participant {
 240                participant
 241            } else if expected_room_id.is_some() {
 242                return Err(anyhow!("could not find call to decline"))?;
 243            } else {
 244                return Ok(None);
 245            };
 246
 247            let room_id = participant.room_id;
 248            room_participant::Entity::delete(participant.into_active_model())
 249                .exec(&*tx)
 250                .await?;
 251
 252            let room = self.get_room(room_id, &tx).await?;
 253            Ok(Some((room_id, room)))
 254        })
 255        .await
 256    }
 257
 258    pub async fn cancel_call(
 259        &self,
 260        room_id: RoomId,
 261        calling_connection: ConnectionId,
 262        called_user_id: UserId,
 263    ) -> Result<TransactionGuard<proto::Room>> {
 264        self.room_transaction(room_id, |tx| async move {
 265            let participant = room_participant::Entity::find()
 266                .filter(
 267                    Condition::all()
 268                        .add(room_participant::Column::UserId.eq(called_user_id))
 269                        .add(room_participant::Column::RoomId.eq(room_id))
 270                        .add(
 271                            room_participant::Column::CallingConnectionId
 272                                .eq(calling_connection.id as i32),
 273                        )
 274                        .add(
 275                            room_participant::Column::CallingConnectionServerId
 276                                .eq(calling_connection.owner_id as i32),
 277                        )
 278                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 279                )
 280                .one(&*tx)
 281                .await?
 282                .context("no call to cancel")?;
 283
 284            room_participant::Entity::delete(participant.into_active_model())
 285                .exec(&*tx)
 286                .await?;
 287
 288            let room = self.get_room(room_id, &tx).await?;
 289            Ok(room)
 290        })
 291        .await
 292    }
 293
 294    pub async fn join_room(
 295        &self,
 296        room_id: RoomId,
 297        user_id: UserId,
 298        connection: ConnectionId,
 299    ) -> Result<TransactionGuard<JoinRoom>> {
 300        self.room_transaction(room_id, |tx| async move {
 301            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 302            enum QueryChannelId {
 303                ChannelId,
 304            }
 305
 306            let channel_id: Option<ChannelId> = room::Entity::find()
 307                .select_only()
 308                .column(room::Column::ChannelId)
 309                .filter(room::Column::Id.eq(room_id))
 310                .into_values::<_, QueryChannelId>()
 311                .one(&*tx)
 312                .await?
 313                .context("no such room")?;
 314
 315            if channel_id.is_some() {
 316                Err(anyhow!("tried to join channel call directly"))?
 317            }
 318
 319            let participant_index = self
 320                .get_next_participant_index_internal(room_id, &tx)
 321                .await?;
 322
 323            let result = room_participant::Entity::update_many()
 324                .filter(
 325                    Condition::all()
 326                        .add(room_participant::Column::RoomId.eq(room_id))
 327                        .add(room_participant::Column::UserId.eq(user_id))
 328                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 329                )
 330                .set(room_participant::ActiveModel {
 331                    participant_index: ActiveValue::Set(Some(participant_index)),
 332                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 333                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 334                        connection.owner_id as i32,
 335                    ))),
 336                    answering_connection_lost: ActiveValue::set(false),
 337                    ..Default::default()
 338                })
 339                .exec(&*tx)
 340                .await?;
 341            if result.rows_affected == 0 {
 342                Err(anyhow!("room does not exist or was already joined"))?;
 343            }
 344
 345            let room = self.get_room(room_id, &tx).await?;
 346            Ok(JoinRoom {
 347                room,
 348                channel: None,
 349            })
 350        })
 351        .await
 352    }
 353
 354    pub async fn stale_room_connection(&self, user_id: UserId) -> Result<Option<ConnectionId>> {
 355        self.transaction(|tx| async move {
 356            let participant = room_participant::Entity::find()
 357                .filter(room_participant::Column::UserId.eq(user_id))
 358                .one(&*tx)
 359                .await?;
 360            Ok(participant.and_then(|p| p.answering_connection()))
 361        })
 362        .await
 363    }
 364
 365    async fn get_next_participant_index_internal(
 366        &self,
 367        room_id: RoomId,
 368        tx: &DatabaseTransaction,
 369    ) -> Result<i32> {
 370        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 371        enum QueryParticipantIndices {
 372            ParticipantIndex,
 373        }
 374        let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
 375            .filter(
 376                room_participant::Column::RoomId
 377                    .eq(room_id)
 378                    .and(room_participant::Column::ParticipantIndex.is_not_null()),
 379            )
 380            .select_only()
 381            .column(room_participant::Column::ParticipantIndex)
 382            .into_values::<_, QueryParticipantIndices>()
 383            .all(tx)
 384            .await?;
 385
 386        let mut participant_index = 0;
 387        while existing_participant_indices.contains(&participant_index) {
 388            participant_index += 1;
 389        }
 390
 391        Ok(participant_index)
 392    }
 393
 394    /// Returns the channel ID for the given room, if it has one.
 395    pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
 396        self.transaction(|tx| async move {
 397            let room: Option<room::Model> = room::Entity::find()
 398                .filter(room::Column::Id.eq(room_id))
 399                .one(&*tx)
 400                .await?;
 401
 402            Ok(room.and_then(|room| room.channel_id))
 403        })
 404        .await
 405    }
 406
 407    pub(crate) async fn join_channel_room_internal(
 408        &self,
 409        room_id: RoomId,
 410        user_id: UserId,
 411        connection: ConnectionId,
 412        role: ChannelRole,
 413        tx: &DatabaseTransaction,
 414    ) -> Result<JoinRoom> {
 415        let participant_index = self
 416            .get_next_participant_index_internal(room_id, tx)
 417            .await?;
 418
 419        // If someone has been invited into the room, accept the invite instead of inserting
 420        let result = room_participant::Entity::update_many()
 421            .filter(
 422                Condition::all()
 423                    .add(room_participant::Column::RoomId.eq(room_id))
 424                    .add(room_participant::Column::UserId.eq(user_id))
 425                    .add(room_participant::Column::AnsweringConnectionId.is_null()),
 426            )
 427            .set(room_participant::ActiveModel {
 428                participant_index: ActiveValue::Set(Some(participant_index)),
 429                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 430                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 431                    connection.owner_id as i32,
 432                ))),
 433                answering_connection_lost: ActiveValue::set(false),
 434                ..Default::default()
 435            })
 436            .exec(tx)
 437            .await?;
 438
 439        if result.rows_affected == 0 {
 440            room_participant::Entity::insert(room_participant::ActiveModel {
 441                room_id: ActiveValue::set(room_id),
 442                user_id: ActiveValue::set(user_id),
 443                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 444                answering_connection_server_id: ActiveValue::set(Some(ServerId(
 445                    connection.owner_id as i32,
 446                ))),
 447                answering_connection_lost: ActiveValue::set(false),
 448                calling_user_id: ActiveValue::set(user_id),
 449                calling_connection_id: ActiveValue::set(connection.id as i32),
 450                calling_connection_server_id: ActiveValue::set(Some(ServerId(
 451                    connection.owner_id as i32,
 452                ))),
 453                participant_index: ActiveValue::Set(Some(participant_index)),
 454                role: ActiveValue::set(Some(role)),
 455                id: ActiveValue::NotSet,
 456                location_kind: ActiveValue::NotSet,
 457                location_project_id: ActiveValue::NotSet,
 458                initial_project_id: ActiveValue::NotSet,
 459            })
 460            .exec(tx)
 461            .await?;
 462        }
 463
 464        let (channel, room) = self.get_channel_room(room_id, tx).await?;
 465        let channel = channel.context("no channel for room")?;
 466        Ok(JoinRoom {
 467            room,
 468            channel: Some(channel),
 469        })
 470    }
 471
 472    pub async fn rejoin_room(
 473        &self,
 474        rejoin_room: proto::RejoinRoom,
 475        user_id: UserId,
 476        connection: ConnectionId,
 477    ) -> Result<TransactionGuard<RejoinedRoom>> {
 478        let room_id = RoomId::from_proto(rejoin_room.id);
 479        self.room_transaction(room_id, |tx| async {
 480            let tx = tx;
 481            let participant_update = room_participant::Entity::update_many()
 482                .filter(
 483                    Condition::all()
 484                        .add(room_participant::Column::RoomId.eq(room_id))
 485                        .add(room_participant::Column::UserId.eq(user_id))
 486                        .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
 487                )
 488                .set(room_participant::ActiveModel {
 489                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
 490                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
 491                        connection.owner_id as i32,
 492                    ))),
 493                    answering_connection_lost: ActiveValue::set(false),
 494                    ..Default::default()
 495                })
 496                .exec(&*tx)
 497                .await?;
 498            if participant_update.rows_affected == 0 {
 499                return Err(anyhow!("room does not exist or was already joined"))?;
 500            }
 501
 502            let mut reshared_projects = Vec::new();
 503            for reshared_project in &rejoin_room.reshared_projects {
 504                let project_id = ProjectId::from_proto(reshared_project.project_id);
 505                let project = project::Entity::find_by_id(project_id)
 506                    .one(&*tx)
 507                    .await?
 508                    .context("project does not exist")?;
 509                if project.host_user_id != Some(user_id) {
 510                    return Err(anyhow!("no such project"))?;
 511                }
 512
 513                let mut collaborators = project
 514                    .find_related(project_collaborator::Entity)
 515                    .all(&*tx)
 516                    .await?;
 517                let host_ix = collaborators
 518                    .iter()
 519                    .position(|collaborator| {
 520                        collaborator.user_id == user_id && collaborator.is_host
 521                    })
 522                    .context("host not found among collaborators")?;
 523                let host = collaborators.swap_remove(host_ix);
 524                let old_connection_id = host.connection();
 525
 526                project::Entity::update(project::ActiveModel {
 527                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
 528                    host_connection_server_id: ActiveValue::set(Some(ServerId(
 529                        connection.owner_id as i32,
 530                    ))),
 531                    ..project.into_active_model()
 532                })
 533                .exec(&*tx)
 534                .await?;
 535                project_collaborator::Entity::update(project_collaborator::ActiveModel {
 536                    connection_id: ActiveValue::set(connection.id as i32),
 537                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 538                    ..host.into_active_model()
 539                })
 540                .exec(&*tx)
 541                .await?;
 542
 543                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
 544                    .await?;
 545
 546                reshared_projects.push(ResharedProject {
 547                    id: project_id,
 548                    old_connection_id,
 549                    collaborators: collaborators
 550                        .iter()
 551                        .map(|collaborator| ProjectCollaborator {
 552                            connection_id: collaborator.connection(),
 553                            user_id: collaborator.user_id,
 554                            replica_id: collaborator.replica_id,
 555                            is_host: collaborator.is_host,
 556                            committer_name: collaborator.committer_name.clone(),
 557                            committer_email: collaborator.committer_email.clone(),
 558                        })
 559                        .collect(),
 560                    worktrees: reshared_project.worktrees.clone(),
 561                });
 562            }
 563
 564            project::Entity::delete_many()
 565                .filter(
 566                    Condition::all()
 567                        .add(project::Column::RoomId.eq(room_id))
 568                        .add(project::Column::HostUserId.eq(user_id))
 569                        .add(
 570                            project::Column::Id
 571                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
 572                        ),
 573                )
 574                .exec(&*tx)
 575                .await?;
 576
 577            let mut rejoined_projects = Vec::new();
 578            for rejoined_project in &rejoin_room.rejoined_projects {
 579                if let Some(rejoined_project) = self
 580                    .rejoin_project_internal(&tx, rejoined_project, user_id, connection)
 581                    .await?
 582                {
 583                    rejoined_projects.push(rejoined_project);
 584                }
 585            }
 586
 587            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
 588
 589            Ok(RejoinedRoom {
 590                room,
 591                channel,
 592                rejoined_projects,
 593                reshared_projects,
 594            })
 595        })
 596        .await
 597    }
 598
 599    pub async fn rejoin_project_internal(
 600        &self,
 601        tx: &DatabaseTransaction,
 602        rejoined_project: &proto::RejoinProject,
 603        user_id: UserId,
 604        connection: ConnectionId,
 605    ) -> Result<Option<RejoinedProject>> {
 606        let project_id = ProjectId::from_proto(rejoined_project.id);
 607        let Some(project) = project::Entity::find_by_id(project_id).one(tx).await? else {
 608            return Ok(None);
 609        };
 610
 611        let mut worktrees = Vec::new();
 612        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
 613        let db_repos = project
 614            .find_related(project_repository::Entity)
 615            .all(tx)
 616            .await?;
 617
 618        for db_worktree in db_worktrees {
 619            let mut worktree = RejoinedWorktree {
 620                id: db_worktree.id as u64,
 621                abs_path: db_worktree.abs_path,
 622                root_name: db_worktree.root_name,
 623                visible: db_worktree.visible,
 624                updated_entries: Default::default(),
 625                removed_entries: Default::default(),
 626                updated_repositories: Default::default(),
 627                removed_repositories: Default::default(),
 628                diagnostic_summaries: Default::default(),
 629                settings_files: Default::default(),
 630                scan_id: db_worktree.scan_id as u64,
 631                completed_scan_id: db_worktree.completed_scan_id as u64,
 632                root_repo_common_dir: db_worktree.root_repo_common_dir,
 633            };
 634
 635            let rejoined_worktree = rejoined_project
 636                .worktrees
 637                .iter()
 638                .find(|worktree| worktree.id == db_worktree.id as u64);
 639
 640            // File entries
 641            {
 642                let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
 643                    worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
 644                } else {
 645                    worktree_entry::Column::IsDeleted.eq(false)
 646                };
 647
 648                let mut db_entries = worktree_entry::Entity::find()
 649                    .filter(
 650                        Condition::all()
 651                            .add(worktree_entry::Column::ProjectId.eq(project.id))
 652                            .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
 653                            .add(entry_filter),
 654                    )
 655                    .stream(tx)
 656                    .await?;
 657
 658                while let Some(db_entry) = db_entries.next().await {
 659                    let db_entry = db_entry?;
 660                    if db_entry.is_deleted {
 661                        worktree.removed_entries.push(db_entry.id as u64);
 662                    } else {
 663                        worktree.updated_entries.push(proto::Entry {
 664                            id: db_entry.id as u64,
 665                            is_dir: db_entry.is_dir,
 666                            path: db_entry.path,
 667                            inode: db_entry.inode as u64,
 668                            mtime: Some(proto::Timestamp {
 669                                seconds: db_entry.mtime_seconds as u64,
 670                                nanos: db_entry.mtime_nanos as u32,
 671                            }),
 672                            canonical_path: db_entry.canonical_path,
 673                            is_ignored: db_entry.is_ignored,
 674                            is_external: db_entry.is_external,
 675                            is_hidden: db_entry.is_hidden,
 676                            // This is only used in the summarization backlog, so if it's None,
 677                            // that just means we won't be able to detect when to resummarize
 678                            // based on total number of backlogged bytes - instead, we'd go
 679                            // on number of files only. That shouldn't be a huge deal in practice.
 680                            size: None,
 681                            is_fifo: db_entry.is_fifo,
 682                        });
 683                    }
 684                }
 685            }
 686
 687            worktrees.push(worktree);
 688        }
 689
 690        let mut removed_repositories = Vec::new();
 691        let mut updated_repositories = Vec::new();
 692        for db_repo in db_repos {
 693            let rejoined_repository = rejoined_project
 694                .repositories
 695                .iter()
 696                .find(|repo| repo.id == db_repo.id as u64);
 697
 698            let repository_filter = if let Some(rejoined_repository) = rejoined_repository {
 699                project_repository::Column::ScanId.gt(rejoined_repository.scan_id)
 700            } else {
 701                project_repository::Column::IsDeleted.eq(false)
 702            };
 703
 704            let db_repositories = project_repository::Entity::find()
 705                .filter(
 706                    Condition::all()
 707                        .add(project_repository::Column::ProjectId.eq(project.id))
 708                        .add(repository_filter),
 709                )
 710                .all(tx)
 711                .await?;
 712
 713            for db_repository in db_repositories.into_iter() {
 714                if db_repository.is_deleted {
 715                    removed_repositories.push(db_repository.id as u64);
 716                } else {
 717                    let status_entry_filter = if let Some(rejoined_repository) = rejoined_repository
 718                    {
 719                        project_repository_statuses::Column::ScanId.gt(rejoined_repository.scan_id)
 720                    } else {
 721                        project_repository_statuses::Column::IsDeleted.eq(false)
 722                    };
 723
 724                    let mut db_statuses = project_repository_statuses::Entity::find()
 725                        .filter(
 726                            Condition::all()
 727                                .add(project_repository_statuses::Column::ProjectId.eq(project.id))
 728                                .add(
 729                                    project_repository_statuses::Column::RepositoryId
 730                                        .eq(db_repository.id),
 731                                )
 732                                .add(status_entry_filter),
 733                        )
 734                        .stream(tx)
 735                        .await?;
 736                    let mut removed_statuses = Vec::new();
 737                    let mut updated_statuses = Vec::new();
 738
 739                    while let Some(db_status) = db_statuses.next().await {
 740                        let db_status: project_repository_statuses::Model = db_status?;
 741                        if db_status.is_deleted {
 742                            removed_statuses.push(db_status.repo_path.clone());
 743                        } else {
 744                            updated_statuses.push(db_status_to_proto(db_status)?);
 745                        }
 746                    }
 747
 748                    let current_merge_conflicts = db_repository
 749                        .current_merge_conflicts
 750                        .as_ref()
 751                        .map(|conflicts| serde_json::from_str(conflicts))
 752                        .transpose()?
 753                        .unwrap_or_default();
 754
 755                    let branch_summary = db_repository
 756                        .branch_summary
 757                        .as_ref()
 758                        .map(|branch_summary| serde_json::from_str(branch_summary))
 759                        .transpose()?
 760                        .unwrap_or_default();
 761
 762                    let head_commit_details = db_repository
 763                        .head_commit_details
 764                        .as_ref()
 765                        .map(|head_commit_details| serde_json::from_str(head_commit_details))
 766                        .transpose()?
 767                        .unwrap_or_default();
 768
 769                    let entry_ids = serde_json::from_str(&db_repository.entry_ids)
 770                        .context("failed to deserialize repository's entry ids")?;
 771
 772                    if let Some(legacy_worktree_id) = db_repository.legacy_worktree_id {
 773                        if let Some(worktree) = worktrees
 774                            .iter_mut()
 775                            .find(|worktree| worktree.id as i64 == legacy_worktree_id)
 776                        {
 777                            worktree.updated_repositories.push(proto::RepositoryEntry {
 778                                repository_id: db_repository.id as u64,
 779                                updated_statuses,
 780                                removed_statuses,
 781                                current_merge_conflicts,
 782                                branch_summary,
 783                            });
 784                        }
 785                    } else {
 786                        updated_repositories.push(proto::UpdateRepository {
 787                            entry_ids,
 788                            updated_statuses,
 789                            removed_statuses,
 790                            current_merge_conflicts,
 791                            branch_summary,
 792                            head_commit_details,
 793                            branch_list: Vec::new(),
 794                            project_id: project_id.to_proto(),
 795                            id: db_repository.id as u64,
 796                            abs_path: db_repository.abs_path.clone(),
 797                            scan_id: db_repository.scan_id as u64,
 798                            is_last_update: true,
 799                            merge_message: db_repository.merge_message,
 800                            stash_entries: Vec::new(),
 801                            remote_upstream_url: db_repository.remote_upstream_url.clone(),
 802                            remote_origin_url: db_repository.remote_origin_url.clone(),
 803                            original_repo_abs_path: Some(db_repository.abs_path),
 804                            linked_worktrees: db_repository
 805                                .linked_worktrees
 806                                .as_deref()
 807                                .and_then(|s| serde_json::from_str(s).ok())
 808                                .unwrap_or_default(),
 809                        });
 810                    }
 811                }
 812            }
 813        }
 814
 815        let language_servers = project
 816            .find_related(language_server::Entity)
 817            .all(tx)
 818            .await?
 819            .into_iter()
 820            .map(|language_server| LanguageServer {
 821                server: proto::LanguageServer {
 822                    id: language_server.id as u64,
 823                    name: language_server.name,
 824                    worktree_id: language_server.worktree_id.map(|id| id as u64),
 825                },
 826                capabilities: language_server.capabilities,
 827            })
 828            .collect::<Vec<_>>();
 829
 830        {
 831            let mut db_settings_files = worktree_settings_file::Entity::find()
 832                .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
 833                .stream(tx)
 834                .await?;
 835            while let Some(db_settings_file) = db_settings_files.next().await {
 836                let db_settings_file = db_settings_file?;
 837                if let Some(worktree) = worktrees
 838                    .iter_mut()
 839                    .find(|w| w.id == db_settings_file.worktree_id as u64)
 840                {
 841                    worktree.settings_files.push(WorktreeSettingsFile {
 842                        path: db_settings_file.path,
 843                        content: db_settings_file.content,
 844                        kind: db_settings_file.kind,
 845                        outside_worktree: db_settings_file.outside_worktree,
 846                    });
 847                }
 848            }
 849        }
 850
 851        let mut collaborators = project
 852            .find_related(project_collaborator::Entity)
 853            .all(tx)
 854            .await?;
 855        let self_collaborator = if let Some(self_collaborator_ix) = collaborators
 856            .iter()
 857            .position(|collaborator| collaborator.user_id == user_id)
 858        {
 859            collaborators.swap_remove(self_collaborator_ix)
 860        } else {
 861            return Ok(None);
 862        };
 863        let old_connection_id = self_collaborator.connection();
 864        project_collaborator::Entity::update(project_collaborator::ActiveModel {
 865            connection_id: ActiveValue::set(connection.id as i32),
 866            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
 867            ..self_collaborator.into_active_model()
 868        })
 869        .exec(tx)
 870        .await?;
 871
 872        let collaborators = collaborators
 873            .into_iter()
 874            .map(|collaborator| ProjectCollaborator {
 875                connection_id: collaborator.connection(),
 876                user_id: collaborator.user_id,
 877                replica_id: collaborator.replica_id,
 878                is_host: collaborator.is_host,
 879                committer_name: collaborator.committer_name,
 880                committer_email: collaborator.committer_email,
 881            })
 882            .collect::<Vec<_>>();
 883
 884        Ok(Some(RejoinedProject {
 885            id: project_id,
 886            old_connection_id,
 887            collaborators,
 888            updated_repositories,
 889            removed_repositories,
 890            worktrees,
 891            language_servers,
 892        }))
 893    }
 894
 895    pub async fn leave_room(
 896        &self,
 897        connection: ConnectionId,
 898    ) -> Result<Option<TransactionGuard<LeftRoom>>> {
 899        self.optional_room_transaction(|tx| async move {
 900            let leaving_participant = room_participant::Entity::find()
 901                .filter(
 902                    Condition::all()
 903                        .add(
 904                            room_participant::Column::AnsweringConnectionId
 905                                .eq(connection.id as i32),
 906                        )
 907                        .add(
 908                            room_participant::Column::AnsweringConnectionServerId
 909                                .eq(connection.owner_id as i32),
 910                        ),
 911                )
 912                .one(&*tx)
 913                .await?;
 914
 915            if let Some(leaving_participant) = leaving_participant {
 916                // Leave room.
 917                let room_id = leaving_participant.room_id;
 918                room_participant::Entity::delete_by_id(leaving_participant.id)
 919                    .exec(&*tx)
 920                    .await?;
 921
 922                // Cancel pending calls initiated by the leaving user.
 923                let called_participants = room_participant::Entity::find()
 924                    .filter(
 925                        Condition::all()
 926                            .add(
 927                                room_participant::Column::CallingUserId
 928                                    .eq(leaving_participant.user_id),
 929                            )
 930                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
 931                    )
 932                    .all(&*tx)
 933                    .await?;
 934                room_participant::Entity::delete_many()
 935                    .filter(
 936                        room_participant::Column::Id
 937                            .is_in(called_participants.iter().map(|participant| participant.id)),
 938                    )
 939                    .exec(&*tx)
 940                    .await?;
 941                let canceled_calls_to_user_ids = called_participants
 942                    .into_iter()
 943                    .map(|participant| participant.user_id)
 944                    .collect();
 945
 946                // Detect left projects.
 947                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 948                enum QueryProjectIds {
 949                    ProjectId,
 950                }
 951                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
 952                    .select_only()
 953                    .column_as(
 954                        project_collaborator::Column::ProjectId,
 955                        QueryProjectIds::ProjectId,
 956                    )
 957                    .filter(
 958                        Condition::all()
 959                            .add(
 960                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
 961                            )
 962                            .add(
 963                                project_collaborator::Column::ConnectionServerId
 964                                    .eq(connection.owner_id as i32),
 965                            ),
 966                    )
 967                    .into_values::<_, QueryProjectIds>()
 968                    .all(&*tx)
 969                    .await?;
 970
 971                let mut left_projects = HashMap::default();
 972                let mut collaborators = project_collaborator::Entity::find()
 973                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
 974                    .stream(&*tx)
 975                    .await?;
 976
 977                while let Some(collaborator) = collaborators.next().await {
 978                    let collaborator = collaborator?;
 979                    let left_project =
 980                        left_projects
 981                            .entry(collaborator.project_id)
 982                            .or_insert(LeftProject {
 983                                id: collaborator.project_id,
 984                                connection_ids: Default::default(),
 985                                should_unshare: false,
 986                            });
 987
 988                    let collaborator_connection_id = collaborator.connection();
 989                    if collaborator_connection_id != connection {
 990                        left_project.connection_ids.push(collaborator_connection_id);
 991                    }
 992
 993                    if collaborator.is_host && collaborator.connection() == connection {
 994                        left_project.should_unshare = true;
 995                    }
 996                }
 997                drop(collaborators);
 998
 999                // Leave projects.
1000                project_collaborator::Entity::delete_many()
1001                    .filter(
1002                        Condition::all()
1003                            .add(
1004                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1005                            )
1006                            .add(
1007                                project_collaborator::Column::ConnectionServerId
1008                                    .eq(connection.owner_id as i32),
1009                            ),
1010                    )
1011                    .exec(&*tx)
1012                    .await?;
1013
1014                follower::Entity::delete_many()
1015                    .filter(
1016                        Condition::all()
1017                            .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
1018                    )
1019                    .exec(&*tx)
1020                    .await?;
1021
1022                // Unshare projects.
1023                project::Entity::delete_many()
1024                    .filter(
1025                        Condition::all()
1026                            .add(project::Column::RoomId.eq(room_id))
1027                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1028                            .add(
1029                                project::Column::HostConnectionServerId
1030                                    .eq(connection.owner_id as i32),
1031                            ),
1032                    )
1033                    .exec(&*tx)
1034                    .await?;
1035
1036                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
1037                let deleted = if room.participants.is_empty() {
1038                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1039                    result.rows_affected > 0
1040                } else {
1041                    false
1042                };
1043
1044                let left_room = LeftRoom {
1045                    room,
1046                    channel,
1047                    left_projects,
1048                    canceled_calls_to_user_ids,
1049                    deleted,
1050                };
1051
1052                if left_room.room.participants.is_empty() {
1053                    self.rooms.remove(&room_id);
1054                }
1055
1056                Ok(Some((room_id, left_room)))
1057            } else {
1058                Ok(None)
1059            }
1060        })
1061        .await
1062    }
1063
1064    /// Updates the location of a participant in the given room.
1065    pub async fn update_room_participant_location(
1066        &self,
1067        room_id: RoomId,
1068        connection: ConnectionId,
1069        location: proto::ParticipantLocation,
1070    ) -> Result<TransactionGuard<proto::Room>> {
1071        self.room_transaction(room_id, |tx| async {
1072            let tx = tx;
1073            let location_kind;
1074            let location_project_id;
1075            match location.variant.as_ref().context("invalid location")? {
1076                proto::participant_location::Variant::SharedProject(project) => {
1077                    location_kind = 0;
1078                    location_project_id = Some(ProjectId::from_proto(project.id));
1079                }
1080                proto::participant_location::Variant::UnsharedProject(_) => {
1081                    location_kind = 1;
1082                    location_project_id = None;
1083                }
1084                proto::participant_location::Variant::External(_) => {
1085                    location_kind = 2;
1086                    location_project_id = None;
1087                }
1088            }
1089
1090            let result = room_participant::Entity::update_many()
1091                .filter(
1092                    Condition::all()
1093                        .add(room_participant::Column::RoomId.eq(room_id))
1094                        .add(
1095                            room_participant::Column::AnsweringConnectionId
1096                                .eq(connection.id as i32),
1097                        )
1098                        .add(
1099                            room_participant::Column::AnsweringConnectionServerId
1100                                .eq(connection.owner_id as i32),
1101                        ),
1102                )
1103                .set(room_participant::ActiveModel {
1104                    location_kind: ActiveValue::set(Some(location_kind)),
1105                    location_project_id: ActiveValue::set(location_project_id),
1106                    ..Default::default()
1107                })
1108                .exec(&*tx)
1109                .await?;
1110
1111            if result.rows_affected == 1 {
1112                let room = self.get_room(room_id, &tx).await?;
1113                Ok(room)
1114            } else {
1115                Err(anyhow!("could not update room participant location"))?
1116            }
1117        })
1118        .await
1119    }
1120
1121    /// Sets the role of a participant in the given room.
1122    pub async fn set_room_participant_role(
1123        &self,
1124        admin_id: UserId,
1125        room_id: RoomId,
1126        user_id: UserId,
1127        role: ChannelRole,
1128    ) -> Result<TransactionGuard<proto::Room>> {
1129        self.room_transaction(room_id, |tx| async move {
1130            room_participant::Entity::find()
1131                .filter(
1132                    Condition::all()
1133                        .add(room_participant::Column::RoomId.eq(room_id))
1134                        .add(room_participant::Column::UserId.eq(admin_id))
1135                        .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1136                )
1137                .one(&*tx)
1138                .await?
1139                .context("only admins can set participant role")?;
1140
1141            if role.requires_cla() {
1142                self.check_user_has_signed_cla(user_id, room_id, &tx)
1143                    .await?;
1144            }
1145
1146            let result = room_participant::Entity::update_many()
1147                .filter(
1148                    Condition::all()
1149                        .add(room_participant::Column::RoomId.eq(room_id))
1150                        .add(room_participant::Column::UserId.eq(user_id)),
1151                )
1152                .set(room_participant::ActiveModel {
1153                    role: ActiveValue::set(Some(role)),
1154                    ..Default::default()
1155                })
1156                .exec(&*tx)
1157                .await?;
1158
1159            if result.rows_affected != 1 {
1160                Err(anyhow!("could not update room participant role"))?;
1161            }
1162            self.get_room(room_id, &tx).await
1163        })
1164        .await
1165    }
1166
1167    async fn check_user_has_signed_cla(
1168        &self,
1169        user_id: UserId,
1170        room_id: RoomId,
1171        tx: &DatabaseTransaction,
1172    ) -> Result<()> {
1173        let channel = room::Entity::find_by_id(room_id)
1174            .one(tx)
1175            .await?
1176            .context("could not find room")?
1177            .find_related(channel::Entity)
1178            .one(tx)
1179            .await?;
1180
1181        if let Some(channel) = channel {
1182            let requires_zed_cla = channel.requires_zed_cla
1183                || channel::Entity::find()
1184                    .filter(
1185                        channel::Column::Id
1186                            .is_in(channel.ancestors())
1187                            .and(channel::Column::RequiresZedCla.eq(true)),
1188                    )
1189                    .count(tx)
1190                    .await?
1191                    > 0;
1192            if requires_zed_cla
1193                && contributor::Entity::find()
1194                    .filter(contributor::Column::UserId.eq(user_id))
1195                    .one(tx)
1196                    .await?
1197                    .is_none()
1198            {
1199                Err(anyhow!("user has not signed the Zed CLA"))?;
1200            }
1201        }
1202        Ok(())
1203    }
1204
1205    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1206        self.transaction(|tx| async move {
1207            self.room_connection_lost(connection, &tx).await?;
1208            self.channel_buffer_connection_lost(connection, &tx).await?;
1209            Ok(())
1210        })
1211        .await
1212    }
1213
1214    pub async fn room_connection_lost(
1215        &self,
1216        connection: ConnectionId,
1217        tx: &DatabaseTransaction,
1218    ) -> Result<()> {
1219        let participant = room_participant::Entity::find()
1220            .filter(
1221                Condition::all()
1222                    .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1223                    .add(
1224                        room_participant::Column::AnsweringConnectionServerId
1225                            .eq(connection.owner_id as i32),
1226                    ),
1227            )
1228            .one(tx)
1229            .await?;
1230
1231        if let Some(participant) = participant {
1232            room_participant::Entity::update(room_participant::ActiveModel {
1233                answering_connection_lost: ActiveValue::set(true),
1234                ..participant.into_active_model()
1235            })
1236            .exec(tx)
1237            .await?;
1238        }
1239        Ok(())
1240    }
1241
1242    fn build_incoming_call(
1243        room: &proto::Room,
1244        called_user_id: UserId,
1245    ) -> Option<proto::IncomingCall> {
1246        let pending_participant = room
1247            .pending_participants
1248            .iter()
1249            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1250
1251        Some(proto::IncomingCall {
1252            room_id: room.id,
1253            calling_user_id: pending_participant.calling_user_id,
1254            participant_user_ids: room
1255                .participants
1256                .iter()
1257                .map(|participant| participant.user_id)
1258                .collect(),
1259            initial_project: room.participants.iter().find_map(|participant| {
1260                let initial_project_id = pending_participant.initial_project_id?;
1261                participant
1262                    .projects
1263                    .iter()
1264                    .find(|project| project.id == initial_project_id)
1265                    .cloned()
1266            }),
1267        })
1268    }
1269
1270    pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1271        let (_, room) = self.get_channel_room(room_id, tx).await?;
1272        Ok(room)
1273    }
1274
1275    pub async fn room_connection_ids(
1276        &self,
1277        room_id: RoomId,
1278        connection_id: ConnectionId,
1279    ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1280        self.room_transaction(room_id, |tx| async move {
1281            let mut participants = room_participant::Entity::find()
1282                .filter(room_participant::Column::RoomId.eq(room_id))
1283                .stream(&*tx)
1284                .await?;
1285
1286            let mut is_participant = false;
1287            let mut connection_ids = HashSet::default();
1288            while let Some(participant) = participants.next().await {
1289                let participant = participant?;
1290                if let Some(answering_connection) = participant.answering_connection() {
1291                    if answering_connection == connection_id {
1292                        is_participant = true;
1293                    } else {
1294                        connection_ids.insert(answering_connection);
1295                    }
1296                }
1297            }
1298
1299            if !is_participant {
1300                Err(anyhow!("not a room participant"))?;
1301            }
1302
1303            Ok(connection_ids)
1304        })
1305        .await
1306    }
1307
1308    async fn get_channel_room(
1309        &self,
1310        room_id: RoomId,
1311        tx: &DatabaseTransaction,
1312    ) -> Result<(Option<channel::Model>, proto::Room)> {
1313        let db_room = room::Entity::find_by_id(room_id)
1314            .one(tx)
1315            .await?
1316            .context("could not find room")?;
1317
1318        let mut db_participants = db_room
1319            .find_related(room_participant::Entity)
1320            .stream(tx)
1321            .await?;
1322        let mut participants = HashMap::default();
1323        let mut pending_participants = Vec::new();
1324        while let Some(db_participant) = db_participants.next().await {
1325            let db_participant = db_participant?;
1326            if let (
1327                Some(answering_connection_id),
1328                Some(answering_connection_server_id),
1329                Some(participant_index),
1330            ) = (
1331                db_participant.answering_connection_id,
1332                db_participant.answering_connection_server_id,
1333                db_participant.participant_index,
1334            ) {
1335                let location = match (
1336                    db_participant.location_kind,
1337                    db_participant.location_project_id,
1338                ) {
1339                    (Some(0), Some(project_id)) => {
1340                        Some(proto::participant_location::Variant::SharedProject(
1341                            proto::participant_location::SharedProject {
1342                                id: project_id.to_proto(),
1343                            },
1344                        ))
1345                    }
1346                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1347                        Default::default(),
1348                    )),
1349                    _ => Some(proto::participant_location::Variant::External(
1350                        Default::default(),
1351                    )),
1352                };
1353
1354                let answering_connection = ConnectionId {
1355                    owner_id: answering_connection_server_id.0 as u32,
1356                    id: answering_connection_id as u32,
1357                };
1358                participants.insert(
1359                    answering_connection,
1360                    proto::Participant {
1361                        user_id: db_participant.user_id.to_proto(),
1362                        peer_id: Some(answering_connection.into()),
1363                        projects: Default::default(),
1364                        location: Some(proto::ParticipantLocation { variant: location }),
1365                        participant_index: participant_index as u32,
1366                        role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1367                    },
1368                );
1369            } else {
1370                pending_participants.push(proto::PendingParticipant {
1371                    user_id: db_participant.user_id.to_proto(),
1372                    calling_user_id: db_participant.calling_user_id.to_proto(),
1373                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1374                });
1375            }
1376        }
1377        drop(db_participants);
1378
1379        let db_projects = db_room
1380            .find_related(project::Entity)
1381            .find_with_related(worktree::Entity)
1382            .all(tx)
1383            .await?;
1384
1385        for (db_project, db_worktrees) in db_projects {
1386            let host_connection = db_project.host_connection()?;
1387            if let Some(participant) = participants.get_mut(&host_connection) {
1388                participant.projects.push(proto::ParticipantProject {
1389                    id: db_project.id.to_proto(),
1390                    worktree_root_names: Default::default(),
1391                });
1392                let project = participant.projects.last_mut().unwrap();
1393
1394                for db_worktree in db_worktrees {
1395                    if db_worktree.visible {
1396                        project.worktree_root_names.push(db_worktree.root_name);
1397                    }
1398                }
1399            }
1400        }
1401
1402        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1403        let mut followers = Vec::new();
1404        while let Some(db_follower) = db_followers.next().await {
1405            let db_follower = db_follower?;
1406            followers.push(proto::Follower {
1407                leader_id: Some(db_follower.leader_connection().into()),
1408                follower_id: Some(db_follower.follower_connection().into()),
1409                project_id: db_follower.project_id.to_proto(),
1410            });
1411        }
1412        drop(db_followers);
1413
1414        let channel = if let Some(channel_id) = db_room.channel_id {
1415            Some(self.get_channel_internal(channel_id, tx).await?)
1416        } else {
1417            None
1418        };
1419
1420        Ok((
1421            channel,
1422            proto::Room {
1423                id: db_room.id.to_proto(),
1424                livekit_room: db_room.live_kit_room,
1425                participants: participants.into_values().collect(),
1426                pending_participants,
1427                followers,
1428            },
1429        ))
1430    }
1431}