room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{
  14    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
  15};
  16use language::LanguageRegistry;
  17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  18use postage::stream::Stream;
  19use project::Project;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
  23pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
  24
  25#[derive(Clone, Debug, PartialEq, Eq)]
  26pub enum Event {
  27    ParticipantLocationChanged {
  28        participant_id: proto::PeerId,
  29    },
  30    RemoteVideoTracksChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteProjectShared {
  34        owner: Arc<User>,
  35        project_id: u64,
  36        worktree_root_names: Vec<String>,
  37    },
  38    RemoteProjectUnshared {
  39        project_id: u64,
  40    },
  41    Left,
  42}
  43
  44pub struct Room {
  45    id: u64,
  46    live_kit: Option<LiveKitRoom>,
  47    status: RoomStatus,
  48    shared_projects: HashSet<WeakModelHandle<Project>>,
  49    joined_projects: HashSet<WeakModelHandle<Project>>,
  50    local_participant: LocalParticipant,
  51    remote_participants: BTreeMap<u64, RemoteParticipant>,
  52    pending_participants: Vec<Arc<User>>,
  53    participant_user_ids: HashSet<u64>,
  54    pending_call_count: usize,
  55    leave_when_empty: bool,
  56    client: Arc<Client>,
  57    user_store: ModelHandle<UserStore>,
  58    follows_by_leader_id: HashMap<PeerId, Vec<PeerId>>,
  59    subscriptions: Vec<client::Subscription>,
  60    pending_room_update: Option<Task<()>>,
  61    maintain_connection: Option<Task<Option<()>>>,
  62}
  63
  64impl Entity for Room {
  65    type Event = Event;
  66
  67    fn release(&mut self, _: &mut MutableAppContext) {
  68        if self.status.is_online() {
  69            log::info!("room was released, sending leave message");
  70            let _ = self.client.send(proto::LeaveRoom {});
  71        }
  72    }
  73}
  74
  75impl Room {
  76    fn new(
  77        id: u64,
  78        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  79        client: Arc<Client>,
  80        user_store: ModelHandle<UserStore>,
  81        cx: &mut ModelContext<Self>,
  82    ) -> Self {
  83        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
  84            let room = live_kit_client::Room::new();
  85            let mut status = room.status();
  86            // Consume the initial status of the room.
  87            let _ = status.try_recv();
  88            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
  89                while let Some(status) = status.next().await {
  90                    let this = if let Some(this) = this.upgrade(&cx) {
  91                        this
  92                    } else {
  93                        break;
  94                    };
  95
  96                    if status == live_kit_client::ConnectionState::Disconnected {
  97                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
  98                        break;
  99                    }
 100                }
 101            });
 102
 103            let mut track_changes = room.remote_video_track_updates();
 104            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 105                while let Some(track_change) = track_changes.next().await {
 106                    let this = if let Some(this) = this.upgrade(&cx) {
 107                        this
 108                    } else {
 109                        break;
 110                    };
 111
 112                    this.update(&mut cx, |this, cx| {
 113                        this.remote_video_track_updated(track_change, cx).log_err()
 114                    });
 115                }
 116            });
 117
 118            cx.foreground()
 119                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 120                .detach_and_log_err(cx);
 121
 122            Some(LiveKitRoom {
 123                room,
 124                screen_track: ScreenTrack::None,
 125                next_publish_id: 0,
 126                _maintain_room,
 127                _maintain_tracks,
 128            })
 129        } else {
 130            None
 131        };
 132
 133        let maintain_connection =
 134            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 135
 136        Self {
 137            id,
 138            live_kit: live_kit_room,
 139            status: RoomStatus::Online,
 140            shared_projects: Default::default(),
 141            joined_projects: Default::default(),
 142            participant_user_ids: Default::default(),
 143            local_participant: Default::default(),
 144            remote_participants: Default::default(),
 145            pending_participants: Default::default(),
 146            pending_call_count: 0,
 147            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 148            leave_when_empty: false,
 149            pending_room_update: None,
 150            client,
 151            user_store,
 152            follows_by_leader_id: Default::default(),
 153            maintain_connection: Some(maintain_connection),
 154        }
 155    }
 156
 157    pub(crate) fn create(
 158        called_user_id: u64,
 159        initial_project: Option<ModelHandle<Project>>,
 160        client: Arc<Client>,
 161        user_store: ModelHandle<UserStore>,
 162        cx: &mut MutableAppContext,
 163    ) -> Task<Result<ModelHandle<Self>>> {
 164        cx.spawn(|mut cx| async move {
 165            let response = client.request(proto::CreateRoom {}).await?;
 166            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 167            let room = cx.add_model(|cx| {
 168                Self::new(
 169                    room_proto.id,
 170                    response.live_kit_connection_info,
 171                    client,
 172                    user_store,
 173                    cx,
 174                )
 175            });
 176
 177            let initial_project_id = if let Some(initial_project) = initial_project {
 178                let initial_project_id = room
 179                    .update(&mut cx, |room, cx| {
 180                        room.share_project(initial_project.clone(), cx)
 181                    })
 182                    .await?;
 183                Some(initial_project_id)
 184            } else {
 185                None
 186            };
 187
 188            match room
 189                .update(&mut cx, |room, cx| {
 190                    room.leave_when_empty = true;
 191                    room.call(called_user_id, initial_project_id, cx)
 192                })
 193                .await
 194            {
 195                Ok(()) => Ok(room),
 196                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 197            }
 198        })
 199    }
 200
 201    pub(crate) fn join(
 202        call: &IncomingCall,
 203        client: Arc<Client>,
 204        user_store: ModelHandle<UserStore>,
 205        cx: &mut MutableAppContext,
 206    ) -> Task<Result<ModelHandle<Self>>> {
 207        let room_id = call.room_id;
 208        cx.spawn(|mut cx| async move {
 209            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 210            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 211            let room = cx.add_model(|cx| {
 212                Self::new(
 213                    room_id,
 214                    response.live_kit_connection_info,
 215                    client,
 216                    user_store,
 217                    cx,
 218                )
 219            });
 220            room.update(&mut cx, |room, cx| {
 221                room.leave_when_empty = true;
 222                room.apply_room_update(room_proto, cx)?;
 223                anyhow::Ok(())
 224            })?;
 225            Ok(room)
 226        })
 227    }
 228
 229    fn should_leave(&self) -> bool {
 230        self.leave_when_empty
 231            && self.pending_room_update.is_none()
 232            && self.pending_participants.is_empty()
 233            && self.remote_participants.is_empty()
 234            && self.pending_call_count == 0
 235    }
 236
 237    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 238        if self.status.is_offline() {
 239            return Err(anyhow!("room is offline"));
 240        }
 241
 242        cx.notify();
 243        cx.emit(Event::Left);
 244        log::info!("leaving room");
 245
 246        for project in self.shared_projects.drain() {
 247            if let Some(project) = project.upgrade(cx) {
 248                project.update(cx, |project, cx| {
 249                    project.unshare(cx).log_err();
 250                });
 251            }
 252        }
 253        for project in self.joined_projects.drain() {
 254            if let Some(project) = project.upgrade(cx) {
 255                project.update(cx, |project, cx| {
 256                    project.disconnected_from_host(cx);
 257                });
 258            }
 259        }
 260
 261        self.status = RoomStatus::Offline;
 262        self.remote_participants.clear();
 263        self.pending_participants.clear();
 264        self.participant_user_ids.clear();
 265        self.subscriptions.clear();
 266        self.live_kit.take();
 267        self.pending_room_update.take();
 268        self.maintain_connection.take();
 269        self.client.send(proto::LeaveRoom {})?;
 270        Ok(())
 271    }
 272
 273    async fn maintain_connection(
 274        this: WeakModelHandle<Self>,
 275        client: Arc<Client>,
 276        mut cx: AsyncAppContext,
 277    ) -> Result<()> {
 278        let mut client_status = client.status();
 279        loop {
 280            let is_connected = client_status
 281                .next()
 282                .await
 283                .map_or(false, |s| s.is_connected());
 284
 285            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 286            if !is_connected || client_status.next().await.is_some() {
 287                log::info!("detected client disconnection");
 288                this.upgrade(&cx)
 289                    .ok_or_else(|| anyhow!("room was dropped"))?
 290                    .update(&mut cx, |this, cx| {
 291                        this.status = RoomStatus::Rejoining;
 292                        cx.notify();
 293                    });
 294
 295                // Wait for client to re-establish a connection to the server.
 296                {
 297                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 298                    let client_reconnection = async {
 299                        let mut remaining_attempts = 3;
 300                        while remaining_attempts > 0 {
 301                            log::info!(
 302                                "waiting for client status change, remaining attempts {}",
 303                                remaining_attempts
 304                            );
 305                            let Some(status) = client_status.next().await else { break };
 306                            if status.is_connected() {
 307                                log::info!("client reconnected, attempting to rejoin room");
 308
 309                                let Some(this) = this.upgrade(&cx) else { break };
 310                                if this
 311                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 312                                    .await
 313                                    .log_err()
 314                                    .is_some()
 315                                {
 316                                    return true;
 317                                } else {
 318                                    remaining_attempts -= 1;
 319                                }
 320                            }
 321                        }
 322                        false
 323                    }
 324                    .fuse();
 325                    futures::pin_mut!(client_reconnection);
 326
 327                    futures::select_biased! {
 328                        reconnected = client_reconnection => {
 329                            if reconnected {
 330                                log::info!("successfully reconnected to room");
 331                                // If we successfully joined the room, go back around the loop
 332                                // waiting for future connection status changes.
 333                                continue;
 334                            }
 335                        }
 336                        _ = reconnection_timeout => {
 337                            log::info!("room reconnection timeout expired");
 338                        }
 339                    }
 340                }
 341
 342                // The client failed to re-establish a connection to the server
 343                // or an error occurred while trying to re-join the room. Either way
 344                // we leave the room and return an error.
 345                if let Some(this) = this.upgrade(&cx) {
 346                    log::info!("reconnection failed, leaving room");
 347                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 348                }
 349                return Err(anyhow!(
 350                    "can't reconnect to room: client failed to re-establish connection"
 351                ));
 352            }
 353        }
 354    }
 355
 356    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 357        let mut projects = HashMap::default();
 358        let mut reshared_projects = Vec::new();
 359        let mut rejoined_projects = Vec::new();
 360        self.shared_projects.retain(|project| {
 361            if let Some(handle) = project.upgrade(cx) {
 362                let project = handle.read(cx);
 363                if let Some(project_id) = project.remote_id() {
 364                    projects.insert(project_id, handle.clone());
 365                    reshared_projects.push(proto::UpdateProject {
 366                        project_id,
 367                        worktrees: project.worktree_metadata_protos(cx),
 368                    });
 369                    return true;
 370                }
 371            }
 372            false
 373        });
 374        self.joined_projects.retain(|project| {
 375            if let Some(handle) = project.upgrade(cx) {
 376                let project = handle.read(cx);
 377                if let Some(project_id) = project.remote_id() {
 378                    projects.insert(project_id, handle.clone());
 379                    rejoined_projects.push(proto::RejoinProject {
 380                        id: project_id,
 381                        worktrees: project
 382                            .worktrees(cx)
 383                            .map(|worktree| {
 384                                let worktree = worktree.read(cx);
 385                                proto::RejoinWorktree {
 386                                    id: worktree.id().to_proto(),
 387                                    scan_id: worktree.completed_scan_id() as u64,
 388                                }
 389                            })
 390                            .collect(),
 391                    });
 392                }
 393                return true;
 394            }
 395            false
 396        });
 397
 398        let response = self.client.request(proto::RejoinRoom {
 399            id: self.id,
 400            reshared_projects,
 401            rejoined_projects,
 402        });
 403
 404        cx.spawn(|this, mut cx| async move {
 405            let response = response.await?;
 406            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 407            this.update(&mut cx, |this, cx| {
 408                this.status = RoomStatus::Online;
 409                this.apply_room_update(room_proto, cx)?;
 410
 411                for reshared_project in response.reshared_projects {
 412                    if let Some(project) = projects.get(&reshared_project.id) {
 413                        project.update(cx, |project, cx| {
 414                            project.reshared(reshared_project, cx).log_err();
 415                        });
 416                    }
 417                }
 418
 419                for rejoined_project in response.rejoined_projects {
 420                    if let Some(project) = projects.get(&rejoined_project.id) {
 421                        project.update(cx, |project, cx| {
 422                            project.rejoined(rejoined_project, cx).log_err();
 423                        });
 424                    }
 425                }
 426
 427                anyhow::Ok(())
 428            })
 429        })
 430    }
 431
 432    pub fn id(&self) -> u64 {
 433        self.id
 434    }
 435
 436    pub fn status(&self) -> RoomStatus {
 437        self.status
 438    }
 439
 440    pub fn local_participant(&self) -> &LocalParticipant {
 441        &self.local_participant
 442    }
 443
 444    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 445        &self.remote_participants
 446    }
 447
 448    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 449        self.remote_participants
 450            .values()
 451            .find(|p| p.peer_id == peer_id)
 452    }
 453
 454    pub fn pending_participants(&self) -> &[Arc<User>] {
 455        &self.pending_participants
 456    }
 457
 458    pub fn contains_participant(&self, user_id: u64) -> bool {
 459        self.participant_user_ids.contains(&user_id)
 460    }
 461
 462    pub fn followers_for(&self, leader_id: PeerId) -> &[PeerId] {
 463        self.follows_by_leader_id
 464            .get(&leader_id)
 465            .map_or(&[], |v| v.as_slice())
 466    }
 467
 468    async fn handle_room_updated(
 469        this: ModelHandle<Self>,
 470        envelope: TypedEnvelope<proto::RoomUpdated>,
 471        _: Arc<Client>,
 472        mut cx: AsyncAppContext,
 473    ) -> Result<()> {
 474        let room = envelope
 475            .payload
 476            .room
 477            .ok_or_else(|| anyhow!("invalid room"))?;
 478        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 479    }
 480
 481    fn apply_room_update(
 482        &mut self,
 483        mut room: proto::Room,
 484        cx: &mut ModelContext<Self>,
 485    ) -> Result<()> {
 486        // Filter ourselves out from the room's participants.
 487        let local_participant_ix = room
 488            .participants
 489            .iter()
 490            .position(|participant| Some(participant.user_id) == self.client.user_id());
 491        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 492
 493        let pending_participant_user_ids = room
 494            .pending_participants
 495            .iter()
 496            .map(|p| p.user_id)
 497            .collect::<Vec<_>>();
 498
 499        let remote_participant_user_ids = room
 500            .participants
 501            .iter()
 502            .map(|p| p.user_id)
 503            .collect::<Vec<_>>();
 504
 505        let (remote_participants, pending_participants) =
 506            self.user_store.update(cx, move |user_store, cx| {
 507                (
 508                    user_store.get_users(remote_participant_user_ids, cx),
 509                    user_store.get_users(pending_participant_user_ids, cx),
 510                )
 511            });
 512
 513        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 514            let (remote_participants, pending_participants) =
 515                futures::join!(remote_participants, pending_participants);
 516
 517            this.update(&mut cx, |this, cx| {
 518                this.participant_user_ids.clear();
 519
 520                if let Some(participant) = local_participant {
 521                    this.local_participant.projects = participant.projects;
 522                } else {
 523                    this.local_participant.projects.clear();
 524                }
 525
 526                if let Some(participants) = remote_participants.log_err() {
 527                    for (participant, user) in room.participants.into_iter().zip(participants) {
 528                        let Some(peer_id) = participant.peer_id else { continue };
 529                        this.participant_user_ids.insert(participant.user_id);
 530
 531                        let old_projects = this
 532                            .remote_participants
 533                            .get(&participant.user_id)
 534                            .into_iter()
 535                            .flat_map(|existing| &existing.projects)
 536                            .map(|project| project.id)
 537                            .collect::<HashSet<_>>();
 538                        let new_projects = participant
 539                            .projects
 540                            .iter()
 541                            .map(|project| project.id)
 542                            .collect::<HashSet<_>>();
 543
 544                        for project in &participant.projects {
 545                            if !old_projects.contains(&project.id) {
 546                                cx.emit(Event::RemoteProjectShared {
 547                                    owner: user.clone(),
 548                                    project_id: project.id,
 549                                    worktree_root_names: project.worktree_root_names.clone(),
 550                                });
 551                            }
 552                        }
 553
 554                        for unshared_project_id in old_projects.difference(&new_projects) {
 555                            this.joined_projects.retain(|project| {
 556                                if let Some(project) = project.upgrade(cx) {
 557                                    project.update(cx, |project, cx| {
 558                                        if project.remote_id() == Some(*unshared_project_id) {
 559                                            project.disconnected_from_host(cx);
 560                                            false
 561                                        } else {
 562                                            true
 563                                        }
 564                                    })
 565                                } else {
 566                                    false
 567                                }
 568                            });
 569                            cx.emit(Event::RemoteProjectUnshared {
 570                                project_id: *unshared_project_id,
 571                            });
 572                        }
 573
 574                        let location = ParticipantLocation::from_proto(participant.location)
 575                            .unwrap_or(ParticipantLocation::External);
 576                        if let Some(remote_participant) =
 577                            this.remote_participants.get_mut(&participant.user_id)
 578                        {
 579                            remote_participant.projects = participant.projects;
 580                            remote_participant.peer_id = peer_id;
 581                            if location != remote_participant.location {
 582                                remote_participant.location = location;
 583                                cx.emit(Event::ParticipantLocationChanged {
 584                                    participant_id: peer_id,
 585                                });
 586                            }
 587                        } else {
 588                            this.remote_participants.insert(
 589                                participant.user_id,
 590                                RemoteParticipant {
 591                                    user: user.clone(),
 592                                    peer_id,
 593                                    projects: participant.projects,
 594                                    location,
 595                                    tracks: Default::default(),
 596                                },
 597                            );
 598
 599                            if let Some(live_kit) = this.live_kit.as_ref() {
 600                                let tracks =
 601                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
 602                                for track in tracks {
 603                                    this.remote_video_track_updated(
 604                                        RemoteVideoTrackUpdate::Subscribed(track),
 605                                        cx,
 606                                    )
 607                                    .log_err();
 608                                }
 609                            }
 610                        }
 611                    }
 612
 613                    this.remote_participants.retain(|user_id, participant| {
 614                        if this.participant_user_ids.contains(user_id) {
 615                            true
 616                        } else {
 617                            for project in &participant.projects {
 618                                cx.emit(Event::RemoteProjectUnshared {
 619                                    project_id: project.id,
 620                                });
 621                            }
 622                            false
 623                        }
 624                    });
 625                }
 626
 627                if let Some(pending_participants) = pending_participants.log_err() {
 628                    this.pending_participants = pending_participants;
 629                    for participant in &this.pending_participants {
 630                        this.participant_user_ids.insert(participant.id);
 631                    }
 632                }
 633
 634                this.follows_by_leader_id.clear();
 635                for follower in room.followers {
 636                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 637                        (Some(leader), Some(follower)) => (leader, follower),
 638
 639                        _ => {
 640                            log::error!("Follower message {follower:?} missing some state");
 641                            continue;
 642                        }
 643                    };
 644
 645                    let list = this
 646                        .follows_by_leader_id
 647                        .entry(leader)
 648                        .or_insert(Vec::new());
 649                    if !list.contains(&follower) {
 650                        list.push(follower);
 651                    }
 652                }
 653
 654                this.pending_room_update.take();
 655                if this.should_leave() {
 656                    log::info!("room is empty, leaving");
 657                    let _ = this.leave(cx);
 658                }
 659
 660                this.check_invariants();
 661                cx.notify();
 662            });
 663        }));
 664
 665        cx.notify();
 666        Ok(())
 667    }
 668
 669    fn remote_video_track_updated(
 670        &mut self,
 671        change: RemoteVideoTrackUpdate,
 672        cx: &mut ModelContext<Self>,
 673    ) -> Result<()> {
 674        match change {
 675            RemoteVideoTrackUpdate::Subscribed(track) => {
 676                let user_id = track.publisher_id().parse()?;
 677                let track_id = track.sid().to_string();
 678                let participant = self
 679                    .remote_participants
 680                    .get_mut(&user_id)
 681                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 682                participant.tracks.insert(
 683                    track_id.clone(),
 684                    Arc::new(RemoteVideoTrack {
 685                        live_kit_track: track,
 686                    }),
 687                );
 688                cx.emit(Event::RemoteVideoTracksChanged {
 689                    participant_id: participant.peer_id,
 690                });
 691            }
 692            RemoteVideoTrackUpdate::Unsubscribed {
 693                publisher_id,
 694                track_id,
 695            } => {
 696                let user_id = publisher_id.parse()?;
 697                let participant = self
 698                    .remote_participants
 699                    .get_mut(&user_id)
 700                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 701                participant.tracks.remove(&track_id);
 702                cx.emit(Event::RemoteVideoTracksChanged {
 703                    participant_id: participant.peer_id,
 704                });
 705            }
 706        }
 707
 708        cx.notify();
 709        Ok(())
 710    }
 711
 712    fn check_invariants(&self) {
 713        #[cfg(any(test, feature = "test-support"))]
 714        {
 715            for participant in self.remote_participants.values() {
 716                assert!(self.participant_user_ids.contains(&participant.user.id));
 717                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 718            }
 719
 720            for participant in &self.pending_participants {
 721                assert!(self.participant_user_ids.contains(&participant.id));
 722                assert_ne!(participant.id, self.client.user_id().unwrap());
 723            }
 724
 725            assert_eq!(
 726                self.participant_user_ids.len(),
 727                self.remote_participants.len() + self.pending_participants.len()
 728            );
 729        }
 730    }
 731
 732    pub(crate) fn call(
 733        &mut self,
 734        called_user_id: u64,
 735        initial_project_id: Option<u64>,
 736        cx: &mut ModelContext<Self>,
 737    ) -> Task<Result<()>> {
 738        if self.status.is_offline() {
 739            return Task::ready(Err(anyhow!("room is offline")));
 740        }
 741
 742        cx.notify();
 743        let client = self.client.clone();
 744        let room_id = self.id;
 745        self.pending_call_count += 1;
 746        cx.spawn(|this, mut cx| async move {
 747            let result = client
 748                .request(proto::Call {
 749                    room_id,
 750                    called_user_id,
 751                    initial_project_id,
 752                })
 753                .await;
 754            this.update(&mut cx, |this, cx| {
 755                this.pending_call_count -= 1;
 756                if this.should_leave() {
 757                    this.leave(cx)?;
 758                }
 759                result
 760            })?;
 761            Ok(())
 762        })
 763    }
 764
 765    pub fn join_project(
 766        &mut self,
 767        id: u64,
 768        language_registry: Arc<LanguageRegistry>,
 769        fs: Arc<dyn Fs>,
 770        cx: &mut ModelContext<Self>,
 771    ) -> Task<Result<ModelHandle<Project>>> {
 772        let client = self.client.clone();
 773        let user_store = self.user_store.clone();
 774        cx.spawn(|this, mut cx| async move {
 775            let project =
 776                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 777            this.update(&mut cx, |this, cx| {
 778                this.joined_projects.retain(|project| {
 779                    if let Some(project) = project.upgrade(cx) {
 780                        !project.read(cx).is_read_only()
 781                    } else {
 782                        false
 783                    }
 784                });
 785                this.joined_projects.insert(project.downgrade());
 786            });
 787            Ok(project)
 788        })
 789    }
 790
 791    pub(crate) fn share_project(
 792        &mut self,
 793        project: ModelHandle<Project>,
 794        cx: &mut ModelContext<Self>,
 795    ) -> Task<Result<u64>> {
 796        if let Some(project_id) = project.read(cx).remote_id() {
 797            return Task::ready(Ok(project_id));
 798        }
 799
 800        let request = self.client.request(proto::ShareProject {
 801            room_id: self.id(),
 802            worktrees: project.read(cx).worktree_metadata_protos(cx),
 803        });
 804        cx.spawn(|this, mut cx| async move {
 805            let response = request.await?;
 806
 807            project.update(&mut cx, |project, cx| {
 808                project.shared(response.project_id, cx)
 809            })?;
 810
 811            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 812            this.update(&mut cx, |this, cx| {
 813                this.shared_projects.insert(project.downgrade());
 814                let active_project = this.local_participant.active_project.as_ref();
 815                if active_project.map_or(false, |location| *location == project) {
 816                    this.set_location(Some(&project), cx)
 817                } else {
 818                    Task::ready(Ok(()))
 819                }
 820            })
 821            .await?;
 822
 823            Ok(response.project_id)
 824        })
 825    }
 826
 827    pub(crate) fn unshare_project(
 828        &mut self,
 829        project: ModelHandle<Project>,
 830        cx: &mut ModelContext<Self>,
 831    ) -> Result<()> {
 832        let project_id = match project.read(cx).remote_id() {
 833            Some(project_id) => project_id,
 834            None => return Ok(()),
 835        };
 836
 837        self.client.send(proto::UnshareProject { project_id })?;
 838        project.update(cx, |this, cx| this.unshare(cx))
 839    }
 840
 841    pub(crate) fn set_location(
 842        &mut self,
 843        project: Option<&ModelHandle<Project>>,
 844        cx: &mut ModelContext<Self>,
 845    ) -> Task<Result<()>> {
 846        if self.status.is_offline() {
 847            return Task::ready(Err(anyhow!("room is offline")));
 848        }
 849
 850        let client = self.client.clone();
 851        let room_id = self.id;
 852        let location = if let Some(project) = project {
 853            self.local_participant.active_project = Some(project.downgrade());
 854            if let Some(project_id) = project.read(cx).remote_id() {
 855                proto::participant_location::Variant::SharedProject(
 856                    proto::participant_location::SharedProject { id: project_id },
 857                )
 858            } else {
 859                proto::participant_location::Variant::UnsharedProject(
 860                    proto::participant_location::UnsharedProject {},
 861                )
 862            }
 863        } else {
 864            self.local_participant.active_project = None;
 865            proto::participant_location::Variant::External(proto::participant_location::External {})
 866        };
 867
 868        cx.notify();
 869        cx.foreground().spawn(async move {
 870            client
 871                .request(proto::UpdateParticipantLocation {
 872                    room_id,
 873                    location: Some(proto::ParticipantLocation {
 874                        variant: Some(location),
 875                    }),
 876                })
 877                .await?;
 878            Ok(())
 879        })
 880    }
 881
 882    pub fn is_screen_sharing(&self) -> bool {
 883        self.live_kit.as_ref().map_or(false, |live_kit| {
 884            !matches!(live_kit.screen_track, ScreenTrack::None)
 885        })
 886    }
 887
 888    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 889        if self.status.is_offline() {
 890            return Task::ready(Err(anyhow!("room is offline")));
 891        } else if self.is_screen_sharing() {
 892            return Task::ready(Err(anyhow!("screen was already shared")));
 893        }
 894
 895        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 896            let publish_id = post_inc(&mut live_kit.next_publish_id);
 897            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 898            cx.notify();
 899            (live_kit.room.display_sources(), publish_id)
 900        } else {
 901            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 902        };
 903
 904        cx.spawn_weak(|this, mut cx| async move {
 905            let publish_track = async {
 906                let displays = displays.await?;
 907                let display = displays
 908                    .first()
 909                    .ok_or_else(|| anyhow!("no display found"))?;
 910                let track = LocalVideoTrack::screen_share_for_display(&display);
 911                this.upgrade(&cx)
 912                    .ok_or_else(|| anyhow!("room was dropped"))?
 913                    .read_with(&cx, |this, _| {
 914                        this.live_kit
 915                            .as_ref()
 916                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 917                    })
 918                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 919                    .await
 920            };
 921
 922            let publication = publish_track.await;
 923            this.upgrade(&cx)
 924                .ok_or_else(|| anyhow!("room was dropped"))?
 925                .update(&mut cx, |this, cx| {
 926                    let live_kit = this
 927                        .live_kit
 928                        .as_mut()
 929                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 930
 931                    let canceled = if let ScreenTrack::Pending {
 932                        publish_id: cur_publish_id,
 933                    } = &live_kit.screen_track
 934                    {
 935                        *cur_publish_id != publish_id
 936                    } else {
 937                        true
 938                    };
 939
 940                    match publication {
 941                        Ok(publication) => {
 942                            if canceled {
 943                                live_kit.room.unpublish_track(publication);
 944                            } else {
 945                                live_kit.screen_track = ScreenTrack::Published(publication);
 946                                cx.notify();
 947                            }
 948                            Ok(())
 949                        }
 950                        Err(error) => {
 951                            if canceled {
 952                                Ok(())
 953                            } else {
 954                                live_kit.screen_track = ScreenTrack::None;
 955                                cx.notify();
 956                                Err(error)
 957                            }
 958                        }
 959                    }
 960                })
 961        })
 962    }
 963
 964    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 965        if self.status.is_offline() {
 966            return Err(anyhow!("room is offline"));
 967        }
 968
 969        let live_kit = self
 970            .live_kit
 971            .as_mut()
 972            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 973        match mem::take(&mut live_kit.screen_track) {
 974            ScreenTrack::None => Err(anyhow!("screen was not shared")),
 975            ScreenTrack::Pending { .. } => {
 976                cx.notify();
 977                Ok(())
 978            }
 979            ScreenTrack::Published(track) => {
 980                live_kit.room.unpublish_track(track);
 981                cx.notify();
 982                Ok(())
 983            }
 984        }
 985    }
 986
 987    #[cfg(any(test, feature = "test-support"))]
 988    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
 989        self.live_kit
 990            .as_ref()
 991            .unwrap()
 992            .room
 993            .set_display_sources(sources);
 994    }
 995}
 996
 997struct LiveKitRoom {
 998    room: Arc<live_kit_client::Room>,
 999    screen_track: ScreenTrack,
1000    next_publish_id: usize,
1001    _maintain_room: Task<()>,
1002    _maintain_tracks: Task<()>,
1003}
1004
1005enum ScreenTrack {
1006    None,
1007    Pending { publish_id: usize },
1008    Published(LocalTrackPublication),
1009}
1010
1011impl Default for ScreenTrack {
1012    fn default() -> Self {
1013        Self::None
1014    }
1015}
1016
1017#[derive(Copy, Clone, PartialEq, Eq)]
1018pub enum RoomStatus {
1019    Online,
1020    Rejoining,
1021    Offline,
1022}
1023
1024impl RoomStatus {
1025    pub fn is_offline(&self) -> bool {
1026        matches!(self, RoomStatus::Offline)
1027    }
1028
1029    pub fn is_online(&self) -> bool {
1030        matches!(self, RoomStatus::Online)
1031    }
1032}