room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
/// Maximum time `Room::maintain_connection` waits for the client to reconnect
/// and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A room update reported a location for an already-known remote
    /// participant that differs from the one previously recorded.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): emitted outside the portion of the file visible here;
    /// presumably signals a change in the participant's video tracks.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): emitted outside the portion of the file visible here;
    /// presumably signals a change in the participant's audio tracks.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update lists a project for a remote participant that was not in
    /// that participant's previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously listed for a remote participant is no longer
    /// listed, or the participant owning it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
  49
/// Client-side model of a call room: its participants, the projects shared
/// or joined through it, and the tasks that keep it in sync with the server.
pub struct Room {
    /// Room id as reported by the server.
    id: u64,
    /// Channel id taken from the join response; `None` for rooms made via `create`.
    channel_id: Option<u64>,
    /// LiveKit state; `None` when no connection info was supplied or after
    /// the room state has been cleared.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Projects that are unshared when the room state is cleared and
    /// re-announced to the server on rejoin.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Projects that are disconnected from their host and closed when the
    /// room state is cleared.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed as pending in the most recently applied room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// NOTE(review): only read in the visible code (`should_leave`);
    /// presumably counts outgoing calls still in flight.
    pending_call_count: usize,
    /// When set, the room leaves itself once `should_leave` finds it empty.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers grouped by `(leader, project id)`; rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions registered on `client`.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update, while it is in flight.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`.
    maintain_connection: Option<Task<Option<()>>>,
}
  70
  71impl Entity for Room {
  72    type Event = Event;
  73
  74    fn release(&mut self, cx: &mut AppContext) {
  75        if self.status.is_online() {
  76            self.leave_internal(cx).detach_and_log_err(cx);
  77        }
  78    }
  79
  80    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  81        if self.status.is_online() {
  82            let leave = self.leave_internal(cx);
  83            Some(
  84                cx.background()
  85                    .spawn(async move {
  86                        leave.await.log_err();
  87                    })
  88                    .boxed(),
  89            )
  90        } else {
  91            None
  92        }
  93    }
  94}
  95
  96impl Room {
  97    pub fn channel_id(&self) -> Option<u64> {
  98        self.channel_id
  99    }
 100
 101    #[cfg(any(test, feature = "test-support"))]
 102    pub fn is_connected(&self) -> bool {
 103        if let Some(live_kit) = self.live_kit.as_ref() {
 104            matches!(
 105                *live_kit.room.status().borrow(),
 106                live_kit_client::ConnectionState::Connected { .. }
 107            )
 108        } else {
 109            false
 110        }
 111    }
 112
    /// Builds a room in the `Online` state and starts its background tasks.
    ///
    /// When `live_kit_connection_info` is provided, a LiveKit room is created,
    /// tasks are spawned that react to its connection status and to remote
    /// video/audio track updates, and a connection attempt is started. Once
    /// connected, the microphone is shared unless the `mute_on_join` call
    /// setting is enabled. Without connection info the room has no LiveKit
    /// state.
    ///
    /// Independently of LiveKit, this registers `handle_room_updated` as a
    /// message handler on `client`, spawns the `maintain_connection` task and
    /// plays the "joined" sound.
    fn new(
        id: u64,
        channel_id: Option<u64>,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Leave the room as soon as LiveKit reports `Disconnected`; the
            // task also ends once the room model can no longer be upgraded.
            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    if status == live_kit_client::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                        break;
                    }
                }
            });

            // Forward remote video track updates to the room for as long as
            // the room model can be upgraded.
            let mut track_video_changes = room.remote_video_track_updates();
            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_video_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_video_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Same forwarding loop for remote audio track updates.
            let mut track_audio_changes = room.remote_audio_track_updates();
            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_audio_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_audio_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Connect in the background; errors are logged, not returned.
            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            cx.spawn(|this, mut cx| async move {
                connect.await?;

                // Only publish the microphone when the user has not asked to
                // join muted.
                if !cx.read(|cx| settings::get::<CallSettings>(cx).mute_on_join) {
                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
                        .await?;
                }

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: false,
                deafened: false,
                speaking: false,
                _maintain_room,
                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
            })
        } else {
            None
        };

        // Watches the client's connection status; the task is stored on the
        // room below.
        let maintain_connection =
            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

        Audio::play_sound(Sound::Joined, cx);

        Self {
            id,
            channel_id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            // Route incoming `RoomUpdated` messages to `handle_room_updated`.
            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
        }
    }
 225
 226    pub(crate) fn create(
 227        called_user_id: u64,
 228        initial_project: Option<ModelHandle<Project>>,
 229        client: Arc<Client>,
 230        user_store: ModelHandle<UserStore>,
 231        cx: &mut AppContext,
 232    ) -> Task<Result<ModelHandle<Self>>> {
 233        cx.spawn(|mut cx| async move {
 234            let response = client.request(proto::CreateRoom {}).await?;
 235            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 236            let room = cx.add_model(|cx| {
 237                Self::new(
 238                    room_proto.id,
 239                    None,
 240                    response.live_kit_connection_info,
 241                    client,
 242                    user_store,
 243                    cx,
 244                )
 245            });
 246
 247            let initial_project_id = if let Some(initial_project) = initial_project {
 248                let initial_project_id = room
 249                    .update(&mut cx, |room, cx| {
 250                        room.share_project(initial_project.clone(), cx)
 251                    })
 252                    .await?;
 253                Some(initial_project_id)
 254            } else {
 255                None
 256            };
 257
 258            match room
 259                .update(&mut cx, |room, cx| {
 260                    room.leave_when_empty = true;
 261                    room.call(called_user_id, initial_project_id, cx)
 262                })
 263                .await
 264            {
 265                Ok(()) => Ok(room),
 266                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 267            }
 268        })
 269    }
 270
 271    pub(crate) fn join_channel(
 272        channel_id: u64,
 273        client: Arc<Client>,
 274        user_store: ModelHandle<UserStore>,
 275        cx: &mut AppContext,
 276    ) -> Task<Result<ModelHandle<Self>>> {
 277        cx.spawn(|cx| async move {
 278            Self::from_join_response(
 279                client.request(proto::JoinChannel { channel_id }).await?,
 280                client,
 281                user_store,
 282                cx,
 283            )
 284        })
 285    }
 286
 287    pub(crate) fn join(
 288        call: &IncomingCall,
 289        client: Arc<Client>,
 290        user_store: ModelHandle<UserStore>,
 291        cx: &mut AppContext,
 292    ) -> Task<Result<ModelHandle<Self>>> {
 293        let id = call.room_id;
 294        cx.spawn(|cx| async move {
 295            Self::from_join_response(
 296                client.request(proto::JoinRoom { id }).await?,
 297                client,
 298                user_store,
 299                cx,
 300            )
 301        })
 302    }
 303
 304    fn from_join_response(
 305        response: proto::JoinRoomResponse,
 306        client: Arc<Client>,
 307        user_store: ModelHandle<UserStore>,
 308        mut cx: AsyncAppContext,
 309    ) -> Result<ModelHandle<Self>> {
 310        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 311        let room = cx.add_model(|cx| {
 312            Self::new(
 313                room_proto.id,
 314                response.channel_id,
 315                response.live_kit_connection_info,
 316                client,
 317                user_store,
 318                cx,
 319            )
 320        });
 321        room.update(&mut cx, |room, cx| {
 322            room.leave_when_empty = room.channel_id.is_none();
 323            room.apply_room_update(room_proto, cx)?;
 324            anyhow::Ok(())
 325        })?;
 326        Ok(room)
 327    }
 328
 329    fn should_leave(&self) -> bool {
 330        self.leave_when_empty
 331            && self.pending_room_update.is_none()
 332            && self.pending_participants.is_empty()
 333            && self.remote_participants.is_empty()
 334            && self.pending_call_count == 0
 335    }
 336
 337    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 338        cx.notify();
 339        cx.emit(Event::Left);
 340        self.leave_internal(cx)
 341    }
 342
 343    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 344        if self.status.is_offline() {
 345            return Task::ready(Err(anyhow!("room is offline")));
 346        }
 347
 348        log::info!("leaving room");
 349        Audio::play_sound(Sound::Leave, cx);
 350
 351        self.clear_state(cx);
 352
 353        let leave_room = self.client.request(proto::LeaveRoom {});
 354        cx.background().spawn(async move {
 355            leave_room.await?;
 356            anyhow::Ok(())
 357        })
 358    }
 359
 360    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 361        for project in self.shared_projects.drain() {
 362            if let Some(project) = project.upgrade(cx) {
 363                project.update(cx, |project, cx| {
 364                    project.unshare(cx).log_err();
 365                });
 366            }
 367        }
 368        for project in self.joined_projects.drain() {
 369            if let Some(project) = project.upgrade(cx) {
 370                project.update(cx, |project, cx| {
 371                    project.disconnected_from_host(cx);
 372                    project.close(cx);
 373                });
 374            }
 375        }
 376
 377        self.status = RoomStatus::Offline;
 378        self.remote_participants.clear();
 379        self.pending_participants.clear();
 380        self.participant_user_ids.clear();
 381        self.subscriptions.clear();
 382        self.live_kit.take();
 383        self.pending_room_update.take();
 384        self.maintain_connection.take();
 385    }
 386
    /// Watches the client's connection status and tries to rejoin the room
    /// after a disconnection.
    ///
    /// An initially disconnected client, or any later status change, switches
    /// the room to `Rejoining`. A rejoin is then attempted whenever the client
    /// reports being connected, allowing up to three failed attempts, all
    /// within `RECONNECT_TIMEOUT`. The attempt loop stops early if the client
    /// is signed out or the room can no longer be upgraded. After a
    /// successful rejoin the outer loop goes back to watching for status
    /// changes.
    ///
    /// # Errors
    ///
    /// Returns an error if the room was dropped when a disconnection is
    /// detected. Otherwise this function only returns after reconnection has
    /// failed or timed out: the room is left (if it still exists) and an
    /// error is returned.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        // NOTE(review): if the status stream ever ends (`next()` yields
        // `None`) while connected, this loop iterates again without leaving;
        // presumably the stream lives as long as `client` — confirm.
        loop {
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeds, `false` when
                    // the attempts are exhausted, the client is signed out,
                    // or the room is gone.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    // Only failed rejoin attempts consume the
                                    // budget; waiting while disconnected does not.
                                    remaining_attempts -= 1;
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Biased select: the reconnection future is polled before
                    // the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade(&cx) {
            log::info!("reconnection failed, leaving room");
            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 472
 473    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 474        let mut projects = HashMap::default();
 475        let mut reshared_projects = Vec::new();
 476        let mut rejoined_projects = Vec::new();
 477        self.shared_projects.retain(|project| {
 478            if let Some(handle) = project.upgrade(cx) {
 479                let project = handle.read(cx);
 480                if let Some(project_id) = project.remote_id() {
 481                    projects.insert(project_id, handle.clone());
 482                    reshared_projects.push(proto::UpdateProject {
 483                        project_id,
 484                        worktrees: project.worktree_metadata_protos(cx),
 485                    });
 486                    return true;
 487                }
 488            }
 489            false
 490        });
 491        self.joined_projects.retain(|project| {
 492            if let Some(handle) = project.upgrade(cx) {
 493                let project = handle.read(cx);
 494                if let Some(project_id) = project.remote_id() {
 495                    projects.insert(project_id, handle.clone());
 496                    rejoined_projects.push(proto::RejoinProject {
 497                        id: project_id,
 498                        worktrees: project
 499                            .worktrees(cx)
 500                            .map(|worktree| {
 501                                let worktree = worktree.read(cx);
 502                                proto::RejoinWorktree {
 503                                    id: worktree.id().to_proto(),
 504                                    scan_id: worktree.completed_scan_id() as u64,
 505                                }
 506                            })
 507                            .collect(),
 508                    });
 509                }
 510                return true;
 511            }
 512            false
 513        });
 514
 515        let response = self.client.request_envelope(proto::RejoinRoom {
 516            id: self.id,
 517            reshared_projects,
 518            rejoined_projects,
 519        });
 520
 521        cx.spawn(|this, mut cx| async move {
 522            let response = response.await?;
 523            let message_id = response.message_id;
 524            let response = response.payload;
 525            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 526            this.update(&mut cx, |this, cx| {
 527                this.status = RoomStatus::Online;
 528                this.apply_room_update(room_proto, cx)?;
 529
 530                for reshared_project in response.reshared_projects {
 531                    if let Some(project) = projects.get(&reshared_project.id) {
 532                        project.update(cx, |project, cx| {
 533                            project.reshared(reshared_project, cx).log_err();
 534                        });
 535                    }
 536                }
 537
 538                for rejoined_project in response.rejoined_projects {
 539                    if let Some(project) = projects.get(&rejoined_project.id) {
 540                        project.update(cx, |project, cx| {
 541                            project.rejoined(rejoined_project, message_id, cx).log_err();
 542                        });
 543                    }
 544                }
 545
 546                anyhow::Ok(())
 547            })
 548        })
 549    }
 550
 551    pub fn id(&self) -> u64 {
 552        self.id
 553    }
 554
 555    pub fn status(&self) -> RoomStatus {
 556        self.status
 557    }
 558
 559    pub fn local_participant(&self) -> &LocalParticipant {
 560        &self.local_participant
 561    }
 562
 563    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 564        &self.remote_participants
 565    }
 566
 567    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 568        self.remote_participants
 569            .values()
 570            .find(|p| p.peer_id == peer_id)
 571    }
 572
 573    pub fn pending_participants(&self) -> &[Arc<User>] {
 574        &self.pending_participants
 575    }
 576
 577    pub fn contains_participant(&self, user_id: u64) -> bool {
 578        self.participant_user_ids.contains(&user_id)
 579    }
 580
 581    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 582        self.follows_by_leader_id_project_id
 583            .get(&(leader_id, project_id))
 584            .map_or(&[], |v| v.as_slice())
 585    }
 586
 587    async fn handle_room_updated(
 588        this: ModelHandle<Self>,
 589        envelope: TypedEnvelope<proto::RoomUpdated>,
 590        _: Arc<Client>,
 591        mut cx: AsyncAppContext,
 592    ) -> Result<()> {
 593        let room = envelope
 594            .payload
 595            .room
 596            .ok_or_else(|| anyhow!("invalid room"))?;
 597        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 598    }
 599
    /// Reconciles local room state with a `proto::Room` snapshot.
    ///
    /// The local user's own entry is removed from the participant list, the
    /// users behind the remaining and pending participants are requested
    /// from the user store, and a task is stored in `pending_room_update`.
    /// Once those users resolve, the task updates participants, projects and
    /// followers, emits the corresponding events, and leaves the room when
    /// `should_leave` holds.
    ///
    /// As written this always returns `Ok(())`; the reconciliation itself
    /// completes asynchronously in the stored task.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        // Collected after the local entry was removed, in the same order as
        // `room.participants`, because the two are zipped together below.
        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });

        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt below from the remote and pending participants.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): assumes `get_users` yields users in the
                    // order of the requested ids — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        // Participants without a peer id are skipped entirely.
                        let Some(peer_id) = participant.peer_id else { continue };
                        this.participant_user_ids.insert(participant.user_id);

                        // Project ids previously known for this participant
                        // versus the ones in this snapshot.
                        let old_projects = this
                            .remote_participants
                            .get(&participant.user_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        // Announce projects that were not known before.
                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        // For projects that disappeared: disconnect and forget
                        // any joined project with that remote id (dead handles
                        // are pruned too), then announce the unshare.
                        for unshared_project_id in old_projects.difference(&new_projects) {
                            this.joined_projects.retain(|project| {
                                if let Some(project) = project.upgrade(cx) {
                                    project.update(cx, |project, cx| {
                                        if project.remote_id() == Some(*unshared_project_id) {
                                            project.disconnected_from_host(cx);
                                            false
                                        } else {
                                            true
                                        }
                                    })
                                } else {
                                    false
                                }
                            });
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // Locations that fail to convert fall back to `External`.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) =
                            this.remote_participants.get_mut(&participant.user_id)
                        {
                            // Known participant: refresh in place.
                            remote_participant.projects = participant.projects;
                            remote_participant.peer_id = peer_id;
                            if location != remote_participant.location {
                                remote_participant.location = location;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            // New participant: starts out muted and not
                            // speaking, with no tracks.
                            this.remote_participants.insert(
                                participant.user_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    peer_id,
                                    projects: participant.projects,
                                    location,
                                    muted: true,
                                    speaking: false,
                                    video_tracks: Default::default(),
                                    audio_tracks: Default::default(),
                                },
                            );

                            Audio::play_sound(Sound::Joined, cx);

                            // Feed any tracks LiveKit already reports for this
                            // user through the regular track-update handlers.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let video_tracks =
                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                let audio_tracks =
                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
                                let publications = live_kit
                                    .room
                                    .remote_audio_track_publications(&user.id.to_string());

                                for track in video_tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }

                                for (track, publication) in
                                    audio_tracks.iter().zip(publications.iter())
                                {
                                    this.remote_audio_track_updated(
                                        RemoteAudioTrackUpdate::Subscribed(
                                            track.clone(),
                                            publication.clone(),
                                        ),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants missing from this snapshot and
                    // announce each of their projects as unshared.
                    this.remote_participants.retain(|user_id, participant| {
                        if this.participant_user_ids.contains(user_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Rebuild the follower map from scratch, skipping entries
                // that lack a leader or follower id and avoiding duplicates.
                this.follows_by_leader_id_project_id.clear();
                for follower in room.followers {
                    let project_id = follower.project_id;
                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                        (Some(leader), Some(follower)) => (leader, follower),

                        _ => {
                            log::error!("Follower message {follower:?} missing some state");
                            continue;
                        }
                    };

                    let list = this
                        .follows_by_leader_id_project_id
                        .entry((leader, project_id))
                        .or_insert(Vec::new());
                    if !list.contains(&follower) {
                        list.push(follower);
                    }
                }

                // Clear the in-flight marker first: `should_leave` requires
                // that no room update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    let _ = this.leave(cx);
                }

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
 812
 813    fn remote_video_track_updated(
 814        &mut self,
 815        change: RemoteVideoTrackUpdate,
 816        cx: &mut ModelContext<Self>,
 817    ) -> Result<()> {
 818        match change {
 819            RemoteVideoTrackUpdate::Subscribed(track) => {
 820                let user_id = track.publisher_id().parse()?;
 821                let track_id = track.sid().to_string();
 822                let participant = self
 823                    .remote_participants
 824                    .get_mut(&user_id)
 825                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 826                participant.video_tracks.insert(
 827                    track_id.clone(),
 828                    Arc::new(RemoteVideoTrack {
 829                        live_kit_track: track,
 830                    }),
 831                );
 832                cx.emit(Event::RemoteVideoTracksChanged {
 833                    participant_id: participant.peer_id,
 834                });
 835            }
 836            RemoteVideoTrackUpdate::Unsubscribed {
 837                publisher_id,
 838                track_id,
 839            } => {
 840                let user_id = publisher_id.parse()?;
 841                let participant = self
 842                    .remote_participants
 843                    .get_mut(&user_id)
 844                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 845                participant.video_tracks.remove(&track_id);
 846                cx.emit(Event::RemoteVideoTracksChanged {
 847                    participant_id: participant.peer_id,
 848                });
 849            }
 850        }
 851
 852        cx.notify();
 853        Ok(())
 854    }
 855
 856    fn remote_audio_track_updated(
 857        &mut self,
 858        change: RemoteAudioTrackUpdate,
 859        cx: &mut ModelContext<Self>,
 860    ) -> Result<()> {
 861        match change {
 862            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 863                let mut speaker_ids = speakers
 864                    .into_iter()
 865                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 866                    .collect::<Vec<u64>>();
 867                speaker_ids.sort_unstable();
 868                for (sid, participant) in &mut self.remote_participants {
 869                    if let Ok(_) = speaker_ids.binary_search(sid) {
 870                        participant.speaking = true;
 871                    } else {
 872                        participant.speaking = false;
 873                    }
 874                }
 875                if let Some(id) = self.client.user_id() {
 876                    if let Some(room) = &mut self.live_kit {
 877                        if let Ok(_) = speaker_ids.binary_search(&id) {
 878                            room.speaking = true;
 879                        } else {
 880                            room.speaking = false;
 881                        }
 882                    }
 883                }
 884                cx.notify();
 885            }
 886            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 887                let mut found = false;
 888                for participant in &mut self.remote_participants.values_mut() {
 889                    for track in participant.audio_tracks.values() {
 890                        if track.sid() == track_id {
 891                            found = true;
 892                            break;
 893                        }
 894                    }
 895                    if found {
 896                        participant.muted = muted;
 897                        break;
 898                    }
 899                }
 900
 901                cx.notify();
 902            }
 903            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 904                let user_id = track.publisher_id().parse()?;
 905                let track_id = track.sid().to_string();
 906                let participant = self
 907                    .remote_participants
 908                    .get_mut(&user_id)
 909                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 910
 911                participant.audio_tracks.insert(track_id.clone(), track);
 912                participant.muted = publication.is_muted();
 913
 914                cx.emit(Event::RemoteAudioTracksChanged {
 915                    participant_id: participant.peer_id,
 916                });
 917            }
 918            RemoteAudioTrackUpdate::Unsubscribed {
 919                publisher_id,
 920                track_id,
 921            } => {
 922                let user_id = publisher_id.parse()?;
 923                let participant = self
 924                    .remote_participants
 925                    .get_mut(&user_id)
 926                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 927                participant.audio_tracks.remove(&track_id);
 928                cx.emit(Event::RemoteAudioTracksChanged {
 929                    participant_id: participant.peer_id,
 930                });
 931            }
 932        }
 933
 934        cx.notify();
 935        Ok(())
 936    }
 937
 938    fn check_invariants(&self) {
 939        #[cfg(any(test, feature = "test-support"))]
 940        {
 941            for participant in self.remote_participants.values() {
 942                assert!(self.participant_user_ids.contains(&participant.user.id));
 943                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 944            }
 945
 946            for participant in &self.pending_participants {
 947                assert!(self.participant_user_ids.contains(&participant.id));
 948                assert_ne!(participant.id, self.client.user_id().unwrap());
 949            }
 950
 951            assert_eq!(
 952                self.participant_user_ids.len(),
 953                self.remote_participants.len() + self.pending_participants.len()
 954            );
 955        }
 956    }
 957
 958    pub(crate) fn call(
 959        &mut self,
 960        called_user_id: u64,
 961        initial_project_id: Option<u64>,
 962        cx: &mut ModelContext<Self>,
 963    ) -> Task<Result<()>> {
 964        if self.status.is_offline() {
 965            return Task::ready(Err(anyhow!("room is offline")));
 966        }
 967
 968        cx.notify();
 969        let client = self.client.clone();
 970        let room_id = self.id;
 971        self.pending_call_count += 1;
 972        cx.spawn(|this, mut cx| async move {
 973            let result = client
 974                .request(proto::Call {
 975                    room_id,
 976                    called_user_id,
 977                    initial_project_id,
 978                })
 979                .await;
 980            this.update(&mut cx, |this, cx| {
 981                this.pending_call_count -= 1;
 982                if this.should_leave() {
 983                    this.leave(cx).detach_and_log_err(cx);
 984                }
 985            });
 986            result?;
 987            Ok(())
 988        })
 989    }
 990
 991    pub fn join_project(
 992        &mut self,
 993        id: u64,
 994        language_registry: Arc<LanguageRegistry>,
 995        fs: Arc<dyn Fs>,
 996        cx: &mut ModelContext<Self>,
 997    ) -> Task<Result<ModelHandle<Project>>> {
 998        let client = self.client.clone();
 999        let user_store = self.user_store.clone();
1000        cx.spawn(|this, mut cx| async move {
1001            let project =
1002                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1003
1004            this.update(&mut cx, |this, cx| {
1005                this.joined_projects.retain(|project| {
1006                    if let Some(project) = project.upgrade(cx) {
1007                        !project.read(cx).is_read_only()
1008                    } else {
1009                        false
1010                    }
1011                });
1012                this.joined_projects.insert(project.downgrade());
1013            });
1014            Ok(project)
1015        })
1016    }
1017
1018    pub(crate) fn share_project(
1019        &mut self,
1020        project: ModelHandle<Project>,
1021        cx: &mut ModelContext<Self>,
1022    ) -> Task<Result<u64>> {
1023        if let Some(project_id) = project.read(cx).remote_id() {
1024            return Task::ready(Ok(project_id));
1025        }
1026
1027        let request = self.client.request(proto::ShareProject {
1028            room_id: self.id(),
1029            worktrees: project.read(cx).worktree_metadata_protos(cx),
1030        });
1031        cx.spawn(|this, mut cx| async move {
1032            let response = request.await?;
1033
1034            project.update(&mut cx, |project, cx| {
1035                project.shared(response.project_id, cx)
1036            })?;
1037
1038            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1039            this.update(&mut cx, |this, cx| {
1040                this.shared_projects.insert(project.downgrade());
1041                let active_project = this.local_participant.active_project.as_ref();
1042                if active_project.map_or(false, |location| *location == project) {
1043                    this.set_location(Some(&project), cx)
1044                } else {
1045                    Task::ready(Ok(()))
1046                }
1047            })
1048            .await?;
1049
1050            Ok(response.project_id)
1051        })
1052    }
1053
1054    pub(crate) fn unshare_project(
1055        &mut self,
1056        project: ModelHandle<Project>,
1057        cx: &mut ModelContext<Self>,
1058    ) -> Result<()> {
1059        let project_id = match project.read(cx).remote_id() {
1060            Some(project_id) => project_id,
1061            None => return Ok(()),
1062        };
1063
1064        self.client.send(proto::UnshareProject { project_id })?;
1065        project.update(cx, |this, cx| this.unshare(cx))
1066    }
1067
1068    pub(crate) fn set_location(
1069        &mut self,
1070        project: Option<&ModelHandle<Project>>,
1071        cx: &mut ModelContext<Self>,
1072    ) -> Task<Result<()>> {
1073        if self.status.is_offline() {
1074            return Task::ready(Err(anyhow!("room is offline")));
1075        }
1076
1077        let client = self.client.clone();
1078        let room_id = self.id;
1079        let location = if let Some(project) = project {
1080            self.local_participant.active_project = Some(project.downgrade());
1081            if let Some(project_id) = project.read(cx).remote_id() {
1082                proto::participant_location::Variant::SharedProject(
1083                    proto::participant_location::SharedProject { id: project_id },
1084                )
1085            } else {
1086                proto::participant_location::Variant::UnsharedProject(
1087                    proto::participant_location::UnsharedProject {},
1088                )
1089            }
1090        } else {
1091            self.local_participant.active_project = None;
1092            proto::participant_location::Variant::External(proto::participant_location::External {})
1093        };
1094
1095        cx.notify();
1096        cx.foreground().spawn(async move {
1097            client
1098                .request(proto::UpdateParticipantLocation {
1099                    room_id,
1100                    location: Some(proto::ParticipantLocation {
1101                        variant: Some(location),
1102                    }),
1103                })
1104                .await?;
1105            Ok(())
1106        })
1107    }
1108
1109    pub fn is_screen_sharing(&self) -> bool {
1110        self.live_kit.as_ref().map_or(false, |live_kit| {
1111            !matches!(live_kit.screen_track, LocalTrack::None)
1112        })
1113    }
1114
1115    pub fn is_sharing_mic(&self) -> bool {
1116        self.live_kit.as_ref().map_or(false, |live_kit| {
1117            !matches!(live_kit.microphone_track, LocalTrack::None)
1118        })
1119    }
1120
1121    pub fn is_muted(&self, cx: &AppContext) -> bool {
1122        self.live_kit
1123            .as_ref()
1124            .and_then(|live_kit| match &live_kit.microphone_track {
1125                LocalTrack::None => Some(settings::get::<CallSettings>(cx).mute_on_join),
1126                LocalTrack::Pending { muted, .. } => Some(*muted),
1127                LocalTrack::Published { muted, .. } => Some(*muted),
1128            })
1129            .unwrap_or(false)
1130    }
1131
1132    pub fn is_speaking(&self) -> bool {
1133        self.live_kit
1134            .as_ref()
1135            .map_or(false, |live_kit| live_kit.speaking)
1136    }
1137
1138    pub fn is_deafened(&self) -> Option<bool> {
1139        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1140    }
1141
1142    #[track_caller]
1143    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1144        if self.status.is_offline() {
1145            return Task::ready(Err(anyhow!("room is offline")));
1146        } else if self.is_sharing_mic() {
1147            return Task::ready(Err(anyhow!("microphone was already shared")));
1148        }
1149
1150        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1151            let publish_id = post_inc(&mut live_kit.next_publish_id);
1152            live_kit.microphone_track = LocalTrack::Pending {
1153                publish_id,
1154                muted: false,
1155            };
1156            cx.notify();
1157            publish_id
1158        } else {
1159            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1160        };
1161
1162        cx.spawn_weak(|this, mut cx| async move {
1163            let publish_track = async {
1164                let track = LocalAudioTrack::create();
1165                this.upgrade(&cx)
1166                    .ok_or_else(|| anyhow!("room was dropped"))?
1167                    .read_with(&cx, |this, _| {
1168                        this.live_kit
1169                            .as_ref()
1170                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1171                    })
1172                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1173                    .await
1174            };
1175
1176            let publication = publish_track.await;
1177            this.upgrade(&cx)
1178                .ok_or_else(|| anyhow!("room was dropped"))?
1179                .update(&mut cx, |this, cx| {
1180                    let live_kit = this
1181                        .live_kit
1182                        .as_mut()
1183                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1184
1185                    let (canceled, muted) = if let LocalTrack::Pending {
1186                        publish_id: cur_publish_id,
1187                        muted,
1188                    } = &live_kit.microphone_track
1189                    {
1190                        (*cur_publish_id != publish_id, *muted)
1191                    } else {
1192                        (true, false)
1193                    };
1194
1195                    match publication {
1196                        Ok(publication) => {
1197                            if canceled {
1198                                live_kit.room.unpublish_track(publication);
1199                            } else {
1200                                if muted {
1201                                    cx.background().spawn(publication.set_mute(muted)).detach();
1202                                }
1203                                live_kit.microphone_track = LocalTrack::Published {
1204                                    track_publication: publication,
1205                                    muted,
1206                                };
1207                                cx.notify();
1208                            }
1209                            Ok(())
1210                        }
1211                        Err(error) => {
1212                            if canceled {
1213                                Ok(())
1214                            } else {
1215                                live_kit.microphone_track = LocalTrack::None;
1216                                cx.notify();
1217                                Err(error)
1218                            }
1219                        }
1220                    }
1221                })
1222        })
1223    }
1224
1225    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1226        if self.status.is_offline() {
1227            return Task::ready(Err(anyhow!("room is offline")));
1228        } else if self.is_screen_sharing() {
1229            return Task::ready(Err(anyhow!("screen was already shared")));
1230        }
1231
1232        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1233            let publish_id = post_inc(&mut live_kit.next_publish_id);
1234            live_kit.screen_track = LocalTrack::Pending {
1235                publish_id,
1236                muted: false,
1237            };
1238            cx.notify();
1239            (live_kit.room.display_sources(), publish_id)
1240        } else {
1241            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1242        };
1243
1244        cx.spawn_weak(|this, mut cx| async move {
1245            let publish_track = async {
1246                let displays = displays.await?;
1247                let display = displays
1248                    .first()
1249                    .ok_or_else(|| anyhow!("no display found"))?;
1250                let track = LocalVideoTrack::screen_share_for_display(&display);
1251                this.upgrade(&cx)
1252                    .ok_or_else(|| anyhow!("room was dropped"))?
1253                    .read_with(&cx, |this, _| {
1254                        this.live_kit
1255                            .as_ref()
1256                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1257                    })
1258                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1259                    .await
1260            };
1261
1262            let publication = publish_track.await;
1263            this.upgrade(&cx)
1264                .ok_or_else(|| anyhow!("room was dropped"))?
1265                .update(&mut cx, |this, cx| {
1266                    let live_kit = this
1267                        .live_kit
1268                        .as_mut()
1269                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1270
1271                    let (canceled, muted) = if let LocalTrack::Pending {
1272                        publish_id: cur_publish_id,
1273                        muted,
1274                    } = &live_kit.screen_track
1275                    {
1276                        (*cur_publish_id != publish_id, *muted)
1277                    } else {
1278                        (true, false)
1279                    };
1280
1281                    match publication {
1282                        Ok(publication) => {
1283                            if canceled {
1284                                live_kit.room.unpublish_track(publication);
1285                            } else {
1286                                if muted {
1287                                    cx.background().spawn(publication.set_mute(muted)).detach();
1288                                }
1289                                live_kit.screen_track = LocalTrack::Published {
1290                                    track_publication: publication,
1291                                    muted,
1292                                };
1293                                cx.notify();
1294                            }
1295
1296                            Audio::play_sound(Sound::StartScreenshare, cx);
1297
1298                            Ok(())
1299                        }
1300                        Err(error) => {
1301                            if canceled {
1302                                Ok(())
1303                            } else {
1304                                live_kit.screen_track = LocalTrack::None;
1305                                cx.notify();
1306                                Err(error)
1307                            }
1308                        }
1309                    }
1310                })
1311        })
1312    }
1313
1314    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1315        let should_mute = !self.is_muted(cx);
1316        if let Some(live_kit) = self.live_kit.as_mut() {
1317            if matches!(live_kit.microphone_track, LocalTrack::None) {
1318                return Ok(self.share_microphone(cx));
1319            }
1320
1321            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1322            live_kit.muted_by_user = should_mute;
1323
1324            if old_muted == true && live_kit.deafened == true {
1325                if let Some(task) = self.toggle_deafen(cx).ok() {
1326                    task.detach();
1327                }
1328            }
1329
1330            Ok(ret_task)
1331        } else {
1332            Err(anyhow!("LiveKit not started"))
1333        }
1334    }
1335
1336    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1337        if let Some(live_kit) = self.live_kit.as_mut() {
1338            (*live_kit).deafened = !live_kit.deafened;
1339
1340            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1341            // Context notification is sent within set_mute itself.
1342            let mut mute_task = None;
1343            // When deafening, mute user's mic as well.
1344            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1345            if live_kit.deafened || !live_kit.muted_by_user {
1346                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1347            };
1348            for participant in self.remote_participants.values() {
1349                for track in live_kit
1350                    .room
1351                    .remote_audio_track_publications(&participant.user.id.to_string())
1352                {
1353                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1354                }
1355            }
1356
1357            Ok(cx.foreground().spawn(async move {
1358                if let Some(mute_task) = mute_task {
1359                    mute_task.await?;
1360                }
1361                for task in tasks {
1362                    task.await?;
1363                }
1364                Ok(())
1365            }))
1366        } else {
1367            Err(anyhow!("LiveKit not started"))
1368        }
1369    }
1370
1371    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1372        if self.status.is_offline() {
1373            return Err(anyhow!("room is offline"));
1374        }
1375
1376        let live_kit = self
1377            .live_kit
1378            .as_mut()
1379            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1380        match mem::take(&mut live_kit.screen_track) {
1381            LocalTrack::None => Err(anyhow!("screen was not shared")),
1382            LocalTrack::Pending { .. } => {
1383                cx.notify();
1384                Ok(())
1385            }
1386            LocalTrack::Published {
1387                track_publication, ..
1388            } => {
1389                live_kit.room.unpublish_track(track_publication);
1390                cx.notify();
1391
1392                Audio::play_sound(Sound::StopScreenshare, cx);
1393                Ok(())
1394            }
1395        }
1396    }
1397
1398    #[cfg(any(test, feature = "test-support"))]
1399    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1400        self.live_kit
1401            .as_ref()
1402            .unwrap()
1403            .room
1404            .set_display_sources(sources);
1405    }
1406}
1407
1408struct LiveKitRoom {
1409    room: Arc<live_kit_client::Room>,
1410    screen_track: LocalTrack,
1411    microphone_track: LocalTrack,
1412    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1413    muted_by_user: bool,
1414    deafened: bool,
1415    speaking: bool,
1416    next_publish_id: usize,
1417    _maintain_room: Task<()>,
1418    _maintain_tracks: [Task<()>; 2],
1419}
1420
1421impl LiveKitRoom {
1422    fn set_mute(
1423        self: &mut LiveKitRoom,
1424        should_mute: bool,
1425        cx: &mut ModelContext<Room>,
1426    ) -> Result<(Task<Result<()>>, bool)> {
1427        if !should_mute {
1428            // clear user muting state.
1429            self.muted_by_user = false;
1430        }
1431
1432        let (result, old_muted) = match &mut self.microphone_track {
1433            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1434            LocalTrack::Pending { muted, .. } => {
1435                let old_muted = *muted;
1436                *muted = should_mute;
1437                cx.notify();
1438                Ok((Task::Ready(Some(Ok(()))), old_muted))
1439            }
1440            LocalTrack::Published {
1441                track_publication,
1442                muted,
1443            } => {
1444                let old_muted = *muted;
1445                *muted = should_mute;
1446                cx.notify();
1447                Ok((
1448                    cx.background().spawn(track_publication.set_mute(*muted)),
1449                    old_muted,
1450                ))
1451            }
1452        }?;
1453
1454        if old_muted != should_mute {
1455            if should_mute {
1456                Audio::play_sound(Sound::Mute, cx);
1457            } else {
1458                Audio::play_sound(Sound::Unmute, cx);
1459            }
1460        }
1461
1462        Ok((result, old_muted))
1463    }
1464}
1465
1466enum LocalTrack {
1467    None,
1468    Pending {
1469        publish_id: usize,
1470        muted: bool,
1471    },
1472    Published {
1473        track_publication: LocalTrackPublication,
1474        muted: bool,
1475    },
1476}
1477
1478impl Default for LocalTrack {
1479    fn default() -> Self {
1480        Self::None
1481    }
1482}
1483
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        match self {
            RoomStatus::Offline => true,
            RoomStatus::Online | RoomStatus::Rejoining => false,
        }
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        match self {
            RoomStatus::Online => true,
            RoomStatus::Rejoining | RoomStatus::Offline => false,
        }
    }
}