room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{
  14    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
  15};
  16use language::LanguageRegistry;
  17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  18use postage::stream::Stream;
  19use project::Project;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
/// How long `Room::maintain_connection` waits for the client to reconnect and
/// rejoin the room before giving up and leaving it. Uses the same value as the
/// client's `RECEIVE_TIMEOUT`.
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
  24
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A known remote participant reported a location different from the one
    /// previously recorded for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track published by a remote participant was subscribed to or
    /// unsubscribed from.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's project list contains a project that was not
    /// there in the previously applied room state.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously listed for a remote participant is gone, either
    /// because it was removed from their list or because they left the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// The local user left the room.
    Left,
}
  43
/// Client-side model of a call room: its participants, the projects shared in
/// or joined through it, and the tasks that keep it connected.
pub struct Room {
    /// Server-assigned room id, sent with room-scoped requests.
    id: u64,
    /// LiveKit state; present only when connection info was supplied at
    /// construction, and dropped when the room is left.
    live_kit: Option<LiveKitRoom>,
    /// Current connection status of the room.
    status: RoomStatus,
    /// Weak handles to local projects shared into this room via `share_project`.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Weak handles to remote projects joined via `join_project`.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed as pending in the most recently applied room state.
    pending_participants: Vec<Arc<User>>,
    /// User ids of every remote and pending participant.
    participant_user_ids: HashSet<u64>,
    /// Number of `call` requests that have not completed yet.
    pending_call_count: usize,
    /// When set, the room leaves itself once nobody else is present or being
    /// called (see `should_leave`).
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Message-handler subscriptions; cleared when the room is left.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recently received room state, if still running.
    pending_room_update: Option<Task<()>>,
    /// Task running `maintain_connection`; dropped when the room is left.
    maintain_connection: Option<Task<Option<()>>>,
}
  62
  63impl Entity for Room {
  64    type Event = Event;
  65
  66    fn release(&mut self, _: &mut MutableAppContext) {
  67        if self.status.is_online() {
  68            log::info!("room was released, sending leave message");
  69            let _ = self.client.send(proto::LeaveRoom {});
  70        }
  71    }
  72}
  73
  74impl Room {
  75    fn new(
  76        id: u64,
  77        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  78        client: Arc<Client>,
  79        user_store: ModelHandle<UserStore>,
  80        cx: &mut ModelContext<Self>,
  81    ) -> Self {
  82        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
  83            let room = live_kit_client::Room::new();
  84            let mut status = room.status();
  85            // Consume the initial status of the room.
  86            let _ = status.try_recv();
  87            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
  88                while let Some(status) = status.next().await {
  89                    let this = if let Some(this) = this.upgrade(&cx) {
  90                        this
  91                    } else {
  92                        break;
  93                    };
  94
  95                    if status == live_kit_client::ConnectionState::Disconnected {
  96                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
  97                        break;
  98                    }
  99                }
 100            });
 101
 102            let mut track_changes = room.remote_video_track_updates();
 103            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 104                while let Some(track_change) = track_changes.next().await {
 105                    let this = if let Some(this) = this.upgrade(&cx) {
 106                        this
 107                    } else {
 108                        break;
 109                    };
 110
 111                    this.update(&mut cx, |this, cx| {
 112                        this.remote_video_track_updated(track_change, cx).log_err()
 113                    });
 114                }
 115            });
 116
 117            cx.foreground()
 118                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 119                .detach_and_log_err(cx);
 120
 121            Some(LiveKitRoom {
 122                room,
 123                screen_track: ScreenTrack::None,
 124                next_publish_id: 0,
 125                _maintain_room,
 126                _maintain_tracks,
 127            })
 128        } else {
 129            None
 130        };
 131
 132        let maintain_connection =
 133            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 134
 135        Self {
 136            id,
 137            live_kit: live_kit_room,
 138            status: RoomStatus::Online,
 139            shared_projects: Default::default(),
 140            joined_projects: Default::default(),
 141            participant_user_ids: Default::default(),
 142            local_participant: Default::default(),
 143            remote_participants: Default::default(),
 144            pending_participants: Default::default(),
 145            pending_call_count: 0,
 146            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 147            leave_when_empty: false,
 148            pending_room_update: None,
 149            client,
 150            user_store,
 151            maintain_connection: Some(maintain_connection),
 152        }
 153    }
 154
 155    pub(crate) fn create(
 156        called_user_id: u64,
 157        initial_project: Option<ModelHandle<Project>>,
 158        client: Arc<Client>,
 159        user_store: ModelHandle<UserStore>,
 160        cx: &mut MutableAppContext,
 161    ) -> Task<Result<ModelHandle<Self>>> {
 162        cx.spawn(|mut cx| async move {
 163            let response = client.request(proto::CreateRoom {}).await?;
 164            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 165            let room = cx.add_model(|cx| {
 166                Self::new(
 167                    room_proto.id,
 168                    response.live_kit_connection_info,
 169                    client,
 170                    user_store,
 171                    cx,
 172                )
 173            });
 174
 175            let initial_project_id = if let Some(initial_project) = initial_project {
 176                let initial_project_id = room
 177                    .update(&mut cx, |room, cx| {
 178                        room.share_project(initial_project.clone(), cx)
 179                    })
 180                    .await?;
 181                Some(initial_project_id)
 182            } else {
 183                None
 184            };
 185
 186            match room
 187                .update(&mut cx, |room, cx| {
 188                    room.leave_when_empty = true;
 189                    room.call(called_user_id, initial_project_id, cx)
 190                })
 191                .await
 192            {
 193                Ok(()) => Ok(room),
 194                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 195            }
 196        })
 197    }
 198
 199    pub(crate) fn join(
 200        call: &IncomingCall,
 201        client: Arc<Client>,
 202        user_store: ModelHandle<UserStore>,
 203        cx: &mut MutableAppContext,
 204    ) -> Task<Result<ModelHandle<Self>>> {
 205        let room_id = call.room_id;
 206        cx.spawn(|mut cx| async move {
 207            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 208            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 209            let room = cx.add_model(|cx| {
 210                Self::new(
 211                    room_id,
 212                    response.live_kit_connection_info,
 213                    client,
 214                    user_store,
 215                    cx,
 216                )
 217            });
 218            room.update(&mut cx, |room, cx| {
 219                room.leave_when_empty = true;
 220                room.apply_room_update(room_proto, cx)?;
 221                anyhow::Ok(())
 222            })?;
 223            Ok(room)
 224        })
 225    }
 226
 227    fn should_leave(&self) -> bool {
 228        self.leave_when_empty
 229            && self.pending_room_update.is_none()
 230            && self.pending_participants.is_empty()
 231            && self.remote_participants.is_empty()
 232            && self.pending_call_count == 0
 233    }
 234
 235    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 236        if self.status.is_offline() {
 237            return Err(anyhow!("room is offline"));
 238        }
 239
 240        cx.notify();
 241        cx.emit(Event::Left);
 242        log::info!("leaving room");
 243
 244        for project in self.shared_projects.drain() {
 245            if let Some(project) = project.upgrade(cx) {
 246                project.update(cx, |project, cx| {
 247                    project.unshare(cx).log_err();
 248                });
 249            }
 250        }
 251        for project in self.joined_projects.drain() {
 252            if let Some(project) = project.upgrade(cx) {
 253                project.update(cx, |project, cx| {
 254                    project.disconnected_from_host(cx);
 255                });
 256            }
 257        }
 258
 259        self.status = RoomStatus::Offline;
 260        self.remote_participants.clear();
 261        self.pending_participants.clear();
 262        self.participant_user_ids.clear();
 263        self.subscriptions.clear();
 264        self.live_kit.take();
 265        self.pending_room_update.take();
 266        self.maintain_connection.take();
 267        self.client.send(proto::LeaveRoom {})?;
 268        Ok(())
 269    }
 270
    /// Watches the client's connection status and tries to rejoin the room
    /// after a disconnection.
    ///
    /// Once a disconnection is detected the room is marked
    /// `RoomStatus::Rejoining`. Each time the client reports being connected
    /// again, `rejoin` is attempted, up to three failed attempts, all within
    /// `RECONNECT_TIMEOUT`. After a successful rejoin the loop resumes watching
    /// for the next disconnection.
    ///
    /// # Errors
    ///
    /// Returns an error if the room model was dropped when a disconnection was
    /// detected, or if rejoining did not succeed in time; in the latter case
    /// the room is left first (when it still exists).
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // A status stream that has ended is treated as "not connected".
            let is_connected = client_status
                .next()
                .await
                .map_or(false, |s| s.is_connected());

            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");
                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                // The inner block ends the borrows of `cx` and `client_status` held by
                // `client_reconnection` before they are used again below.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    let client_reconnection = async {
                        // Only failed rejoin attempts use up an attempt; status changes
                        // that are not "connected" are simply waited through.
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            let Some(status) = client_status.next().await else { break };
                            if status.is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    remaining_attempts -= 1;
                                }
                            }
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Biased selection: the reconnection future is polled before the timer.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                // The client failed to re-establish a connection to the server
                // or an error occurred while trying to re-join the room. Either way
                // we leave the room and return an error.
                if let Some(this) = this.upgrade(&cx) {
                    log::info!("reconnection failed, leaving room");
                    // Any error from `leave` is intentionally discarded here; the
                    // error returned below already reports the failure.
                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
                }
                return Err(anyhow!(
                    "can't reconnect to room: client failed to re-establish connection"
                ));
            }
        }
    }
 353
 354    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 355        let mut projects = HashMap::default();
 356        let mut reshared_projects = Vec::new();
 357        let mut rejoined_projects = Vec::new();
 358        self.shared_projects.retain(|project| {
 359            if let Some(handle) = project.upgrade(cx) {
 360                let project = handle.read(cx);
 361                if let Some(project_id) = project.remote_id() {
 362                    projects.insert(project_id, handle.clone());
 363                    reshared_projects.push(proto::UpdateProject {
 364                        project_id,
 365                        worktrees: project.worktree_metadata_protos(cx),
 366                    });
 367                    return true;
 368                }
 369            }
 370            false
 371        });
 372        self.joined_projects.retain(|project| {
 373            if let Some(handle) = project.upgrade(cx) {
 374                let project = handle.read(cx);
 375                if let Some(project_id) = project.remote_id() {
 376                    projects.insert(project_id, handle.clone());
 377                    rejoined_projects.push(proto::RejoinProject {
 378                        id: project_id,
 379                        worktrees: project
 380                            .worktrees(cx)
 381                            .map(|worktree| {
 382                                let worktree = worktree.read(cx);
 383                                proto::RejoinWorktree {
 384                                    id: worktree.id().to_proto(),
 385                                    scan_id: worktree.completed_scan_id() as u64,
 386                                }
 387                            })
 388                            .collect(),
 389                    });
 390                }
 391                return true;
 392            }
 393            false
 394        });
 395
 396        let response = self.client.request(proto::RejoinRoom {
 397            id: self.id,
 398            reshared_projects,
 399            rejoined_projects,
 400        });
 401
 402        cx.spawn(|this, mut cx| async move {
 403            let response = response.await?;
 404            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 405            this.update(&mut cx, |this, cx| {
 406                this.status = RoomStatus::Online;
 407                this.apply_room_update(room_proto, cx)?;
 408
 409                for reshared_project in response.reshared_projects {
 410                    if let Some(project) = projects.get(&reshared_project.id) {
 411                        project.update(cx, |project, cx| {
 412                            project.reshared(reshared_project, cx).log_err();
 413                        });
 414                    }
 415                }
 416
 417                for rejoined_project in response.rejoined_projects {
 418                    if let Some(project) = projects.get(&rejoined_project.id) {
 419                        project.update(cx, |project, cx| {
 420                            project.rejoined(rejoined_project, cx).log_err();
 421                        });
 422                    }
 423                }
 424
 425                anyhow::Ok(())
 426            })
 427        })
 428    }
 429
    /// Returns the id of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
 433
    /// Returns the room's current status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 437
    /// Returns the state tracked for the local user in this room.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 441
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 445
 446    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 447        self.remote_participants
 448            .values()
 449            .find(|p| p.peer_id == peer_id)
 450    }
 451
    /// Returns the users listed as pending in the most recently applied room state.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 455
    /// Returns whether `user_id` belongs to a remote or pending participant of
    /// this room.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 459
 460    async fn handle_room_updated(
 461        this: ModelHandle<Self>,
 462        envelope: TypedEnvelope<proto::RoomUpdated>,
 463        _: Arc<Client>,
 464        mut cx: AsyncAppContext,
 465    ) -> Result<()> {
 466        let room = envelope
 467            .payload
 468            .room
 469            .ok_or_else(|| anyhow!("invalid room"))?;
 470        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 471    }
 472
 473    fn apply_room_update(
 474        &mut self,
 475        mut room: proto::Room,
 476        cx: &mut ModelContext<Self>,
 477    ) -> Result<()> {
 478        // Filter ourselves out from the room's participants.
 479        let local_participant_ix = room
 480            .participants
 481            .iter()
 482            .position(|participant| Some(participant.user_id) == self.client.user_id());
 483        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 484
 485        let pending_participant_user_ids = room
 486            .pending_participants
 487            .iter()
 488            .map(|p| p.user_id)
 489            .collect::<Vec<_>>();
 490        let remote_participant_user_ids = room
 491            .participants
 492            .iter()
 493            .map(|p| p.user_id)
 494            .collect::<Vec<_>>();
 495        let (remote_participants, pending_participants) =
 496            self.user_store.update(cx, move |user_store, cx| {
 497                (
 498                    user_store.get_users(remote_participant_user_ids, cx),
 499                    user_store.get_users(pending_participant_user_ids, cx),
 500                )
 501            });
 502        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 503            let (remote_participants, pending_participants) =
 504                futures::join!(remote_participants, pending_participants);
 505
 506            this.update(&mut cx, |this, cx| {
 507                this.participant_user_ids.clear();
 508
 509                if let Some(participant) = local_participant {
 510                    this.local_participant.projects = participant.projects;
 511                } else {
 512                    this.local_participant.projects.clear();
 513                }
 514
 515                if let Some(participants) = remote_participants.log_err() {
 516                    for (participant, user) in room.participants.into_iter().zip(participants) {
 517                        let Some(peer_id) = participant.peer_id else { continue };
 518                        this.participant_user_ids.insert(participant.user_id);
 519
 520                        let old_projects = this
 521                            .remote_participants
 522                            .get(&participant.user_id)
 523                            .into_iter()
 524                            .flat_map(|existing| &existing.projects)
 525                            .map(|project| project.id)
 526                            .collect::<HashSet<_>>();
 527                        let new_projects = participant
 528                            .projects
 529                            .iter()
 530                            .map(|project| project.id)
 531                            .collect::<HashSet<_>>();
 532
 533                        for project in &participant.projects {
 534                            if !old_projects.contains(&project.id) {
 535                                cx.emit(Event::RemoteProjectShared {
 536                                    owner: user.clone(),
 537                                    project_id: project.id,
 538                                    worktree_root_names: project.worktree_root_names.clone(),
 539                                });
 540                            }
 541                        }
 542
 543                        for unshared_project_id in old_projects.difference(&new_projects) {
 544                            this.joined_projects.retain(|project| {
 545                                if let Some(project) = project.upgrade(cx) {
 546                                    project.update(cx, |project, cx| {
 547                                        if project.remote_id() == Some(*unshared_project_id) {
 548                                            project.disconnected_from_host(cx);
 549                                            false
 550                                        } else {
 551                                            true
 552                                        }
 553                                    })
 554                                } else {
 555                                    false
 556                                }
 557                            });
 558                            cx.emit(Event::RemoteProjectUnshared {
 559                                project_id: *unshared_project_id,
 560                            });
 561                        }
 562
 563                        let location = ParticipantLocation::from_proto(participant.location)
 564                            .unwrap_or(ParticipantLocation::External);
 565                        if let Some(remote_participant) =
 566                            this.remote_participants.get_mut(&participant.user_id)
 567                        {
 568                            remote_participant.projects = participant.projects;
 569                            remote_participant.peer_id = peer_id;
 570                            if location != remote_participant.location {
 571                                remote_participant.location = location;
 572                                cx.emit(Event::ParticipantLocationChanged {
 573                                    participant_id: peer_id,
 574                                });
 575                            }
 576                        } else {
 577                            this.remote_participants.insert(
 578                                participant.user_id,
 579                                RemoteParticipant {
 580                                    user: user.clone(),
 581                                    peer_id,
 582                                    projects: participant.projects,
 583                                    location,
 584                                    tracks: Default::default(),
 585                                },
 586                            );
 587
 588                            if let Some(live_kit) = this.live_kit.as_ref() {
 589                                let tracks =
 590                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
 591                                for track in tracks {
 592                                    this.remote_video_track_updated(
 593                                        RemoteVideoTrackUpdate::Subscribed(track),
 594                                        cx,
 595                                    )
 596                                    .log_err();
 597                                }
 598                            }
 599                        }
 600                    }
 601
 602                    this.remote_participants.retain(|user_id, participant| {
 603                        if this.participant_user_ids.contains(user_id) {
 604                            true
 605                        } else {
 606                            for project in &participant.projects {
 607                                cx.emit(Event::RemoteProjectUnshared {
 608                                    project_id: project.id,
 609                                });
 610                            }
 611                            false
 612                        }
 613                    });
 614                }
 615
 616                if let Some(pending_participants) = pending_participants.log_err() {
 617                    this.pending_participants = pending_participants;
 618                    for participant in &this.pending_participants {
 619                        this.participant_user_ids.insert(participant.id);
 620                    }
 621                }
 622
 623                this.pending_room_update.take();
 624                if this.should_leave() {
 625                    log::info!("room is empty, leaving");
 626                    let _ = this.leave(cx);
 627                }
 628
 629                this.check_invariants();
 630                cx.notify();
 631            });
 632        }));
 633
 634        cx.notify();
 635        Ok(())
 636    }
 637
 638    fn remote_video_track_updated(
 639        &mut self,
 640        change: RemoteVideoTrackUpdate,
 641        cx: &mut ModelContext<Self>,
 642    ) -> Result<()> {
 643        match change {
 644            RemoteVideoTrackUpdate::Subscribed(track) => {
 645                let user_id = track.publisher_id().parse()?;
 646                let track_id = track.sid().to_string();
 647                let participant = self
 648                    .remote_participants
 649                    .get_mut(&user_id)
 650                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 651                participant.tracks.insert(
 652                    track_id.clone(),
 653                    Arc::new(RemoteVideoTrack {
 654                        live_kit_track: track,
 655                    }),
 656                );
 657                cx.emit(Event::RemoteVideoTracksChanged {
 658                    participant_id: participant.peer_id,
 659                });
 660            }
 661            RemoteVideoTrackUpdate::Unsubscribed {
 662                publisher_id,
 663                track_id,
 664            } => {
 665                let user_id = publisher_id.parse()?;
 666                let participant = self
 667                    .remote_participants
 668                    .get_mut(&user_id)
 669                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 670                participant.tracks.remove(&track_id);
 671                cx.emit(Event::RemoteVideoTracksChanged {
 672                    participant_id: participant.peer_id,
 673                });
 674            }
 675        }
 676
 677        cx.notify();
 678        Ok(())
 679    }
 680
    /// Asserts internal consistency of the participant bookkeeping.
    ///
    /// The checks are compiled only for tests or with the `test-support`
    /// feature; otherwise this function does nothing.
    ///
    /// # Panics
    ///
    /// When the checks are compiled in, panics if an invariant is violated or if
    /// the client has no user id.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant is tracked by user id and is never the local user.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            // The same holds for every pending participant.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // No user is counted twice, and no stale id is left in the set.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
 700
 701    pub(crate) fn call(
 702        &mut self,
 703        called_user_id: u64,
 704        initial_project_id: Option<u64>,
 705        cx: &mut ModelContext<Self>,
 706    ) -> Task<Result<()>> {
 707        if self.status.is_offline() {
 708            return Task::ready(Err(anyhow!("room is offline")));
 709        }
 710
 711        cx.notify();
 712        let client = self.client.clone();
 713        let room_id = self.id;
 714        self.pending_call_count += 1;
 715        cx.spawn(|this, mut cx| async move {
 716            let result = client
 717                .request(proto::Call {
 718                    room_id,
 719                    called_user_id,
 720                    initial_project_id,
 721                })
 722                .await;
 723            this.update(&mut cx, |this, cx| {
 724                this.pending_call_count -= 1;
 725                if this.should_leave() {
 726                    this.leave(cx)?;
 727                }
 728                result
 729            })?;
 730            Ok(())
 731        })
 732    }
 733
 734    pub fn join_project(
 735        &mut self,
 736        id: u64,
 737        language_registry: Arc<LanguageRegistry>,
 738        fs: Arc<dyn Fs>,
 739        cx: &mut ModelContext<Self>,
 740    ) -> Task<Result<ModelHandle<Project>>> {
 741        let client = self.client.clone();
 742        let user_store = self.user_store.clone();
 743        cx.spawn(|this, mut cx| async move {
 744            let project =
 745                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 746            this.update(&mut cx, |this, cx| {
 747                this.joined_projects.retain(|project| {
 748                    if let Some(project) = project.upgrade(cx) {
 749                        !project.read(cx).is_read_only()
 750                    } else {
 751                        false
 752                    }
 753                });
 754                this.joined_projects.insert(project.downgrade());
 755            });
 756            Ok(project)
 757        })
 758    }
 759
 760    pub(crate) fn share_project(
 761        &mut self,
 762        project: ModelHandle<Project>,
 763        cx: &mut ModelContext<Self>,
 764    ) -> Task<Result<u64>> {
 765        if let Some(project_id) = project.read(cx).remote_id() {
 766            return Task::ready(Ok(project_id));
 767        }
 768
 769        let request = self.client.request(proto::ShareProject {
 770            room_id: self.id(),
 771            worktrees: project.read(cx).worktree_metadata_protos(cx),
 772        });
 773        cx.spawn(|this, mut cx| async move {
 774            let response = request.await?;
 775
 776            project.update(&mut cx, |project, cx| {
 777                project.shared(response.project_id, cx)
 778            })?;
 779
 780            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 781            this.update(&mut cx, |this, cx| {
 782                this.shared_projects.insert(project.downgrade());
 783                let active_project = this.local_participant.active_project.as_ref();
 784                if active_project.map_or(false, |location| *location == project) {
 785                    this.set_location(Some(&project), cx)
 786                } else {
 787                    Task::ready(Ok(()))
 788                }
 789            })
 790            .await?;
 791
 792            Ok(response.project_id)
 793        })
 794    }
 795
 796    pub(crate) fn unshare_project(
 797        &mut self,
 798        project: ModelHandle<Project>,
 799        cx: &mut ModelContext<Self>,
 800    ) -> Result<()> {
 801        let project_id = match project.read(cx).remote_id() {
 802            Some(project_id) => project_id,
 803            None => return Ok(()),
 804        };
 805
 806        self.client.send(proto::UnshareProject { project_id })
 807    }
 808
 809    pub(crate) fn set_location(
 810        &mut self,
 811        project: Option<&ModelHandle<Project>>,
 812        cx: &mut ModelContext<Self>,
 813    ) -> Task<Result<()>> {
 814        if self.status.is_offline() {
 815            return Task::ready(Err(anyhow!("room is offline")));
 816        }
 817
 818        let client = self.client.clone();
 819        let room_id = self.id;
 820        let location = if let Some(project) = project {
 821            self.local_participant.active_project = Some(project.downgrade());
 822            if let Some(project_id) = project.read(cx).remote_id() {
 823                proto::participant_location::Variant::SharedProject(
 824                    proto::participant_location::SharedProject { id: project_id },
 825                )
 826            } else {
 827                proto::participant_location::Variant::UnsharedProject(
 828                    proto::participant_location::UnsharedProject {},
 829                )
 830            }
 831        } else {
 832            self.local_participant.active_project = None;
 833            proto::participant_location::Variant::External(proto::participant_location::External {})
 834        };
 835
 836        cx.notify();
 837        cx.foreground().spawn(async move {
 838            client
 839                .request(proto::UpdateParticipantLocation {
 840                    room_id,
 841                    location: Some(proto::ParticipantLocation {
 842                        variant: Some(location),
 843                    }),
 844                })
 845                .await?;
 846            Ok(())
 847        })
 848    }
 849
 850    pub fn is_screen_sharing(&self) -> bool {
 851        self.live_kit.as_ref().map_or(false, |live_kit| {
 852            !matches!(live_kit.screen_track, ScreenTrack::None)
 853        })
 854    }
 855
 856    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 857        if self.status.is_offline() {
 858            return Task::ready(Err(anyhow!("room is offline")));
 859        } else if self.is_screen_sharing() {
 860            return Task::ready(Err(anyhow!("screen was already shared")));
 861        }
 862
 863        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 864            let publish_id = post_inc(&mut live_kit.next_publish_id);
 865            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 866            cx.notify();
 867            (live_kit.room.display_sources(), publish_id)
 868        } else {
 869            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 870        };
 871
 872        cx.spawn_weak(|this, mut cx| async move {
 873            let publish_track = async {
 874                let displays = displays.await?;
 875                let display = displays
 876                    .first()
 877                    .ok_or_else(|| anyhow!("no display found"))?;
 878                let track = LocalVideoTrack::screen_share_for_display(&display);
 879                this.upgrade(&cx)
 880                    .ok_or_else(|| anyhow!("room was dropped"))?
 881                    .read_with(&cx, |this, _| {
 882                        this.live_kit
 883                            .as_ref()
 884                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 885                    })
 886                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 887                    .await
 888            };
 889
 890            let publication = publish_track.await;
 891            this.upgrade(&cx)
 892                .ok_or_else(|| anyhow!("room was dropped"))?
 893                .update(&mut cx, |this, cx| {
 894                    let live_kit = this
 895                        .live_kit
 896                        .as_mut()
 897                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 898
 899                    let canceled = if let ScreenTrack::Pending {
 900                        publish_id: cur_publish_id,
 901                    } = &live_kit.screen_track
 902                    {
 903                        *cur_publish_id != publish_id
 904                    } else {
 905                        true
 906                    };
 907
 908                    match publication {
 909                        Ok(publication) => {
 910                            if canceled {
 911                                live_kit.room.unpublish_track(publication);
 912                            } else {
 913                                live_kit.screen_track = ScreenTrack::Published(publication);
 914                                cx.notify();
 915                            }
 916                            Ok(())
 917                        }
 918                        Err(error) => {
 919                            if canceled {
 920                                Ok(())
 921                            } else {
 922                                live_kit.screen_track = ScreenTrack::None;
 923                                cx.notify();
 924                                Err(error)
 925                            }
 926                        }
 927                    }
 928                })
 929        })
 930    }
 931
 932    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 933        if self.status.is_offline() {
 934            return Err(anyhow!("room is offline"));
 935        }
 936
 937        let live_kit = self
 938            .live_kit
 939            .as_mut()
 940            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 941        match mem::take(&mut live_kit.screen_track) {
 942            ScreenTrack::None => Err(anyhow!("screen was not shared")),
 943            ScreenTrack::Pending { .. } => {
 944                cx.notify();
 945                Ok(())
 946            }
 947            ScreenTrack::Published(track) => {
 948                live_kit.room.unpublish_track(track);
 949                cx.notify();
 950                Ok(())
 951            }
 952        }
 953    }
 954
 955    #[cfg(any(test, feature = "test-support"))]
 956    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
 957        self.live_kit
 958            .as_ref()
 959            .unwrap()
 960            .room
 961            .set_display_sources(sources);
 962    }
 963}
 964
/// Live-kit state attached to a `Room`.
struct LiveKitRoom {
    /// Handle to the live-kit room; used in this file to list display sources
    /// and to publish or unpublish video tracks.
    room: Arc<live_kit_client::Room>,
    /// Current state of the local screen share.
    screen_track: ScreenTrack,
    /// Counter advanced by `share_screen` to tag each publish attempt.
    next_publish_id: usize,
    // NOTE(review): the two tasks below are not read in this part of the file;
    // presumably they are stored so they stay alive as long as this struct —
    // confirm where `LiveKitRoom` is constructed.
    _maintain_room: Task<()>,
    _maintain_tracks: Task<()>,
}
 972
 973enum ScreenTrack {
 974    None,
 975    Pending { publish_id: usize },
 976    Published(LocalTrackPublication),
 977}
 978
 979impl Default for ScreenTrack {
 980    fn default() -> Self {
 981        Self::None
 982    }
 983}
 984
 985#[derive(Copy, Clone, PartialEq, Eq)]
 986pub enum RoomStatus {
 987    Online,
 988    Rejoining,
 989    Offline,
 990}
 991
 992impl RoomStatus {
 993    pub fn is_offline(&self) -> bool {
 994        matches!(self, RoomStatus::Offline)
 995    }
 996
 997    pub fn is_online(&self) -> bool {
 998        matches!(self, RoomStatus::Online)
 999    }
1000}