room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{
  14    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
  15};
  16use language::LanguageRegistry;
  17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  18use postage::stream::Stream;
  19use project::Project;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
    /// Upper bound on how long `Room::maintain_connection` waits for the client to reconnect
    /// and rejoin the room before the room is left. Mirrors `client::RECEIVE_TIMEOUT`.
  23pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
  24
  25#[derive(Clone, Debug, PartialEq, Eq)]
  26pub enum Event {
  27    ParticipantLocationChanged {
  28        participant_id: proto::PeerId,
  29    },
  30    RemoteVideoTracksChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteProjectShared {
  34        owner: Arc<User>,
  35        project_id: u64,
  36        worktree_root_names: Vec<String>,
  37    },
  38    RemoteProjectUnshared {
  39        project_id: u64,
  40    },
  41    Left,
  42}
  43
    /// Client-side state of a call room the local user created or joined.
  44pub struct Room {
        /// Room id; sent with `RejoinRoom`, `Call` and `ShareProject` requests.
  45    id: u64,
        /// LiveKit state; `Some` only when connection info was supplied to `new`, dropped by `leave`.
  46    live_kit: Option<LiveKitRoom>,
        /// `Online` after construction, `Rejoining` once a client disconnection is detected,
        /// `Offline` after `leave`.
  47    status: RoomStatus,
        /// Weak handles to local projects registered through `share_project`.
  48    shared_projects: HashSet<WeakModelHandle<Project>>,
        /// Weak handles to remote projects opened through `join_project`.
  49    joined_projects: HashSet<WeakModelHandle<Project>>,
        /// The local user's participant state; its `projects` are refreshed on every room update.
  50    local_participant: LocalParticipant,
        /// Other joined participants, keyed by user id.
  51    remote_participants: BTreeMap<u64, RemoteParticipant>,
        /// Users listed as pending in the most recently applied room update.
  52    pending_participants: Vec<Arc<User>>,
        /// User ids of every remote and pending participant (see `check_invariants`).
  53    participant_user_ids: HashSet<u64>,
        /// Number of `call` requests that have not completed yet.
  54    pending_call_count: usize,
        /// When set, the room leaves itself once `should_leave` finds nobody else present or invited.
  55    leave_when_empty: bool,
  56    client: Arc<Client>,
  57    user_store: ModelHandle<UserStore>,
        /// Keeps the `RoomUpdated` message handler registered; cleared by `leave`.
  58    subscriptions: Vec<client::Subscription>,
        /// Task applying the latest room update; reset to `None` when that task finishes.
  59    pending_room_update: Option<Task<()>>,
        /// Task running `maintain_connection`; dropped by `leave`.
  60    maintain_connection: Option<Task<Option<()>>>,
  61}
  62
  63impl Entity for Room {
  64    type Event = Event;
  65
  66    fn release(&mut self, _: &mut MutableAppContext) {
  67        if self.status.is_online() {
  68            log::info!("room was released, sending leave message");
  69            let _ = self.client.send(proto::LeaveRoom {});
  70        }
  71    }
  72}
  73
  74impl Room {
  75    fn new(
  76        id: u64,
  77        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  78        client: Arc<Client>,
  79        user_store: ModelHandle<UserStore>,
  80        cx: &mut ModelContext<Self>,
  81    ) -> Self {
  82        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
  83            let room = live_kit_client::Room::new();
  84            let mut status = room.status();
  85            // Consume the initial status of the room.
  86            let _ = status.try_recv();
  87            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
  88                while let Some(status) = status.next().await {
  89                    let this = if let Some(this) = this.upgrade(&cx) {
  90                        this
  91                    } else {
  92                        break;
  93                    };
  94
  95                    if status == live_kit_client::ConnectionState::Disconnected {
  96                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
  97                        break;
  98                    }
  99                }
 100            });
 101
 102            let mut track_changes = room.remote_video_track_updates();
 103            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 104                while let Some(track_change) = track_changes.next().await {
 105                    let this = if let Some(this) = this.upgrade(&cx) {
 106                        this
 107                    } else {
 108                        break;
 109                    };
 110
 111                    this.update(&mut cx, |this, cx| {
 112                        this.remote_video_track_updated(track_change, cx).log_err()
 113                    });
 114                }
 115            });
 116
 117            cx.foreground()
 118                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 119                .detach_and_log_err(cx);
 120
 121            Some(LiveKitRoom {
 122                room,
 123                screen_track: ScreenTrack::None,
 124                next_publish_id: 0,
 125                _maintain_room,
 126                _maintain_tracks,
 127            })
 128        } else {
 129            None
 130        };
 131
 132        let maintain_connection =
 133            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 134
 135        Self {
 136            id,
 137            live_kit: live_kit_room,
 138            status: RoomStatus::Online,
 139            shared_projects: Default::default(),
 140            joined_projects: Default::default(),
 141            participant_user_ids: Default::default(),
 142            local_participant: Default::default(),
 143            remote_participants: Default::default(),
 144            pending_participants: Default::default(),
 145            pending_call_count: 0,
 146            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 147            leave_when_empty: false,
 148            pending_room_update: None,
 149            client,
 150            user_store,
 151            maintain_connection: Some(maintain_connection),
 152        }
 153    }
 154
 155    pub(crate) fn create(
 156        called_user_id: u64,
 157        initial_project: Option<ModelHandle<Project>>,
 158        client: Arc<Client>,
 159        user_store: ModelHandle<UserStore>,
 160        cx: &mut MutableAppContext,
 161    ) -> Task<Result<ModelHandle<Self>>> {
 162        cx.spawn(|mut cx| async move {
 163            let response = client.request(proto::CreateRoom {}).await?;
 164            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 165            let room = cx.add_model(|cx| {
 166                Self::new(
 167                    room_proto.id,
 168                    response.live_kit_connection_info,
 169                    client,
 170                    user_store,
 171                    cx,
 172                )
 173            });
 174
 175            let initial_project_id = if let Some(initial_project) = initial_project {
 176                let initial_project_id = room
 177                    .update(&mut cx, |room, cx| {
 178                        room.share_project(initial_project.clone(), cx)
 179                    })
 180                    .await?;
 181                Some(initial_project_id)
 182            } else {
 183                None
 184            };
 185
 186            match room
 187                .update(&mut cx, |room, cx| {
 188                    room.leave_when_empty = true;
 189                    room.call(called_user_id, initial_project_id, cx)
 190                })
 191                .await
 192            {
 193                Ok(()) => Ok(room),
 194                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 195            }
 196        })
 197    }
 198
 199    pub(crate) fn join(
 200        call: &IncomingCall,
 201        client: Arc<Client>,
 202        user_store: ModelHandle<UserStore>,
 203        cx: &mut MutableAppContext,
 204    ) -> Task<Result<ModelHandle<Self>>> {
 205        let room_id = call.room_id;
 206        cx.spawn(|mut cx| async move {
 207            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 208            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 209            let room = cx.add_model(|cx| {
 210                Self::new(
 211                    room_id,
 212                    response.live_kit_connection_info,
 213                    client,
 214                    user_store,
 215                    cx,
 216                )
 217            });
 218            room.update(&mut cx, |room, cx| {
 219                room.leave_when_empty = true;
 220                room.apply_room_update(room_proto, cx)?;
 221                anyhow::Ok(())
 222            })?;
 223            Ok(room)
 224        })
 225    }
 226
 227    fn should_leave(&self) -> bool {
 228        self.leave_when_empty
 229            && self.pending_room_update.is_none()
 230            && self.pending_participants.is_empty()
 231            && self.remote_participants.is_empty()
 232            && self.pending_call_count == 0
 233    }
 234
 235    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 236        if self.status.is_offline() {
 237            return Err(anyhow!("room is offline"));
 238        }
 239
 240        cx.notify();
 241        cx.emit(Event::Left);
 242        log::info!("leaving room");
 243
 244        for project in self.shared_projects.drain() {
 245            if let Some(project) = project.upgrade(cx) {
 246                project.update(cx, |project, cx| {
 247                    project.unshare(cx).log_err();
 248                });
 249            }
 250        }
 251        for project in self.joined_projects.drain() {
 252            if let Some(project) = project.upgrade(cx) {
 253                project.update(cx, |project, cx| {
 254                    project.disconnected_from_host(cx);
 255                });
 256            }
 257        }
 258
 259        self.status = RoomStatus::Offline;
 260        self.remote_participants.clear();
 261        self.pending_participants.clear();
 262        self.participant_user_ids.clear();
 263        self.subscriptions.clear();
 264        self.live_kit.take();
 265        self.pending_room_update.take();
 266        self.maintain_connection.take();
 267        self.client.send(proto::LeaveRoom {})?;
 268        Ok(())
 269    }
 270
 271    async fn maintain_connection(
 272        this: WeakModelHandle<Self>,
 273        client: Arc<Client>,
 274        mut cx: AsyncAppContext,
 275    ) -> Result<()> {
 276        let mut client_status = client.status();
 277        loop {
 278            let is_connected = client_status
 279                .next()
 280                .await
 281                .map_or(false, |s| s.is_connected());
 282
 283            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 284            if !is_connected || client_status.next().await.is_some() {
 285                log::info!("detected client disconnection");
 286                this.upgrade(&cx)
 287                    .ok_or_else(|| anyhow!("room was dropped"))?
 288                    .update(&mut cx, |this, cx| {
 289                        this.status = RoomStatus::Rejoining;
 290                        cx.notify();
 291                    });
 292
 293                // Wait for client to re-establish a connection to the server.
 294                {
 295                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 296                    let client_reconnection = async {
 297                        let mut remaining_attempts = 3;
 298                        while remaining_attempts > 0 {
 299                            log::info!(
 300                                "waiting for client status change, remaining attempts {}",
 301                                remaining_attempts
 302                            );
 303                            let Some(status) = client_status.next().await else { break };
 304                            if status.is_connected() {
 305                                log::info!("client reconnected, attempting to rejoin room");
 306
 307                                let Some(this) = this.upgrade(&cx) else { break };
 308                                if this
 309                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 310                                    .await
 311                                    .log_err()
 312                                    .is_some()
 313                                {
 314                                    return true;
 315                                } else {
 316                                    remaining_attempts -= 1;
 317                                }
 318                            }
 319                        }
 320                        false
 321                    }
 322                    .fuse();
 323                    futures::pin_mut!(client_reconnection);
 324
 325                    futures::select_biased! {
 326                        reconnected = client_reconnection => {
 327                            if reconnected {
 328                                log::info!("successfully reconnected to room");
 329                                // If we successfully joined the room, go back around the loop
 330                                // waiting for future connection status changes.
 331                                continue;
 332                            }
 333                        }
 334                        _ = reconnection_timeout => {
 335                            log::info!("room reconnection timeout expired");
 336                        }
 337                    }
 338                }
 339
 340                // The client failed to re-establish a connection to the server
 341                // or an error occurred while trying to re-join the room. Either way
 342                // we leave the room and return an error.
 343                if let Some(this) = this.upgrade(&cx) {
 344                    log::info!("reconnection failed, leaving room");
 345                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 346                }
 347                return Err(anyhow!(
 348                    "can't reconnect to room: client failed to re-establish connection"
 349                ));
 350            }
 351        }
 352    }
 353
 354    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 355        let mut projects = HashMap::default();
 356        let mut reshared_projects = Vec::new();
 357        let mut rejoined_projects = Vec::new();
 358        self.shared_projects.retain(|project| {
 359            if let Some(handle) = project.upgrade(cx) {
 360                let project = handle.read(cx);
 361                if let Some(project_id) = project.remote_id() {
 362                    projects.insert(project_id, handle.clone());
 363                    reshared_projects.push(proto::UpdateProject {
 364                        project_id,
 365                        worktrees: project.worktree_metadata_protos(cx),
 366                    });
 367                    return true;
 368                }
 369            }
 370            false
 371        });
 372        self.joined_projects.retain(|project| {
 373            if let Some(handle) = project.upgrade(cx) {
 374                let project = handle.read(cx);
 375                if let Some(project_id) = project.remote_id() {
 376                    projects.insert(project_id, handle.clone());
 377                    rejoined_projects.push(proto::RejoinProject {
 378                        id: project_id,
 379                        worktrees: project
 380                            .worktrees(cx)
 381                            .map(|worktree| {
 382                                let worktree = worktree.read(cx);
 383                                proto::RejoinWorktree {
 384                                    id: worktree.id().to_proto(),
 385                                    scan_id: worktree.completed_scan_id() as u64,
 386                                }
 387                            })
 388                            .collect(),
 389                    });
 390                }
 391                return true;
 392            }
 393            false
 394        });
 395
 396        let response = self.client.request(proto::RejoinRoom {
 397            id: self.id,
 398            reshared_projects,
 399            rejoined_projects,
 400        });
 401
 402        cx.spawn(|this, mut cx| async move {
 403            let response = response.await?;
 404            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 405            this.update(&mut cx, |this, cx| {
 406                this.status = RoomStatus::Online;
 407                this.apply_room_update(room_proto, cx)?;
 408
 409                for reshared_project in response.reshared_projects {
 410                    if let Some(project) = projects.get(&reshared_project.id) {
 411                        project.update(cx, |project, cx| {
 412                            project.reshared(reshared_project, cx).log_err();
 413                        });
 414                    }
 415                }
 416
 417                for rejoined_project in response.rejoined_projects {
 418                    if let Some(project) = projects.get(&rejoined_project.id) {
 419                        project.update(cx, |project, cx| {
 420                            project.rejoined(rejoined_project, cx).log_err();
 421                        });
 422                    }
 423                }
 424
 425                anyhow::Ok(())
 426            })
 427        })
 428    }
 429
 430    pub fn id(&self) -> u64 {
 431        self.id
 432    }
 433
 434    pub fn status(&self) -> RoomStatus {
 435        self.status
 436    }
 437
 438    pub fn local_participant(&self) -> &LocalParticipant {
 439        &self.local_participant
 440    }
 441
 442    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 443        &self.remote_participants
 444    }
 445
 446    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 447        self.remote_participants
 448            .values()
 449            .find(|p| p.peer_id == peer_id)
 450    }
 451
 452    pub fn pending_participants(&self) -> &[Arc<User>] {
 453        &self.pending_participants
 454    }
 455
 456    pub fn contains_participant(&self, user_id: u64) -> bool {
 457        self.participant_user_ids.contains(&user_id)
 458    }
 459
 460    async fn handle_room_updated(
 461        this: ModelHandle<Self>,
 462        envelope: TypedEnvelope<proto::RoomUpdated>,
 463        _: Arc<Client>,
 464        mut cx: AsyncAppContext,
 465    ) -> Result<()> {
 466        let room = envelope
 467            .payload
 468            .room
 469            .ok_or_else(|| anyhow!("invalid room"))?;
 470        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 471    }
 472
 473    fn apply_room_update(
 474        &mut self,
 475        mut room: proto::Room,
 476        cx: &mut ModelContext<Self>,
 477    ) -> Result<()> {
 478        // Filter ourselves out from the room's participants.
 479        let local_participant_ix = room
 480            .participants
 481            .iter()
 482            .position(|participant| Some(participant.user_id) == self.client.user_id());
 483        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 484
 485        let pending_participant_user_ids = room
 486            .pending_participants
 487            .iter()
 488            .map(|p| p.user_id)
 489            .collect::<Vec<_>>();
 490        let remote_participant_user_ids = room
 491            .participants
 492            .iter()
 493            .map(|p| p.user_id)
 494            .collect::<Vec<_>>();
 495        let (remote_participants, pending_participants) =
 496            self.user_store.update(cx, move |user_store, cx| {
 497                (
 498                    user_store.get_users(remote_participant_user_ids, cx),
 499                    user_store.get_users(pending_participant_user_ids, cx),
 500                )
 501            });
 502        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 503            let (remote_participants, pending_participants) =
 504                futures::join!(remote_participants, pending_participants);
 505
 506            this.update(&mut cx, |this, cx| {
 507                this.participant_user_ids.clear();
 508
 509                if let Some(participant) = local_participant {
 510                    this.local_participant.projects = participant.projects;
 511                } else {
 512                    this.local_participant.projects.clear();
 513                }
 514
 515                if let Some(participants) = remote_participants.log_err() {
 516                    for (participant, user) in room.participants.into_iter().zip(participants) {
 517                        let Some(peer_id) = participant.peer_id else { continue };
 518                        this.participant_user_ids.insert(participant.user_id);
 519
 520                        let old_projects = this
 521                            .remote_participants
 522                            .get(&participant.user_id)
 523                            .into_iter()
 524                            .flat_map(|existing| &existing.projects)
 525                            .map(|project| project.id)
 526                            .collect::<HashSet<_>>();
 527                        let new_projects = participant
 528                            .projects
 529                            .iter()
 530                            .map(|project| project.id)
 531                            .collect::<HashSet<_>>();
 532
 533                        for project in &participant.projects {
 534                            if !old_projects.contains(&project.id) {
 535                                cx.emit(Event::RemoteProjectShared {
 536                                    owner: user.clone(),
 537                                    project_id: project.id,
 538                                    worktree_root_names: project.worktree_root_names.clone(),
 539                                });
 540                            }
 541                        }
 542
 543                        for unshared_project_id in old_projects.difference(&new_projects) {
 544                            this.joined_projects.retain(|project| {
 545                                if let Some(project) = project.upgrade(cx) {
 546                                    project.update(cx, |project, cx| {
 547                                        if project.remote_id() == Some(*unshared_project_id) {
 548                                            project.disconnected_from_host(cx);
 549                                            false
 550                                        } else {
 551                                            true
 552                                        }
 553                                    })
 554                                } else {
 555                                    false
 556                                }
 557                            });
 558                            cx.emit(Event::RemoteProjectUnshared {
 559                                project_id: *unshared_project_id,
 560                            });
 561                        }
 562
 563                        let location = ParticipantLocation::from_proto(participant.location)
 564                            .unwrap_or(ParticipantLocation::External);
 565                        if let Some(remote_participant) =
 566                            this.remote_participants.get_mut(&participant.user_id)
 567                        {
 568                            remote_participant.projects = participant.projects;
 569                            remote_participant.peer_id = peer_id;
 570                            if location != remote_participant.location {
 571                                remote_participant.location = location;
 572                                cx.emit(Event::ParticipantLocationChanged {
 573                                    participant_id: peer_id,
 574                                });
 575                            }
 576                        } else {
 577                            this.remote_participants.insert(
 578                                participant.user_id,
 579                                RemoteParticipant {
 580                                    user: user.clone(),
 581                                    peer_id,
 582                                    projects: participant.projects,
 583                                    location,
 584                                    tracks: Default::default(),
 585                                },
 586                            );
 587
 588                            if let Some(live_kit) = this.live_kit.as_ref() {
 589                                let tracks =
 590                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
 591                                for track in tracks {
 592                                    this.remote_video_track_updated(
 593                                        RemoteVideoTrackUpdate::Subscribed(track),
 594                                        cx,
 595                                    )
 596                                    .log_err();
 597                                }
 598                            }
 599                        }
 600                    }
 601
 602                    this.remote_participants.retain(|user_id, participant| {
 603                        if this.participant_user_ids.contains(user_id) {
 604                            true
 605                        } else {
 606                            for project in &participant.projects {
 607                                cx.emit(Event::RemoteProjectUnshared {
 608                                    project_id: project.id,
 609                                });
 610                            }
 611                            false
 612                        }
 613                    });
 614                }
 615
 616                if let Some(pending_participants) = pending_participants.log_err() {
 617                    this.pending_participants = pending_participants;
 618                    for participant in &this.pending_participants {
 619                        this.participant_user_ids.insert(participant.id);
 620                    }
 621                }
 622
 623                this.pending_room_update.take();
 624                if this.should_leave() {
 625                    log::info!("room is empty, leaving");
 626                    let _ = this.leave(cx);
 627                }
 628
 629                this.check_invariants();
 630                cx.notify();
 631            });
 632        }));
 633
 634        cx.notify();
 635        Ok(())
 636    }
 637
 638    fn remote_video_track_updated(
 639        &mut self,
 640        change: RemoteVideoTrackUpdate,
 641        cx: &mut ModelContext<Self>,
 642    ) -> Result<()> {
 643        match change {
 644            RemoteVideoTrackUpdate::Subscribed(track) => {
 645                let user_id = track.publisher_id().parse()?;
 646                let track_id = track.sid().to_string();
 647                let participant = self
 648                    .remote_participants
 649                    .get_mut(&user_id)
 650                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 651                participant.tracks.insert(
 652                    track_id.clone(),
 653                    Arc::new(RemoteVideoTrack {
 654                        live_kit_track: track,
 655                    }),
 656                );
 657                cx.emit(Event::RemoteVideoTracksChanged {
 658                    participant_id: participant.peer_id,
 659                });
 660            }
 661            RemoteVideoTrackUpdate::Unsubscribed {
 662                publisher_id,
 663                track_id,
 664            } => {
 665                let user_id = publisher_id.parse()?;
 666                let participant = self
 667                    .remote_participants
 668                    .get_mut(&user_id)
 669                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 670                participant.tracks.remove(&track_id);
 671                cx.emit(Event::RemoteVideoTracksChanged {
 672                    participant_id: participant.peer_id,
 673                });
 674            }
 675        }
 676
 677        cx.notify();
 678        Ok(())
 679    }
 680
 681    fn check_invariants(&self) {
 682        #[cfg(any(test, feature = "test-support"))]
 683        {
 684            for participant in self.remote_participants.values() {
 685                assert!(self.participant_user_ids.contains(&participant.user.id));
 686                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 687            }
 688
 689            for participant in &self.pending_participants {
 690                assert!(self.participant_user_ids.contains(&participant.id));
 691                assert_ne!(participant.id, self.client.user_id().unwrap());
 692            }
 693
 694            assert_eq!(
 695                self.participant_user_ids.len(),
 696                self.remote_participants.len() + self.pending_participants.len()
 697            );
 698        }
 699    }
 700
 701    pub(crate) fn call(
 702        &mut self,
 703        called_user_id: u64,
 704        initial_project_id: Option<u64>,
 705        cx: &mut ModelContext<Self>,
 706    ) -> Task<Result<()>> {
 707        if self.status.is_offline() {
 708            return Task::ready(Err(anyhow!("room is offline")));
 709        }
 710
 711        cx.notify();
 712        let client = self.client.clone();
 713        let room_id = self.id;
 714        self.pending_call_count += 1;
 715        cx.spawn(|this, mut cx| async move {
 716            let result = client
 717                .request(proto::Call {
 718                    room_id,
 719                    called_user_id,
 720                    initial_project_id,
 721                })
 722                .await;
 723            this.update(&mut cx, |this, cx| {
 724                this.pending_call_count -= 1;
 725                if this.should_leave() {
 726                    this.leave(cx)?;
 727                }
 728                result
 729            })?;
 730            Ok(())
 731        })
 732    }
 733
 734    pub fn join_project(
 735        &mut self,
 736        id: u64,
 737        language_registry: Arc<LanguageRegistry>,
 738        fs: Arc<dyn Fs>,
 739        cx: &mut ModelContext<Self>,
 740    ) -> Task<Result<ModelHandle<Project>>> {
 741        let client = self.client.clone();
 742        let user_store = self.user_store.clone();
 743        cx.spawn(|this, mut cx| async move {
 744            let project =
 745                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 746            this.update(&mut cx, |this, cx| {
 747                this.joined_projects.retain(|project| {
 748                    if let Some(project) = project.upgrade(cx) {
 749                        !project.read(cx).is_read_only()
 750                    } else {
 751                        false
 752                    }
 753                });
 754                this.joined_projects.insert(project.downgrade());
 755            });
 756            Ok(project)
 757        })
 758    }
 759
 760    pub(crate) fn share_project(
 761        &mut self,
 762        project: ModelHandle<Project>,
 763        cx: &mut ModelContext<Self>,
 764    ) -> Task<Result<u64>> {
 765        if let Some(project_id) = project.read(cx).remote_id() {
 766            return Task::ready(Ok(project_id));
 767        }
 768
 769        let request = self.client.request(proto::ShareProject {
 770            room_id: self.id(),
 771            worktrees: project.read(cx).worktree_metadata_protos(cx),
 772        });
 773        cx.spawn(|this, mut cx| async move {
 774            let response = request.await?;
 775
 776            project.update(&mut cx, |project, cx| {
 777                project.shared(response.project_id, cx)
 778            })?;
 779
 780            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 781            this.update(&mut cx, |this, cx| {
 782                this.shared_projects.insert(project.downgrade());
 783                let active_project = this.local_participant.active_project.as_ref();
 784                if active_project.map_or(false, |location| *location == project) {
 785                    this.set_location(Some(&project), cx)
 786                } else {
 787                    Task::ready(Ok(()))
 788                }
 789            })
 790            .await?;
 791
 792            Ok(response.project_id)
 793        })
 794    }
 795
 796    pub(crate) fn unshare_project(
 797        &mut self,
 798        project: ModelHandle<Project>,
 799        cx: &mut ModelContext<Self>,
 800    ) -> Result<()> {
 801        let project_id = match project.read(cx).remote_id() {
 802            Some(project_id) => project_id,
 803            None => return Ok(()),
 804        };
 805
 806        self.client.send(proto::UnshareProject { project_id })?;
 807        project.update(cx, |this, cx| this.unshare(cx))
 808    }
 809
 810    pub(crate) fn set_location(
 811        &mut self,
 812        project: Option<&ModelHandle<Project>>,
 813        cx: &mut ModelContext<Self>,
 814    ) -> Task<Result<()>> {
 815        if self.status.is_offline() {
 816            return Task::ready(Err(anyhow!("room is offline")));
 817        }
 818
 819        let client = self.client.clone();
 820        let room_id = self.id;
 821        let location = if let Some(project) = project {
 822            self.local_participant.active_project = Some(project.downgrade());
 823            if let Some(project_id) = project.read(cx).remote_id() {
 824                proto::participant_location::Variant::SharedProject(
 825                    proto::participant_location::SharedProject { id: project_id },
 826                )
 827            } else {
 828                proto::participant_location::Variant::UnsharedProject(
 829                    proto::participant_location::UnsharedProject {},
 830                )
 831            }
 832        } else {
 833            self.local_participant.active_project = None;
 834            proto::participant_location::Variant::External(proto::participant_location::External {})
 835        };
 836
 837        cx.notify();
 838        cx.foreground().spawn(async move {
 839            client
 840                .request(proto::UpdateParticipantLocation {
 841                    room_id,
 842                    location: Some(proto::ParticipantLocation {
 843                        variant: Some(location),
 844                    }),
 845                })
 846                .await?;
 847            Ok(())
 848        })
 849    }
 850
 851    pub fn is_screen_sharing(&self) -> bool {
 852        self.live_kit.as_ref().map_or(false, |live_kit| {
 853            !matches!(live_kit.screen_track, ScreenTrack::None)
 854        })
 855    }
 856
 857    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 858        if self.status.is_offline() {
 859            return Task::ready(Err(anyhow!("room is offline")));
 860        } else if self.is_screen_sharing() {
 861            return Task::ready(Err(anyhow!("screen was already shared")));
 862        }
 863
 864        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 865            let publish_id = post_inc(&mut live_kit.next_publish_id);
 866            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 867            cx.notify();
 868            (live_kit.room.display_sources(), publish_id)
 869        } else {
 870            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 871        };
 872
 873        cx.spawn_weak(|this, mut cx| async move {
 874            let publish_track = async {
 875                let displays = displays.await?;
 876                let display = displays
 877                    .first()
 878                    .ok_or_else(|| anyhow!("no display found"))?;
 879                let track = LocalVideoTrack::screen_share_for_display(&display);
 880                this.upgrade(&cx)
 881                    .ok_or_else(|| anyhow!("room was dropped"))?
 882                    .read_with(&cx, |this, _| {
 883                        this.live_kit
 884                            .as_ref()
 885                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 886                    })
 887                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 888                    .await
 889            };
 890
 891            let publication = publish_track.await;
 892            this.upgrade(&cx)
 893                .ok_or_else(|| anyhow!("room was dropped"))?
 894                .update(&mut cx, |this, cx| {
 895                    let live_kit = this
 896                        .live_kit
 897                        .as_mut()
 898                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 899
 900                    let canceled = if let ScreenTrack::Pending {
 901                        publish_id: cur_publish_id,
 902                    } = &live_kit.screen_track
 903                    {
 904                        *cur_publish_id != publish_id
 905                    } else {
 906                        true
 907                    };
 908
 909                    match publication {
 910                        Ok(publication) => {
 911                            if canceled {
 912                                live_kit.room.unpublish_track(publication);
 913                            } else {
 914                                live_kit.screen_track = ScreenTrack::Published(publication);
 915                                cx.notify();
 916                            }
 917                            Ok(())
 918                        }
 919                        Err(error) => {
 920                            if canceled {
 921                                Ok(())
 922                            } else {
 923                                live_kit.screen_track = ScreenTrack::None;
 924                                cx.notify();
 925                                Err(error)
 926                            }
 927                        }
 928                    }
 929                })
 930        })
 931    }
 932
 933    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 934        if self.status.is_offline() {
 935            return Err(anyhow!("room is offline"));
 936        }
 937
 938        let live_kit = self
 939            .live_kit
 940            .as_mut()
 941            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 942        match mem::take(&mut live_kit.screen_track) {
 943            ScreenTrack::None => Err(anyhow!("screen was not shared")),
 944            ScreenTrack::Pending { .. } => {
 945                cx.notify();
 946                Ok(())
 947            }
 948            ScreenTrack::Published(track) => {
 949                live_kit.room.unpublish_track(track);
 950                cx.notify();
 951                Ok(())
 952            }
 953        }
 954    }
 955
 956    #[cfg(any(test, feature = "test-support"))]
 957    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
 958        self.live_kit
 959            .as_ref()
 960            .unwrap()
 961            .room
 962            .set_display_sources(sources);
 963    }
 964}
 965
 966struct LiveKitRoom {
 967    room: Arc<live_kit_client::Room>,
 968    screen_track: ScreenTrack,
 969    next_publish_id: usize,
 970    _maintain_room: Task<()>,
 971    _maintain_tracks: Task<()>,
 972}
 973
 974enum ScreenTrack {
 975    None,
 976    Pending { publish_id: usize },
 977    Published(LocalTrackPublication),
 978}
 979
 980impl Default for ScreenTrack {
 981    fn default() -> Self {
 982        Self::None
 983    }
 984}
 985
 986#[derive(Copy, Clone, PartialEq, Eq)]
 987pub enum RoomStatus {
 988    Online,
 989    Rejoining,
 990    Offline,
 991}
 992
 993impl RoomStatus {
 994    pub fn is_offline(&self) -> bool {
 995        matches!(self, RoomStatus::Offline)
 996    }
 997
 998    pub fn is_online(&self) -> bool {
 999        matches!(self, RoomStatus::Online)
1000    }
1001}