room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{
  14    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
  15};
  16use language::LanguageRegistry;
  17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  18use postage::stream::Stream;
  19use project::Project;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25#[derive(Clone, Debug, PartialEq, Eq)]
  26pub enum Event {
  27    ParticipantLocationChanged {
  28        participant_id: proto::PeerId,
  29    },
  30    RemoteVideoTracksChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteProjectShared {
  34        owner: Arc<User>,
  35        project_id: u64,
  36        worktree_root_names: Vec<String>,
  37    },
  38    RemoteProjectUnshared {
  39        project_id: u64,
  40    },
  41    Left,
  42}
  43
  44pub struct Room {
  45    id: u64,
  46    live_kit: Option<LiveKitRoom>,
  47    status: RoomStatus,
  48    shared_projects: HashSet<WeakModelHandle<Project>>,
  49    joined_projects: HashSet<WeakModelHandle<Project>>,
  50    local_participant: LocalParticipant,
  51    remote_participants: BTreeMap<u64, RemoteParticipant>,
  52    pending_participants: Vec<Arc<User>>,
  53    participant_user_ids: HashSet<u64>,
  54    pending_call_count: usize,
  55    leave_when_empty: bool,
  56    client: Arc<Client>,
  57    user_store: ModelHandle<UserStore>,
  58    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  59    subscriptions: Vec<client::Subscription>,
  60    pending_room_update: Option<Task<()>>,
  61    maintain_connection: Option<Task<Option<()>>>,
  62}
  63
  64impl Entity for Room {
  65    type Event = Event;
  66
  67    fn release(&mut self, _: &mut MutableAppContext) {
  68        if self.status.is_online() {
  69            log::info!("room was released, sending leave message");
  70            let _ = self.client.send(proto::LeaveRoom {});
  71        }
  72    }
  73}
  74
  75impl Room {
  76    fn new(
  77        id: u64,
  78        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  79        client: Arc<Client>,
  80        user_store: ModelHandle<UserStore>,
  81        cx: &mut ModelContext<Self>,
  82    ) -> Self {
  83        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
  84            let room = live_kit_client::Room::new();
  85            let mut status = room.status();
  86            // Consume the initial status of the room.
  87            let _ = status.try_recv();
  88            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
  89                while let Some(status) = status.next().await {
  90                    let this = if let Some(this) = this.upgrade(&cx) {
  91                        this
  92                    } else {
  93                        break;
  94                    };
  95
  96                    if status == live_kit_client::ConnectionState::Disconnected {
  97                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
  98                        break;
  99                    }
 100                }
 101            });
 102
 103            let mut track_changes = room.remote_video_track_updates();
 104            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 105                while let Some(track_change) = track_changes.next().await {
 106                    let this = if let Some(this) = this.upgrade(&cx) {
 107                        this
 108                    } else {
 109                        break;
 110                    };
 111
 112                    this.update(&mut cx, |this, cx| {
 113                        this.remote_video_track_updated(track_change, cx).log_err()
 114                    });
 115                }
 116            });
 117
 118            cx.foreground()
 119                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 120                .detach_and_log_err(cx);
 121
 122            Some(LiveKitRoom {
 123                room,
 124                screen_track: ScreenTrack::None,
 125                next_publish_id: 0,
 126                _maintain_room,
 127                _maintain_tracks,
 128            })
 129        } else {
 130            None
 131        };
 132
 133        let maintain_connection =
 134            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 135
 136        Self {
 137            id,
 138            live_kit: live_kit_room,
 139            status: RoomStatus::Online,
 140            shared_projects: Default::default(),
 141            joined_projects: Default::default(),
 142            participant_user_ids: Default::default(),
 143            local_participant: Default::default(),
 144            remote_participants: Default::default(),
 145            pending_participants: Default::default(),
 146            pending_call_count: 0,
 147            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 148            leave_when_empty: false,
 149            pending_room_update: None,
 150            client,
 151            user_store,
 152            follows_by_leader_id_project_id: Default::default(),
 153            maintain_connection: Some(maintain_connection),
 154        }
 155    }
 156
 157    pub(crate) fn create(
 158        called_user_id: u64,
 159        initial_project: Option<ModelHandle<Project>>,
 160        client: Arc<Client>,
 161        user_store: ModelHandle<UserStore>,
 162        cx: &mut MutableAppContext,
 163    ) -> Task<Result<ModelHandle<Self>>> {
 164        cx.spawn(|mut cx| async move {
 165            let response = client.request(proto::CreateRoom {}).await?;
 166            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 167            let room = cx.add_model(|cx| {
 168                Self::new(
 169                    room_proto.id,
 170                    response.live_kit_connection_info,
 171                    client,
 172                    user_store,
 173                    cx,
 174                )
 175            });
 176
 177            let initial_project_id = if let Some(initial_project) = initial_project {
 178                let initial_project_id = room
 179                    .update(&mut cx, |room, cx| {
 180                        room.share_project(initial_project.clone(), cx)
 181                    })
 182                    .await?;
 183                Some(initial_project_id)
 184            } else {
 185                None
 186            };
 187
 188            match room
 189                .update(&mut cx, |room, cx| {
 190                    room.leave_when_empty = true;
 191                    room.call(called_user_id, initial_project_id, cx)
 192                })
 193                .await
 194            {
 195                Ok(()) => Ok(room),
 196                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 197            }
 198        })
 199    }
 200
 201    pub(crate) fn join(
 202        call: &IncomingCall,
 203        client: Arc<Client>,
 204        user_store: ModelHandle<UserStore>,
 205        cx: &mut MutableAppContext,
 206    ) -> Task<Result<ModelHandle<Self>>> {
 207        let room_id = call.room_id;
 208        cx.spawn(|mut cx| async move {
 209            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 210            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 211            let room = cx.add_model(|cx| {
 212                Self::new(
 213                    room_id,
 214                    response.live_kit_connection_info,
 215                    client,
 216                    user_store,
 217                    cx,
 218                )
 219            });
 220            room.update(&mut cx, |room, cx| {
 221                room.leave_when_empty = true;
 222                room.apply_room_update(room_proto, cx)?;
 223                anyhow::Ok(())
 224            })?;
 225            Ok(room)
 226        })
 227    }
 228
 229    fn should_leave(&self) -> bool {
 230        self.leave_when_empty
 231            && self.pending_room_update.is_none()
 232            && self.pending_participants.is_empty()
 233            && self.remote_participants.is_empty()
 234            && self.pending_call_count == 0
 235    }
 236
 237    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 238        if self.status.is_offline() {
 239            return Err(anyhow!("room is offline"));
 240        }
 241
 242        cx.notify();
 243        cx.emit(Event::Left);
 244        log::info!("leaving room");
 245
 246        for project in self.shared_projects.drain() {
 247            if let Some(project) = project.upgrade(cx) {
 248                project.update(cx, |project, cx| {
 249                    project.unshare(cx).log_err();
 250                });
 251            }
 252        }
 253        for project in self.joined_projects.drain() {
 254            if let Some(project) = project.upgrade(cx) {
 255                project.update(cx, |project, cx| {
 256                    project.disconnected_from_host(cx);
 257                });
 258            }
 259        }
 260
 261        self.status = RoomStatus::Offline;
 262        self.remote_participants.clear();
 263        self.pending_participants.clear();
 264        self.participant_user_ids.clear();
 265        self.subscriptions.clear();
 266        self.live_kit.take();
 267        self.pending_room_update.take();
 268        self.maintain_connection.take();
 269        self.client.send(proto::LeaveRoom {})?;
 270        Ok(())
 271    }
 272
 273    async fn maintain_connection(
 274        this: WeakModelHandle<Self>,
 275        client: Arc<Client>,
 276        mut cx: AsyncAppContext,
 277    ) -> Result<()> {
 278        let mut client_status = client.status();
 279        loop {
 280            let _ = client_status.try_recv();
 281            let is_connected = client_status.borrow().is_connected();
 282            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 283            if !is_connected || client_status.next().await.is_some() {
 284                log::info!("detected client disconnection");
 285
 286                this.upgrade(&cx)
 287                    .ok_or_else(|| anyhow!("room was dropped"))?
 288                    .update(&mut cx, |this, cx| {
 289                        this.status = RoomStatus::Rejoining;
 290                        cx.notify();
 291                    });
 292
 293                // Wait for client to re-establish a connection to the server.
 294                {
 295                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 296                    let client_reconnection = async {
 297                        let mut remaining_attempts = 3;
 298                        while remaining_attempts > 0 {
 299                            if client_status.borrow().is_connected() {
 300                                log::info!("client reconnected, attempting to rejoin room");
 301
 302                                let Some(this) = this.upgrade(&cx) else { break };
 303                                if this
 304                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 305                                    .await
 306                                    .log_err()
 307                                    .is_some()
 308                                {
 309                                    return true;
 310                                } else {
 311                                    remaining_attempts -= 1;
 312                                }
 313                            } else if client_status.borrow().is_signed_out() {
 314                                return false;
 315                            }
 316
 317                            log::info!(
 318                                "waiting for client status change, remaining attempts {}",
 319                                remaining_attempts
 320                            );
 321                            client_status.next().await;
 322                        }
 323                        false
 324                    }
 325                    .fuse();
 326                    futures::pin_mut!(client_reconnection);
 327
 328                    futures::select_biased! {
 329                        reconnected = client_reconnection => {
 330                            if reconnected {
 331                                log::info!("successfully reconnected to room");
 332                                // If we successfully joined the room, go back around the loop
 333                                // waiting for future connection status changes.
 334                                continue;
 335                            }
 336                        }
 337                        _ = reconnection_timeout => {
 338                            log::info!("room reconnection timeout expired");
 339                        }
 340                    }
 341                }
 342
 343                break;
 344            }
 345        }
 346
 347        // The client failed to re-establish a connection to the server
 348        // or an error occurred while trying to re-join the room. Either way
 349        // we leave the room and return an error.
 350        if let Some(this) = this.upgrade(&cx) {
 351            log::info!("reconnection failed, leaving room");
 352            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 353        }
 354        Err(anyhow!(
 355            "can't reconnect to room: client failed to re-establish connection"
 356        ))
 357    }
 358
 359    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 360        let mut projects = HashMap::default();
 361        let mut reshared_projects = Vec::new();
 362        let mut rejoined_projects = Vec::new();
 363        self.shared_projects.retain(|project| {
 364            if let Some(handle) = project.upgrade(cx) {
 365                let project = handle.read(cx);
 366                if let Some(project_id) = project.remote_id() {
 367                    projects.insert(project_id, handle.clone());
 368                    reshared_projects.push(proto::UpdateProject {
 369                        project_id,
 370                        worktrees: project.worktree_metadata_protos(cx),
 371                    });
 372                    return true;
 373                }
 374            }
 375            false
 376        });
 377        self.joined_projects.retain(|project| {
 378            if let Some(handle) = project.upgrade(cx) {
 379                let project = handle.read(cx);
 380                if let Some(project_id) = project.remote_id() {
 381                    projects.insert(project_id, handle.clone());
 382                    rejoined_projects.push(proto::RejoinProject {
 383                        id: project_id,
 384                        worktrees: project
 385                            .worktrees(cx)
 386                            .map(|worktree| {
 387                                let worktree = worktree.read(cx);
 388                                proto::RejoinWorktree {
 389                                    id: worktree.id().to_proto(),
 390                                    scan_id: worktree.completed_scan_id() as u64,
 391                                }
 392                            })
 393                            .collect(),
 394                    });
 395                }
 396                return true;
 397            }
 398            false
 399        });
 400
 401        let response = self.client.request(proto::RejoinRoom {
 402            id: self.id,
 403            reshared_projects,
 404            rejoined_projects,
 405        });
 406
 407        cx.spawn(|this, mut cx| async move {
 408            let response = response.await?;
 409            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 410            this.update(&mut cx, |this, cx| {
 411                this.status = RoomStatus::Online;
 412                this.apply_room_update(room_proto, cx)?;
 413
 414                for reshared_project in response.reshared_projects {
 415                    if let Some(project) = projects.get(&reshared_project.id) {
 416                        project.update(cx, |project, cx| {
 417                            project.reshared(reshared_project, cx).log_err();
 418                        });
 419                    }
 420                }
 421
 422                for rejoined_project in response.rejoined_projects {
 423                    if let Some(project) = projects.get(&rejoined_project.id) {
 424                        project.update(cx, |project, cx| {
 425                            project.rejoined(rejoined_project, cx).log_err();
 426                        });
 427                    }
 428                }
 429
 430                anyhow::Ok(())
 431            })
 432        })
 433    }
 434
 435    pub fn id(&self) -> u64 {
 436        self.id
 437    }
 438
 439    pub fn status(&self) -> RoomStatus {
 440        self.status
 441    }
 442
 443    pub fn local_participant(&self) -> &LocalParticipant {
 444        &self.local_participant
 445    }
 446
 447    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 448        &self.remote_participants
 449    }
 450
 451    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 452        self.remote_participants
 453            .values()
 454            .find(|p| p.peer_id == peer_id)
 455    }
 456
 457    pub fn pending_participants(&self) -> &[Arc<User>] {
 458        &self.pending_participants
 459    }
 460
 461    pub fn contains_participant(&self, user_id: u64) -> bool {
 462        self.participant_user_ids.contains(&user_id)
 463    }
 464
 465    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 466        self.follows_by_leader_id_project_id
 467            .get(&(leader_id, project_id))
 468            .map_or(&[], |v| v.as_slice())
 469    }
 470
 471    async fn handle_room_updated(
 472        this: ModelHandle<Self>,
 473        envelope: TypedEnvelope<proto::RoomUpdated>,
 474        _: Arc<Client>,
 475        mut cx: AsyncAppContext,
 476    ) -> Result<()> {
 477        let room = envelope
 478            .payload
 479            .room
 480            .ok_or_else(|| anyhow!("invalid room"))?;
 481        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 482    }
 483
 484    fn apply_room_update(
 485        &mut self,
 486        mut room: proto::Room,
 487        cx: &mut ModelContext<Self>,
 488    ) -> Result<()> {
 489        // Filter ourselves out from the room's participants.
 490        let local_participant_ix = room
 491            .participants
 492            .iter()
 493            .position(|participant| Some(participant.user_id) == self.client.user_id());
 494        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 495
 496        let pending_participant_user_ids = room
 497            .pending_participants
 498            .iter()
 499            .map(|p| p.user_id)
 500            .collect::<Vec<_>>();
 501
 502        let remote_participant_user_ids = room
 503            .participants
 504            .iter()
 505            .map(|p| p.user_id)
 506            .collect::<Vec<_>>();
 507
 508        let (remote_participants, pending_participants) =
 509            self.user_store.update(cx, move |user_store, cx| {
 510                (
 511                    user_store.get_users(remote_participant_user_ids, cx),
 512                    user_store.get_users(pending_participant_user_ids, cx),
 513                )
 514            });
 515
 516        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 517            let (remote_participants, pending_participants) =
 518                futures::join!(remote_participants, pending_participants);
 519
 520            this.update(&mut cx, |this, cx| {
 521                this.participant_user_ids.clear();
 522
 523                if let Some(participant) = local_participant {
 524                    this.local_participant.projects = participant.projects;
 525                } else {
 526                    this.local_participant.projects.clear();
 527                }
 528
 529                if let Some(participants) = remote_participants.log_err() {
 530                    for (participant, user) in room.participants.into_iter().zip(participants) {
 531                        let Some(peer_id) = participant.peer_id else { continue };
 532                        this.participant_user_ids.insert(participant.user_id);
 533
 534                        let old_projects = this
 535                            .remote_participants
 536                            .get(&participant.user_id)
 537                            .into_iter()
 538                            .flat_map(|existing| &existing.projects)
 539                            .map(|project| project.id)
 540                            .collect::<HashSet<_>>();
 541                        let new_projects = participant
 542                            .projects
 543                            .iter()
 544                            .map(|project| project.id)
 545                            .collect::<HashSet<_>>();
 546
 547                        for project in &participant.projects {
 548                            if !old_projects.contains(&project.id) {
 549                                cx.emit(Event::RemoteProjectShared {
 550                                    owner: user.clone(),
 551                                    project_id: project.id,
 552                                    worktree_root_names: project.worktree_root_names.clone(),
 553                                });
 554                            }
 555                        }
 556
 557                        for unshared_project_id in old_projects.difference(&new_projects) {
 558                            this.joined_projects.retain(|project| {
 559                                if let Some(project) = project.upgrade(cx) {
 560                                    project.update(cx, |project, cx| {
 561                                        if project.remote_id() == Some(*unshared_project_id) {
 562                                            project.disconnected_from_host(cx);
 563                                            false
 564                                        } else {
 565                                            true
 566                                        }
 567                                    })
 568                                } else {
 569                                    false
 570                                }
 571                            });
 572                            cx.emit(Event::RemoteProjectUnshared {
 573                                project_id: *unshared_project_id,
 574                            });
 575                        }
 576
 577                        let location = ParticipantLocation::from_proto(participant.location)
 578                            .unwrap_or(ParticipantLocation::External);
 579                        if let Some(remote_participant) =
 580                            this.remote_participants.get_mut(&participant.user_id)
 581                        {
 582                            remote_participant.projects = participant.projects;
 583                            remote_participant.peer_id = peer_id;
 584                            if location != remote_participant.location {
 585                                remote_participant.location = location;
 586                                cx.emit(Event::ParticipantLocationChanged {
 587                                    participant_id: peer_id,
 588                                });
 589                            }
 590                        } else {
 591                            this.remote_participants.insert(
 592                                participant.user_id,
 593                                RemoteParticipant {
 594                                    user: user.clone(),
 595                                    peer_id,
 596                                    projects: participant.projects,
 597                                    location,
 598                                    tracks: Default::default(),
 599                                },
 600                            );
 601
 602                            if let Some(live_kit) = this.live_kit.as_ref() {
 603                                let tracks =
 604                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
 605                                for track in tracks {
 606                                    this.remote_video_track_updated(
 607                                        RemoteVideoTrackUpdate::Subscribed(track),
 608                                        cx,
 609                                    )
 610                                    .log_err();
 611                                }
 612                            }
 613                        }
 614                    }
 615
 616                    this.remote_participants.retain(|user_id, participant| {
 617                        if this.participant_user_ids.contains(user_id) {
 618                            true
 619                        } else {
 620                            for project in &participant.projects {
 621                                cx.emit(Event::RemoteProjectUnshared {
 622                                    project_id: project.id,
 623                                });
 624                            }
 625                            false
 626                        }
 627                    });
 628                }
 629
 630                if let Some(pending_participants) = pending_participants.log_err() {
 631                    this.pending_participants = pending_participants;
 632                    for participant in &this.pending_participants {
 633                        this.participant_user_ids.insert(participant.id);
 634                    }
 635                }
 636
 637                this.follows_by_leader_id_project_id.clear();
 638                for follower in room.followers {
 639                    let project_id = follower.project_id;
 640                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 641                        (Some(leader), Some(follower)) => (leader, follower),
 642
 643                        _ => {
 644                            log::error!("Follower message {follower:?} missing some state");
 645                            continue;
 646                        }
 647                    };
 648
 649                    let list = this
 650                        .follows_by_leader_id_project_id
 651                        .entry((leader, project_id))
 652                        .or_insert(Vec::new());
 653                    if !list.contains(&follower) {
 654                        list.push(follower);
 655                    }
 656                }
 657
 658                this.pending_room_update.take();
 659                if this.should_leave() {
 660                    log::info!("room is empty, leaving");
 661                    let _ = this.leave(cx);
 662                }
 663
 664                this.check_invariants();
 665                cx.notify();
 666            });
 667        }));
 668
 669        cx.notify();
 670        Ok(())
 671    }
 672
 673    fn remote_video_track_updated(
 674        &mut self,
 675        change: RemoteVideoTrackUpdate,
 676        cx: &mut ModelContext<Self>,
 677    ) -> Result<()> {
 678        match change {
 679            RemoteVideoTrackUpdate::Subscribed(track) => {
 680                let user_id = track.publisher_id().parse()?;
 681                let track_id = track.sid().to_string();
 682                let participant = self
 683                    .remote_participants
 684                    .get_mut(&user_id)
 685                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 686                participant.tracks.insert(
 687                    track_id.clone(),
 688                    Arc::new(RemoteVideoTrack {
 689                        live_kit_track: track,
 690                    }),
 691                );
 692                cx.emit(Event::RemoteVideoTracksChanged {
 693                    participant_id: participant.peer_id,
 694                });
 695            }
 696            RemoteVideoTrackUpdate::Unsubscribed {
 697                publisher_id,
 698                track_id,
 699            } => {
 700                let user_id = publisher_id.parse()?;
 701                let participant = self
 702                    .remote_participants
 703                    .get_mut(&user_id)
 704                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 705                participant.tracks.remove(&track_id);
 706                cx.emit(Event::RemoteVideoTracksChanged {
 707                    participant_id: participant.peer_id,
 708                });
 709            }
 710        }
 711
 712        cx.notify();
 713        Ok(())
 714    }
 715
 716    fn check_invariants(&self) {
 717        #[cfg(any(test, feature = "test-support"))]
 718        {
 719            for participant in self.remote_participants.values() {
 720                assert!(self.participant_user_ids.contains(&participant.user.id));
 721                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 722            }
 723
 724            for participant in &self.pending_participants {
 725                assert!(self.participant_user_ids.contains(&participant.id));
 726                assert_ne!(participant.id, self.client.user_id().unwrap());
 727            }
 728
 729            assert_eq!(
 730                self.participant_user_ids.len(),
 731                self.remote_participants.len() + self.pending_participants.len()
 732            );
 733        }
 734    }
 735
 736    pub(crate) fn call(
 737        &mut self,
 738        called_user_id: u64,
 739        initial_project_id: Option<u64>,
 740        cx: &mut ModelContext<Self>,
 741    ) -> Task<Result<()>> {
 742        if self.status.is_offline() {
 743            return Task::ready(Err(anyhow!("room is offline")));
 744        }
 745
 746        cx.notify();
 747        let client = self.client.clone();
 748        let room_id = self.id;
 749        self.pending_call_count += 1;
 750        cx.spawn(|this, mut cx| async move {
 751            let result = client
 752                .request(proto::Call {
 753                    room_id,
 754                    called_user_id,
 755                    initial_project_id,
 756                })
 757                .await;
 758            this.update(&mut cx, |this, cx| {
 759                this.pending_call_count -= 1;
 760                if this.should_leave() {
 761                    this.leave(cx)?;
 762                }
 763                result
 764            })?;
 765            Ok(())
 766        })
 767    }
 768
 769    pub fn join_project(
 770        &mut self,
 771        id: u64,
 772        language_registry: Arc<LanguageRegistry>,
 773        fs: Arc<dyn Fs>,
 774        cx: &mut ModelContext<Self>,
 775    ) -> Task<Result<ModelHandle<Project>>> {
 776        let client = self.client.clone();
 777        let user_store = self.user_store.clone();
 778        cx.spawn(|this, mut cx| async move {
 779            let project =
 780                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 781            this.update(&mut cx, |this, cx| {
 782                this.joined_projects.retain(|project| {
 783                    if let Some(project) = project.upgrade(cx) {
 784                        !project.read(cx).is_read_only()
 785                    } else {
 786                        false
 787                    }
 788                });
 789                this.joined_projects.insert(project.downgrade());
 790            });
 791            Ok(project)
 792        })
 793    }
 794
 795    pub(crate) fn share_project(
 796        &mut self,
 797        project: ModelHandle<Project>,
 798        cx: &mut ModelContext<Self>,
 799    ) -> Task<Result<u64>> {
 800        if let Some(project_id) = project.read(cx).remote_id() {
 801            return Task::ready(Ok(project_id));
 802        }
 803
 804        let request = self.client.request(proto::ShareProject {
 805            room_id: self.id(),
 806            worktrees: project.read(cx).worktree_metadata_protos(cx),
 807        });
 808        cx.spawn(|this, mut cx| async move {
 809            let response = request.await?;
 810
 811            project.update(&mut cx, |project, cx| {
 812                project.shared(response.project_id, cx)
 813            })?;
 814
 815            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 816            this.update(&mut cx, |this, cx| {
 817                this.shared_projects.insert(project.downgrade());
 818                let active_project = this.local_participant.active_project.as_ref();
 819                if active_project.map_or(false, |location| *location == project) {
 820                    this.set_location(Some(&project), cx)
 821                } else {
 822                    Task::ready(Ok(()))
 823                }
 824            })
 825            .await?;
 826
 827            Ok(response.project_id)
 828        })
 829    }
 830
 831    pub(crate) fn unshare_project(
 832        &mut self,
 833        project: ModelHandle<Project>,
 834        cx: &mut ModelContext<Self>,
 835    ) -> Result<()> {
 836        let project_id = match project.read(cx).remote_id() {
 837            Some(project_id) => project_id,
 838            None => return Ok(()),
 839        };
 840
 841        self.client.send(proto::UnshareProject { project_id })?;
 842        project.update(cx, |this, cx| this.unshare(cx))
 843    }
 844
 845    pub(crate) fn set_location(
 846        &mut self,
 847        project: Option<&ModelHandle<Project>>,
 848        cx: &mut ModelContext<Self>,
 849    ) -> Task<Result<()>> {
 850        if self.status.is_offline() {
 851            return Task::ready(Err(anyhow!("room is offline")));
 852        }
 853
 854        let client = self.client.clone();
 855        let room_id = self.id;
 856        let location = if let Some(project) = project {
 857            self.local_participant.active_project = Some(project.downgrade());
 858            if let Some(project_id) = project.read(cx).remote_id() {
 859                proto::participant_location::Variant::SharedProject(
 860                    proto::participant_location::SharedProject { id: project_id },
 861                )
 862            } else {
 863                proto::participant_location::Variant::UnsharedProject(
 864                    proto::participant_location::UnsharedProject {},
 865                )
 866            }
 867        } else {
 868            self.local_participant.active_project = None;
 869            proto::participant_location::Variant::External(proto::participant_location::External {})
 870        };
 871
 872        cx.notify();
 873        cx.foreground().spawn(async move {
 874            client
 875                .request(proto::UpdateParticipantLocation {
 876                    room_id,
 877                    location: Some(proto::ParticipantLocation {
 878                        variant: Some(location),
 879                    }),
 880                })
 881                .await?;
 882            Ok(())
 883        })
 884    }
 885
 886    pub fn is_screen_sharing(&self) -> bool {
 887        self.live_kit.as_ref().map_or(false, |live_kit| {
 888            !matches!(live_kit.screen_track, ScreenTrack::None)
 889        })
 890    }
 891
 892    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 893        if self.status.is_offline() {
 894            return Task::ready(Err(anyhow!("room is offline")));
 895        } else if self.is_screen_sharing() {
 896            return Task::ready(Err(anyhow!("screen was already shared")));
 897        }
 898
 899        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 900            let publish_id = post_inc(&mut live_kit.next_publish_id);
 901            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 902            cx.notify();
 903            (live_kit.room.display_sources(), publish_id)
 904        } else {
 905            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 906        };
 907
 908        cx.spawn_weak(|this, mut cx| async move {
 909            let publish_track = async {
 910                let displays = displays.await?;
 911                let display = displays
 912                    .first()
 913                    .ok_or_else(|| anyhow!("no display found"))?;
 914                let track = LocalVideoTrack::screen_share_for_display(&display);
 915                this.upgrade(&cx)
 916                    .ok_or_else(|| anyhow!("room was dropped"))?
 917                    .read_with(&cx, |this, _| {
 918                        this.live_kit
 919                            .as_ref()
 920                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 921                    })
 922                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 923                    .await
 924            };
 925
 926            let publication = publish_track.await;
 927            this.upgrade(&cx)
 928                .ok_or_else(|| anyhow!("room was dropped"))?
 929                .update(&mut cx, |this, cx| {
 930                    let live_kit = this
 931                        .live_kit
 932                        .as_mut()
 933                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 934
 935                    let canceled = if let ScreenTrack::Pending {
 936                        publish_id: cur_publish_id,
 937                    } = &live_kit.screen_track
 938                    {
 939                        *cur_publish_id != publish_id
 940                    } else {
 941                        true
 942                    };
 943
 944                    match publication {
 945                        Ok(publication) => {
 946                            if canceled {
 947                                live_kit.room.unpublish_track(publication);
 948                            } else {
 949                                live_kit.screen_track = ScreenTrack::Published(publication);
 950                                cx.notify();
 951                            }
 952                            Ok(())
 953                        }
 954                        Err(error) => {
 955                            if canceled {
 956                                Ok(())
 957                            } else {
 958                                live_kit.screen_track = ScreenTrack::None;
 959                                cx.notify();
 960                                Err(error)
 961                            }
 962                        }
 963                    }
 964                })
 965        })
 966    }
 967
 968    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 969        if self.status.is_offline() {
 970            return Err(anyhow!("room is offline"));
 971        }
 972
 973        let live_kit = self
 974            .live_kit
 975            .as_mut()
 976            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 977        match mem::take(&mut live_kit.screen_track) {
 978            ScreenTrack::None => Err(anyhow!("screen was not shared")),
 979            ScreenTrack::Pending { .. } => {
 980                cx.notify();
 981                Ok(())
 982            }
 983            ScreenTrack::Published(track) => {
 984                live_kit.room.unpublish_track(track);
 985                cx.notify();
 986                Ok(())
 987            }
 988        }
 989    }
 990
 991    #[cfg(any(test, feature = "test-support"))]
 992    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
 993        self.live_kit
 994            .as_ref()
 995            .unwrap()
 996            .room
 997            .set_display_sources(sources);
 998    }
 999}
1000
1001struct LiveKitRoom {
1002    room: Arc<live_kit_client::Room>,
1003    screen_track: ScreenTrack,
1004    next_publish_id: usize,
1005    _maintain_room: Task<()>,
1006    _maintain_tracks: Task<()>,
1007}
1008
1009enum ScreenTrack {
1010    None,
1011    Pending { publish_id: usize },
1012    Published(LocalTrackPublication),
1013}
1014
1015impl Default for ScreenTrack {
1016    fn default() -> Self {
1017        Self::None
1018    }
1019}
1020
1021#[derive(Copy, Clone, PartialEq, Eq)]
1022pub enum RoomStatus {
1023    Online,
1024    Rejoining,
1025    Offline,
1026}
1027
1028impl RoomStatus {
1029    pub fn is_offline(&self) -> bool {
1030        matches!(self, RoomStatus::Offline)
1031    }
1032
1033    pub fn is_online(&self) -> bool {
1034        matches!(self, RoomStatus::Online)
1035    }
1036}