room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28#[derive(Clone, Debug, PartialEq, Eq)]
  29pub enum Event {
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    Left,
  48}
  49
  50pub struct Room {
  51    id: u64,
  52    channel_id: Option<u64>,
  53    live_kit: Option<LiveKitRoom>,
  54    status: RoomStatus,
  55    shared_projects: HashSet<WeakModelHandle<Project>>,
  56    joined_projects: HashSet<WeakModelHandle<Project>>,
  57    local_participant: LocalParticipant,
  58    remote_participants: BTreeMap<u64, RemoteParticipant>,
  59    pending_participants: Vec<Arc<User>>,
  60    participant_user_ids: HashSet<u64>,
  61    pending_call_count: usize,
  62    leave_when_empty: bool,
  63    client: Arc<Client>,
  64    user_store: ModelHandle<UserStore>,
  65    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  66    subscriptions: Vec<client::Subscription>,
  67    pending_room_update: Option<Task<()>>,
  68    maintain_connection: Option<Task<Option<()>>>,
  69}
  70
  71impl Entity for Room {
  72    type Event = Event;
  73
  74    fn release(&mut self, cx: &mut AppContext) {
  75        if self.status.is_online() {
  76            self.leave_internal(cx).detach_and_log_err(cx);
  77        }
  78    }
  79
  80    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  81        if self.status.is_online() {
  82            let leave = self.leave_internal(cx);
  83            Some(
  84                cx.background()
  85                    .spawn(async move {
  86                        leave.await.log_err();
  87                    })
  88                    .boxed(),
  89            )
  90        } else {
  91            None
  92        }
  93    }
  94}
  95
  96impl Room {
  97    pub fn channel_id(&self) -> Option<u64> {
  98        self.channel_id
  99    }
 100
 101    #[cfg(any(test, feature = "test-support"))]
 102    pub fn is_connected(&self) -> bool {
 103        if let Some(live_kit) = self.live_kit.as_ref() {
 104            matches!(
 105                *live_kit.room.status().borrow(),
 106                live_kit_client::ConnectionState::Connected { .. }
 107            )
 108        } else {
 109            false
 110        }
 111    }
 112
 113    fn new(
 114        id: u64,
 115        channel_id: Option<u64>,
 116        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 117        client: Arc<Client>,
 118        user_store: ModelHandle<UserStore>,
 119        cx: &mut ModelContext<Self>,
 120    ) -> Self {
 121        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 122            let room = live_kit_client::Room::new();
 123            let mut status = room.status();
 124            // Consume the initial status of the room.
 125            let _ = status.try_recv();
 126            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 127                while let Some(status) = status.next().await {
 128                    let this = if let Some(this) = this.upgrade(&cx) {
 129                        this
 130                    } else {
 131                        break;
 132                    };
 133
 134                    if status == live_kit_client::ConnectionState::Disconnected {
 135                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 136                        break;
 137                    }
 138                }
 139            });
 140
 141            let mut track_video_changes = room.remote_video_track_updates();
 142            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 143                while let Some(track_change) = track_video_changes.next().await {
 144                    let this = if let Some(this) = this.upgrade(&cx) {
 145                        this
 146                    } else {
 147                        break;
 148                    };
 149
 150                    this.update(&mut cx, |this, cx| {
 151                        this.remote_video_track_updated(track_change, cx).log_err()
 152                    });
 153                }
 154            });
 155
 156            let mut track_audio_changes = room.remote_audio_track_updates();
 157            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 158                while let Some(track_change) = track_audio_changes.next().await {
 159                    let this = if let Some(this) = this.upgrade(&cx) {
 160                        this
 161                    } else {
 162                        break;
 163                    };
 164
 165                    this.update(&mut cx, |this, cx| {
 166                        this.remote_audio_track_updated(track_change, cx).log_err()
 167                    });
 168                }
 169            });
 170
 171            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 172            cx.spawn(|this, mut cx| async move {
 173                connect.await?;
 174
 175                if !cx.read(Self::mute_on_join) {
 176                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
 177                        .await?;
 178                }
 179
 180                anyhow::Ok(())
 181            })
 182            .detach_and_log_err(cx);
 183
 184            Some(LiveKitRoom {
 185                room,
 186                screen_track: LocalTrack::None,
 187                microphone_track: LocalTrack::None,
 188                next_publish_id: 0,
 189                muted_by_user: false,
 190                deafened: false,
 191                speaking: false,
 192                _maintain_room,
 193                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 194            })
 195        } else {
 196            None
 197        };
 198
 199        let maintain_connection =
 200            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 201
 202        Audio::play_sound(Sound::Joined, cx);
 203
 204        Self {
 205            id,
 206            channel_id,
 207            live_kit: live_kit_room,
 208            status: RoomStatus::Online,
 209            shared_projects: Default::default(),
 210            joined_projects: Default::default(),
 211            participant_user_ids: Default::default(),
 212            local_participant: Default::default(),
 213            remote_participants: Default::default(),
 214            pending_participants: Default::default(),
 215            pending_call_count: 0,
 216            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 217            leave_when_empty: false,
 218            pending_room_update: None,
 219            client,
 220            user_store,
 221            follows_by_leader_id_project_id: Default::default(),
 222            maintain_connection: Some(maintain_connection),
 223        }
 224    }
 225
 226    pub(crate) fn create(
 227        called_user_id: u64,
 228        initial_project: Option<ModelHandle<Project>>,
 229        client: Arc<Client>,
 230        user_store: ModelHandle<UserStore>,
 231        cx: &mut AppContext,
 232    ) -> Task<Result<ModelHandle<Self>>> {
 233        cx.spawn(|mut cx| async move {
 234            let response = client.request(proto::CreateRoom {}).await?;
 235            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 236            let room = cx.add_model(|cx| {
 237                Self::new(
 238                    room_proto.id,
 239                    None,
 240                    response.live_kit_connection_info,
 241                    client,
 242                    user_store,
 243                    cx,
 244                )
 245            });
 246
 247            let initial_project_id = if let Some(initial_project) = initial_project {
 248                let initial_project_id = room
 249                    .update(&mut cx, |room, cx| {
 250                        room.share_project(initial_project.clone(), cx)
 251                    })
 252                    .await?;
 253                Some(initial_project_id)
 254            } else {
 255                None
 256            };
 257
 258            match room
 259                .update(&mut cx, |room, cx| {
 260                    room.leave_when_empty = true;
 261                    room.call(called_user_id, initial_project_id, cx)
 262                })
 263                .await
 264            {
 265                Ok(()) => Ok(room),
 266                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 267            }
 268        })
 269    }
 270
 271    pub(crate) fn join_channel(
 272        channel_id: u64,
 273        client: Arc<Client>,
 274        user_store: ModelHandle<UserStore>,
 275        cx: &mut AppContext,
 276    ) -> Task<Result<ModelHandle<Self>>> {
 277        cx.spawn(|cx| async move {
 278            Self::from_join_response(
 279                client.request(proto::JoinChannel { channel_id }).await?,
 280                client,
 281                user_store,
 282                cx,
 283            )
 284        })
 285    }
 286
 287    pub(crate) fn join(
 288        call: &IncomingCall,
 289        client: Arc<Client>,
 290        user_store: ModelHandle<UserStore>,
 291        cx: &mut AppContext,
 292    ) -> Task<Result<ModelHandle<Self>>> {
 293        let id = call.room_id;
 294        cx.spawn(|cx| async move {
 295            Self::from_join_response(
 296                client.request(proto::JoinRoom { id }).await?,
 297                client,
 298                user_store,
 299                cx,
 300            )
 301        })
 302    }
 303
 304    pub fn mute_on_join(cx: &AppContext) -> bool {
 305        settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 306    }
 307
 308    fn from_join_response(
 309        response: proto::JoinRoomResponse,
 310        client: Arc<Client>,
 311        user_store: ModelHandle<UserStore>,
 312        mut cx: AsyncAppContext,
 313    ) -> Result<ModelHandle<Self>> {
 314        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 315        let room = cx.add_model(|cx| {
 316            Self::new(
 317                room_proto.id,
 318                response.channel_id,
 319                response.live_kit_connection_info,
 320                client,
 321                user_store,
 322                cx,
 323            )
 324        });
 325        room.update(&mut cx, |room, cx| {
 326            room.leave_when_empty = room.channel_id.is_none();
 327            room.apply_room_update(room_proto, cx)?;
 328            anyhow::Ok(())
 329        })?;
 330        Ok(room)
 331    }
 332
 333    fn should_leave(&self) -> bool {
 334        self.leave_when_empty
 335            && self.pending_room_update.is_none()
 336            && self.pending_participants.is_empty()
 337            && self.remote_participants.is_empty()
 338            && self.pending_call_count == 0
 339    }
 340
 341    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 342        cx.notify();
 343        cx.emit(Event::Left);
 344        self.leave_internal(cx)
 345    }
 346
 347    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 348        if self.status.is_offline() {
 349            return Task::ready(Err(anyhow!("room is offline")));
 350        }
 351
 352        log::info!("leaving room");
 353        Audio::play_sound(Sound::Leave, cx);
 354
 355        self.clear_state(cx);
 356
 357        let leave_room = self.client.request(proto::LeaveRoom {});
 358        cx.background().spawn(async move {
 359            leave_room.await?;
 360            anyhow::Ok(())
 361        })
 362    }
 363
 364    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 365        for project in self.shared_projects.drain() {
 366            if let Some(project) = project.upgrade(cx) {
 367                project.update(cx, |project, cx| {
 368                    project.unshare(cx).log_err();
 369                });
 370            }
 371        }
 372        for project in self.joined_projects.drain() {
 373            if let Some(project) = project.upgrade(cx) {
 374                project.update(cx, |project, cx| {
 375                    project.disconnected_from_host(cx);
 376                    project.close(cx);
 377                });
 378            }
 379        }
 380
 381        self.status = RoomStatus::Offline;
 382        self.remote_participants.clear();
 383        self.pending_participants.clear();
 384        self.participant_user_ids.clear();
 385        self.subscriptions.clear();
 386        self.live_kit.take();
 387        self.pending_room_update.take();
 388        self.maintain_connection.take();
 389    }
 390
 391    async fn maintain_connection(
 392        this: WeakModelHandle<Self>,
 393        client: Arc<Client>,
 394        mut cx: AsyncAppContext,
 395    ) -> Result<()> {
 396        let mut client_status = client.status();
 397        loop {
 398            let _ = client_status.try_recv();
 399            let is_connected = client_status.borrow().is_connected();
 400            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 401            if !is_connected || client_status.next().await.is_some() {
 402                log::info!("detected client disconnection");
 403
 404                this.upgrade(&cx)
 405                    .ok_or_else(|| anyhow!("room was dropped"))?
 406                    .update(&mut cx, |this, cx| {
 407                        this.status = RoomStatus::Rejoining;
 408                        cx.notify();
 409                    });
 410
 411                // Wait for client to re-establish a connection to the server.
 412                {
 413                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 414                    let client_reconnection = async {
 415                        let mut remaining_attempts = 3;
 416                        while remaining_attempts > 0 {
 417                            if client_status.borrow().is_connected() {
 418                                log::info!("client reconnected, attempting to rejoin room");
 419
 420                                let Some(this) = this.upgrade(&cx) else { break };
 421                                if this
 422                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 423                                    .await
 424                                    .log_err()
 425                                    .is_some()
 426                                {
 427                                    return true;
 428                                } else {
 429                                    remaining_attempts -= 1;
 430                                }
 431                            } else if client_status.borrow().is_signed_out() {
 432                                return false;
 433                            }
 434
 435                            log::info!(
 436                                "waiting for client status change, remaining attempts {}",
 437                                remaining_attempts
 438                            );
 439                            client_status.next().await;
 440                        }
 441                        false
 442                    }
 443                    .fuse();
 444                    futures::pin_mut!(client_reconnection);
 445
 446                    futures::select_biased! {
 447                        reconnected = client_reconnection => {
 448                            if reconnected {
 449                                log::info!("successfully reconnected to room");
 450                                // If we successfully joined the room, go back around the loop
 451                                // waiting for future connection status changes.
 452                                continue;
 453                            }
 454                        }
 455                        _ = reconnection_timeout => {
 456                            log::info!("room reconnection timeout expired");
 457                        }
 458                    }
 459                }
 460
 461                break;
 462            }
 463        }
 464
 465        // The client failed to re-establish a connection to the server
 466        // or an error occurred while trying to re-join the room. Either way
 467        // we leave the room and return an error.
 468        if let Some(this) = this.upgrade(&cx) {
 469            log::info!("reconnection failed, leaving room");
 470            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 471        }
 472        Err(anyhow!(
 473            "can't reconnect to room: client failed to re-establish connection"
 474        ))
 475    }
 476
 477    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 478        let mut projects = HashMap::default();
 479        let mut reshared_projects = Vec::new();
 480        let mut rejoined_projects = Vec::new();
 481        self.shared_projects.retain(|project| {
 482            if let Some(handle) = project.upgrade(cx) {
 483                let project = handle.read(cx);
 484                if let Some(project_id) = project.remote_id() {
 485                    projects.insert(project_id, handle.clone());
 486                    reshared_projects.push(proto::UpdateProject {
 487                        project_id,
 488                        worktrees: project.worktree_metadata_protos(cx),
 489                    });
 490                    return true;
 491                }
 492            }
 493            false
 494        });
 495        self.joined_projects.retain(|project| {
 496            if let Some(handle) = project.upgrade(cx) {
 497                let project = handle.read(cx);
 498                if let Some(project_id) = project.remote_id() {
 499                    projects.insert(project_id, handle.clone());
 500                    rejoined_projects.push(proto::RejoinProject {
 501                        id: project_id,
 502                        worktrees: project
 503                            .worktrees(cx)
 504                            .map(|worktree| {
 505                                let worktree = worktree.read(cx);
 506                                proto::RejoinWorktree {
 507                                    id: worktree.id().to_proto(),
 508                                    scan_id: worktree.completed_scan_id() as u64,
 509                                }
 510                            })
 511                            .collect(),
 512                    });
 513                }
 514                return true;
 515            }
 516            false
 517        });
 518
 519        let response = self.client.request_envelope(proto::RejoinRoom {
 520            id: self.id,
 521            reshared_projects,
 522            rejoined_projects,
 523        });
 524
 525        cx.spawn(|this, mut cx| async move {
 526            let response = response.await?;
 527            let message_id = response.message_id;
 528            let response = response.payload;
 529            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 530            this.update(&mut cx, |this, cx| {
 531                this.status = RoomStatus::Online;
 532                this.apply_room_update(room_proto, cx)?;
 533
 534                for reshared_project in response.reshared_projects {
 535                    if let Some(project) = projects.get(&reshared_project.id) {
 536                        project.update(cx, |project, cx| {
 537                            project.reshared(reshared_project, cx).log_err();
 538                        });
 539                    }
 540                }
 541
 542                for rejoined_project in response.rejoined_projects {
 543                    if let Some(project) = projects.get(&rejoined_project.id) {
 544                        project.update(cx, |project, cx| {
 545                            project.rejoined(rejoined_project, message_id, cx).log_err();
 546                        });
 547                    }
 548                }
 549
 550                anyhow::Ok(())
 551            })
 552        })
 553    }
 554
 555    pub fn id(&self) -> u64 {
 556        self.id
 557    }
 558
 559    pub fn status(&self) -> RoomStatus {
 560        self.status
 561    }
 562
 563    pub fn local_participant(&self) -> &LocalParticipant {
 564        &self.local_participant
 565    }
 566
 567    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 568        &self.remote_participants
 569    }
 570
 571    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 572        self.remote_participants
 573            .values()
 574            .find(|p| p.peer_id == peer_id)
 575    }
 576
 577    pub fn pending_participants(&self) -> &[Arc<User>] {
 578        &self.pending_participants
 579    }
 580
 581    pub fn contains_participant(&self, user_id: u64) -> bool {
 582        self.participant_user_ids.contains(&user_id)
 583    }
 584
 585    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 586        self.follows_by_leader_id_project_id
 587            .get(&(leader_id, project_id))
 588            .map_or(&[], |v| v.as_slice())
 589    }
 590
 591    async fn handle_room_updated(
 592        this: ModelHandle<Self>,
 593        envelope: TypedEnvelope<proto::RoomUpdated>,
 594        _: Arc<Client>,
 595        mut cx: AsyncAppContext,
 596    ) -> Result<()> {
 597        let room = envelope
 598            .payload
 599            .room
 600            .ok_or_else(|| anyhow!("invalid room"))?;
 601        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 602    }
 603
 604    fn apply_room_update(
 605        &mut self,
 606        mut room: proto::Room,
 607        cx: &mut ModelContext<Self>,
 608    ) -> Result<()> {
 609        // Filter ourselves out from the room's participants.
 610        let local_participant_ix = room
 611            .participants
 612            .iter()
 613            .position(|participant| Some(participant.user_id) == self.client.user_id());
 614        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 615
 616        let pending_participant_user_ids = room
 617            .pending_participants
 618            .iter()
 619            .map(|p| p.user_id)
 620            .collect::<Vec<_>>();
 621
 622        let remote_participant_user_ids = room
 623            .participants
 624            .iter()
 625            .map(|p| p.user_id)
 626            .collect::<Vec<_>>();
 627
 628        let (remote_participants, pending_participants) =
 629            self.user_store.update(cx, move |user_store, cx| {
 630                (
 631                    user_store.get_users(remote_participant_user_ids, cx),
 632                    user_store.get_users(pending_participant_user_ids, cx),
 633                )
 634            });
 635
 636        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 637            let (remote_participants, pending_participants) =
 638                futures::join!(remote_participants, pending_participants);
 639
 640            this.update(&mut cx, |this, cx| {
 641                this.participant_user_ids.clear();
 642
 643                if let Some(participant) = local_participant {
 644                    this.local_participant.projects = participant.projects;
 645                } else {
 646                    this.local_participant.projects.clear();
 647                }
 648
 649                if let Some(participants) = remote_participants.log_err() {
 650                    for (participant, user) in room.participants.into_iter().zip(participants) {
 651                        let Some(peer_id) = participant.peer_id else {
 652                            continue;
 653                        };
 654                        this.participant_user_ids.insert(participant.user_id);
 655
 656                        let old_projects = this
 657                            .remote_participants
 658                            .get(&participant.user_id)
 659                            .into_iter()
 660                            .flat_map(|existing| &existing.projects)
 661                            .map(|project| project.id)
 662                            .collect::<HashSet<_>>();
 663                        let new_projects = participant
 664                            .projects
 665                            .iter()
 666                            .map(|project| project.id)
 667                            .collect::<HashSet<_>>();
 668
 669                        for project in &participant.projects {
 670                            if !old_projects.contains(&project.id) {
 671                                cx.emit(Event::RemoteProjectShared {
 672                                    owner: user.clone(),
 673                                    project_id: project.id,
 674                                    worktree_root_names: project.worktree_root_names.clone(),
 675                                });
 676                            }
 677                        }
 678
 679                        for unshared_project_id in old_projects.difference(&new_projects) {
 680                            this.joined_projects.retain(|project| {
 681                                if let Some(project) = project.upgrade(cx) {
 682                                    project.update(cx, |project, cx| {
 683                                        if project.remote_id() == Some(*unshared_project_id) {
 684                                            project.disconnected_from_host(cx);
 685                                            false
 686                                        } else {
 687                                            true
 688                                        }
 689                                    })
 690                                } else {
 691                                    false
 692                                }
 693                            });
 694                            cx.emit(Event::RemoteProjectUnshared {
 695                                project_id: *unshared_project_id,
 696                            });
 697                        }
 698
 699                        let location = ParticipantLocation::from_proto(participant.location)
 700                            .unwrap_or(ParticipantLocation::External);
 701                        if let Some(remote_participant) =
 702                            this.remote_participants.get_mut(&participant.user_id)
 703                        {
 704                            remote_participant.projects = participant.projects;
 705                            remote_participant.peer_id = peer_id;
 706                            if location != remote_participant.location {
 707                                remote_participant.location = location;
 708                                cx.emit(Event::ParticipantLocationChanged {
 709                                    participant_id: peer_id,
 710                                });
 711                            }
 712                        } else {
 713                            this.remote_participants.insert(
 714                                participant.user_id,
 715                                RemoteParticipant {
 716                                    user: user.clone(),
 717                                    participant_index: ParticipantIndex(
 718                                        participant.participant_index,
 719                                    ),
 720                                    peer_id,
 721                                    projects: participant.projects,
 722                                    location,
 723                                    muted: true,
 724                                    speaking: false,
 725                                    video_tracks: Default::default(),
 726                                    audio_tracks: Default::default(),
 727                                },
 728                            );
 729
 730                            Audio::play_sound(Sound::Joined, cx);
 731
 732                            if let Some(live_kit) = this.live_kit.as_ref() {
 733                                let video_tracks =
 734                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 735                                let audio_tracks =
 736                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 737                                let publications = live_kit
 738                                    .room
 739                                    .remote_audio_track_publications(&user.id.to_string());
 740
 741                                for track in video_tracks {
 742                                    this.remote_video_track_updated(
 743                                        RemoteVideoTrackUpdate::Subscribed(track),
 744                                        cx,
 745                                    )
 746                                    .log_err();
 747                                }
 748
 749                                for (track, publication) in
 750                                    audio_tracks.iter().zip(publications.iter())
 751                                {
 752                                    this.remote_audio_track_updated(
 753                                        RemoteAudioTrackUpdate::Subscribed(
 754                                            track.clone(),
 755                                            publication.clone(),
 756                                        ),
 757                                        cx,
 758                                    )
 759                                    .log_err();
 760                                }
 761                            }
 762                        }
 763                    }
 764
 765                    this.remote_participants.retain(|user_id, participant| {
 766                        if this.participant_user_ids.contains(user_id) {
 767                            true
 768                        } else {
 769                            for project in &participant.projects {
 770                                cx.emit(Event::RemoteProjectUnshared {
 771                                    project_id: project.id,
 772                                });
 773                            }
 774                            false
 775                        }
 776                    });
 777                }
 778
 779                if let Some(pending_participants) = pending_participants.log_err() {
 780                    this.pending_participants = pending_participants;
 781                    for participant in &this.pending_participants {
 782                        this.participant_user_ids.insert(participant.id);
 783                    }
 784                }
 785
 786                this.follows_by_leader_id_project_id.clear();
 787                for follower in room.followers {
 788                    let project_id = follower.project_id;
 789                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 790                        (Some(leader), Some(follower)) => (leader, follower),
 791
 792                        _ => {
 793                            log::error!("Follower message {follower:?} missing some state");
 794                            continue;
 795                        }
 796                    };
 797
 798                    let list = this
 799                        .follows_by_leader_id_project_id
 800                        .entry((leader, project_id))
 801                        .or_insert(Vec::new());
 802                    if !list.contains(&follower) {
 803                        list.push(follower);
 804                    }
 805                }
 806
 807                this.pending_room_update.take();
 808                if this.should_leave() {
 809                    log::info!("room is empty, leaving");
 810                    let _ = this.leave(cx);
 811                }
 812
 813                this.user_store.update(cx, |user_store, cx| {
 814                    let participant_indices_by_user_id = this
 815                        .remote_participants
 816                        .iter()
 817                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 818                        .collect();
 819                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 820                });
 821
 822                this.check_invariants();
 823                cx.notify();
 824            });
 825        }));
 826
 827        cx.notify();
 828        Ok(())
 829    }
 830
 831    fn remote_video_track_updated(
 832        &mut self,
 833        change: RemoteVideoTrackUpdate,
 834        cx: &mut ModelContext<Self>,
 835    ) -> Result<()> {
 836        match change {
 837            RemoteVideoTrackUpdate::Subscribed(track) => {
 838                let user_id = track.publisher_id().parse()?;
 839                let track_id = track.sid().to_string();
 840                let participant = self
 841                    .remote_participants
 842                    .get_mut(&user_id)
 843                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 844                participant.video_tracks.insert(
 845                    track_id.clone(),
 846                    Arc::new(RemoteVideoTrack {
 847                        live_kit_track: track,
 848                    }),
 849                );
 850                cx.emit(Event::RemoteVideoTracksChanged {
 851                    participant_id: participant.peer_id,
 852                });
 853            }
 854            RemoteVideoTrackUpdate::Unsubscribed {
 855                publisher_id,
 856                track_id,
 857            } => {
 858                let user_id = publisher_id.parse()?;
 859                let participant = self
 860                    .remote_participants
 861                    .get_mut(&user_id)
 862                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 863                participant.video_tracks.remove(&track_id);
 864                cx.emit(Event::RemoteVideoTracksChanged {
 865                    participant_id: participant.peer_id,
 866                });
 867            }
 868        }
 869
 870        cx.notify();
 871        Ok(())
 872    }
 873
 874    fn remote_audio_track_updated(
 875        &mut self,
 876        change: RemoteAudioTrackUpdate,
 877        cx: &mut ModelContext<Self>,
 878    ) -> Result<()> {
 879        match change {
 880            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 881                let mut speaker_ids = speakers
 882                    .into_iter()
 883                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 884                    .collect::<Vec<u64>>();
 885                speaker_ids.sort_unstable();
 886                for (sid, participant) in &mut self.remote_participants {
 887                    if let Ok(_) = speaker_ids.binary_search(sid) {
 888                        participant.speaking = true;
 889                    } else {
 890                        participant.speaking = false;
 891                    }
 892                }
 893                if let Some(id) = self.client.user_id() {
 894                    if let Some(room) = &mut self.live_kit {
 895                        if let Ok(_) = speaker_ids.binary_search(&id) {
 896                            room.speaking = true;
 897                        } else {
 898                            room.speaking = false;
 899                        }
 900                    }
 901                }
 902                cx.notify();
 903            }
 904            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 905                let mut found = false;
 906                for participant in &mut self.remote_participants.values_mut() {
 907                    for track in participant.audio_tracks.values() {
 908                        if track.sid() == track_id {
 909                            found = true;
 910                            break;
 911                        }
 912                    }
 913                    if found {
 914                        participant.muted = muted;
 915                        break;
 916                    }
 917                }
 918
 919                cx.notify();
 920            }
 921            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 922                let user_id = track.publisher_id().parse()?;
 923                let track_id = track.sid().to_string();
 924                let participant = self
 925                    .remote_participants
 926                    .get_mut(&user_id)
 927                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 928
 929                participant.audio_tracks.insert(track_id.clone(), track);
 930                participant.muted = publication.is_muted();
 931
 932                cx.emit(Event::RemoteAudioTracksChanged {
 933                    participant_id: participant.peer_id,
 934                });
 935            }
 936            RemoteAudioTrackUpdate::Unsubscribed {
 937                publisher_id,
 938                track_id,
 939            } => {
 940                let user_id = publisher_id.parse()?;
 941                let participant = self
 942                    .remote_participants
 943                    .get_mut(&user_id)
 944                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 945                participant.audio_tracks.remove(&track_id);
 946                cx.emit(Event::RemoteAudioTracksChanged {
 947                    participant_id: participant.peer_id,
 948                });
 949            }
 950        }
 951
 952        cx.notify();
 953        Ok(())
 954    }
 955
    /// Asserts that the participant bookkeeping of this room is consistent.
    ///
    /// The checks are compiled only for tests or when the `test-support`
    /// feature is enabled; in any other build the body is empty.
    ///
    /// # Panics
    ///
    /// Panics if one of the invariants below is violated, or if
    /// `self.client.user_id()` is `None` while at least one remote or pending
    /// participant exists.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant is tracked in `participant_user_ids`
            // and is never the local user.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            // The same holds for every pending participant.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // `participant_user_ids` holds exactly as many ids as there are
            // remote plus pending participants.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
 975
 976    pub(crate) fn call(
 977        &mut self,
 978        called_user_id: u64,
 979        initial_project_id: Option<u64>,
 980        cx: &mut ModelContext<Self>,
 981    ) -> Task<Result<()>> {
 982        if self.status.is_offline() {
 983            return Task::ready(Err(anyhow!("room is offline")));
 984        }
 985
 986        cx.notify();
 987        let client = self.client.clone();
 988        let room_id = self.id;
 989        self.pending_call_count += 1;
 990        cx.spawn(|this, mut cx| async move {
 991            let result = client
 992                .request(proto::Call {
 993                    room_id,
 994                    called_user_id,
 995                    initial_project_id,
 996                })
 997                .await;
 998            this.update(&mut cx, |this, cx| {
 999                this.pending_call_count -= 1;
1000                if this.should_leave() {
1001                    this.leave(cx).detach_and_log_err(cx);
1002                }
1003            });
1004            result?;
1005            Ok(())
1006        })
1007    }
1008
1009    pub fn join_project(
1010        &mut self,
1011        id: u64,
1012        language_registry: Arc<LanguageRegistry>,
1013        fs: Arc<dyn Fs>,
1014        cx: &mut ModelContext<Self>,
1015    ) -> Task<Result<ModelHandle<Project>>> {
1016        let client = self.client.clone();
1017        let user_store = self.user_store.clone();
1018        cx.spawn(|this, mut cx| async move {
1019            let project =
1020                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1021
1022            this.update(&mut cx, |this, cx| {
1023                this.joined_projects.retain(|project| {
1024                    if let Some(project) = project.upgrade(cx) {
1025                        !project.read(cx).is_read_only()
1026                    } else {
1027                        false
1028                    }
1029                });
1030                this.joined_projects.insert(project.downgrade());
1031            });
1032            Ok(project)
1033        })
1034    }
1035
1036    pub(crate) fn share_project(
1037        &mut self,
1038        project: ModelHandle<Project>,
1039        cx: &mut ModelContext<Self>,
1040    ) -> Task<Result<u64>> {
1041        if let Some(project_id) = project.read(cx).remote_id() {
1042            return Task::ready(Ok(project_id));
1043        }
1044
1045        let request = self.client.request(proto::ShareProject {
1046            room_id: self.id(),
1047            worktrees: project.read(cx).worktree_metadata_protos(cx),
1048        });
1049        cx.spawn(|this, mut cx| async move {
1050            let response = request.await?;
1051
1052            project.update(&mut cx, |project, cx| {
1053                project.shared(response.project_id, cx)
1054            })?;
1055
1056            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1057            this.update(&mut cx, |this, cx| {
1058                this.shared_projects.insert(project.downgrade());
1059                let active_project = this.local_participant.active_project.as_ref();
1060                if active_project.map_or(false, |location| *location == project) {
1061                    this.set_location(Some(&project), cx)
1062                } else {
1063                    Task::ready(Ok(()))
1064                }
1065            })
1066            .await?;
1067
1068            Ok(response.project_id)
1069        })
1070    }
1071
1072    pub(crate) fn unshare_project(
1073        &mut self,
1074        project: ModelHandle<Project>,
1075        cx: &mut ModelContext<Self>,
1076    ) -> Result<()> {
1077        let project_id = match project.read(cx).remote_id() {
1078            Some(project_id) => project_id,
1079            None => return Ok(()),
1080        };
1081
1082        self.client.send(proto::UnshareProject { project_id })?;
1083        project.update(cx, |this, cx| this.unshare(cx))
1084    }
1085
1086    pub(crate) fn set_location(
1087        &mut self,
1088        project: Option<&ModelHandle<Project>>,
1089        cx: &mut ModelContext<Self>,
1090    ) -> Task<Result<()>> {
1091        if self.status.is_offline() {
1092            return Task::ready(Err(anyhow!("room is offline")));
1093        }
1094
1095        let client = self.client.clone();
1096        let room_id = self.id;
1097        let location = if let Some(project) = project {
1098            self.local_participant.active_project = Some(project.downgrade());
1099            if let Some(project_id) = project.read(cx).remote_id() {
1100                proto::participant_location::Variant::SharedProject(
1101                    proto::participant_location::SharedProject { id: project_id },
1102                )
1103            } else {
1104                proto::participant_location::Variant::UnsharedProject(
1105                    proto::participant_location::UnsharedProject {},
1106                )
1107            }
1108        } else {
1109            self.local_participant.active_project = None;
1110            proto::participant_location::Variant::External(proto::participant_location::External {})
1111        };
1112
1113        cx.notify();
1114        cx.foreground().spawn(async move {
1115            client
1116                .request(proto::UpdateParticipantLocation {
1117                    room_id,
1118                    location: Some(proto::ParticipantLocation {
1119                        variant: Some(location),
1120                    }),
1121                })
1122                .await?;
1123            Ok(())
1124        })
1125    }
1126
1127    pub fn is_screen_sharing(&self) -> bool {
1128        self.live_kit.as_ref().map_or(false, |live_kit| {
1129            !matches!(live_kit.screen_track, LocalTrack::None)
1130        })
1131    }
1132
1133    pub fn is_sharing_mic(&self) -> bool {
1134        self.live_kit.as_ref().map_or(false, |live_kit| {
1135            !matches!(live_kit.microphone_track, LocalTrack::None)
1136        })
1137    }
1138
1139    pub fn is_muted(&self, cx: &AppContext) -> bool {
1140        self.live_kit
1141            .as_ref()
1142            .and_then(|live_kit| match &live_kit.microphone_track {
1143                LocalTrack::None => Some(Self::mute_on_join(cx)),
1144                LocalTrack::Pending { muted, .. } => Some(*muted),
1145                LocalTrack::Published { muted, .. } => Some(*muted),
1146            })
1147            .unwrap_or(false)
1148    }
1149
1150    pub fn is_speaking(&self) -> bool {
1151        self.live_kit
1152            .as_ref()
1153            .map_or(false, |live_kit| live_kit.speaking)
1154    }
1155
1156    pub fn is_deafened(&self) -> Option<bool> {
1157        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1158    }
1159
1160    #[track_caller]
1161    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1162        if self.status.is_offline() {
1163            return Task::ready(Err(anyhow!("room is offline")));
1164        } else if self.is_sharing_mic() {
1165            return Task::ready(Err(anyhow!("microphone was already shared")));
1166        }
1167
1168        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1169            let publish_id = post_inc(&mut live_kit.next_publish_id);
1170            live_kit.microphone_track = LocalTrack::Pending {
1171                publish_id,
1172                muted: false,
1173            };
1174            cx.notify();
1175            publish_id
1176        } else {
1177            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1178        };
1179
1180        cx.spawn_weak(|this, mut cx| async move {
1181            let publish_track = async {
1182                let track = LocalAudioTrack::create();
1183                this.upgrade(&cx)
1184                    .ok_or_else(|| anyhow!("room was dropped"))?
1185                    .read_with(&cx, |this, _| {
1186                        this.live_kit
1187                            .as_ref()
1188                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1189                    })
1190                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1191                    .await
1192            };
1193
1194            let publication = publish_track.await;
1195            this.upgrade(&cx)
1196                .ok_or_else(|| anyhow!("room was dropped"))?
1197                .update(&mut cx, |this, cx| {
1198                    let live_kit = this
1199                        .live_kit
1200                        .as_mut()
1201                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1202
1203                    let (canceled, muted) = if let LocalTrack::Pending {
1204                        publish_id: cur_publish_id,
1205                        muted,
1206                    } = &live_kit.microphone_track
1207                    {
1208                        (*cur_publish_id != publish_id, *muted)
1209                    } else {
1210                        (true, false)
1211                    };
1212
1213                    match publication {
1214                        Ok(publication) => {
1215                            if canceled {
1216                                live_kit.room.unpublish_track(publication);
1217                            } else {
1218                                if muted {
1219                                    cx.background().spawn(publication.set_mute(muted)).detach();
1220                                }
1221                                live_kit.microphone_track = LocalTrack::Published {
1222                                    track_publication: publication,
1223                                    muted,
1224                                };
1225                                cx.notify();
1226                            }
1227                            Ok(())
1228                        }
1229                        Err(error) => {
1230                            if canceled {
1231                                Ok(())
1232                            } else {
1233                                live_kit.microphone_track = LocalTrack::None;
1234                                cx.notify();
1235                                Err(error)
1236                            }
1237                        }
1238                    }
1239                })
1240        })
1241    }
1242
1243    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1244        if self.status.is_offline() {
1245            return Task::ready(Err(anyhow!("room is offline")));
1246        } else if self.is_screen_sharing() {
1247            return Task::ready(Err(anyhow!("screen was already shared")));
1248        }
1249
1250        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1251            let publish_id = post_inc(&mut live_kit.next_publish_id);
1252            live_kit.screen_track = LocalTrack::Pending {
1253                publish_id,
1254                muted: false,
1255            };
1256            cx.notify();
1257            (live_kit.room.display_sources(), publish_id)
1258        } else {
1259            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1260        };
1261
1262        cx.spawn_weak(|this, mut cx| async move {
1263            let publish_track = async {
1264                let displays = displays.await?;
1265                let display = displays
1266                    .first()
1267                    .ok_or_else(|| anyhow!("no display found"))?;
1268                let track = LocalVideoTrack::screen_share_for_display(&display);
1269                this.upgrade(&cx)
1270                    .ok_or_else(|| anyhow!("room was dropped"))?
1271                    .read_with(&cx, |this, _| {
1272                        this.live_kit
1273                            .as_ref()
1274                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1275                    })
1276                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1277                    .await
1278            };
1279
1280            let publication = publish_track.await;
1281            this.upgrade(&cx)
1282                .ok_or_else(|| anyhow!("room was dropped"))?
1283                .update(&mut cx, |this, cx| {
1284                    let live_kit = this
1285                        .live_kit
1286                        .as_mut()
1287                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1288
1289                    let (canceled, muted) = if let LocalTrack::Pending {
1290                        publish_id: cur_publish_id,
1291                        muted,
1292                    } = &live_kit.screen_track
1293                    {
1294                        (*cur_publish_id != publish_id, *muted)
1295                    } else {
1296                        (true, false)
1297                    };
1298
1299                    match publication {
1300                        Ok(publication) => {
1301                            if canceled {
1302                                live_kit.room.unpublish_track(publication);
1303                            } else {
1304                                if muted {
1305                                    cx.background().spawn(publication.set_mute(muted)).detach();
1306                                }
1307                                live_kit.screen_track = LocalTrack::Published {
1308                                    track_publication: publication,
1309                                    muted,
1310                                };
1311                                cx.notify();
1312                            }
1313
1314                            Audio::play_sound(Sound::StartScreenshare, cx);
1315
1316                            Ok(())
1317                        }
1318                        Err(error) => {
1319                            if canceled {
1320                                Ok(())
1321                            } else {
1322                                live_kit.screen_track = LocalTrack::None;
1323                                cx.notify();
1324                                Err(error)
1325                            }
1326                        }
1327                    }
1328                })
1329        })
1330    }
1331
1332    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1333        let should_mute = !self.is_muted(cx);
1334        if let Some(live_kit) = self.live_kit.as_mut() {
1335            if matches!(live_kit.microphone_track, LocalTrack::None) {
1336                return Ok(self.share_microphone(cx));
1337            }
1338
1339            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1340            live_kit.muted_by_user = should_mute;
1341
1342            if old_muted == true && live_kit.deafened == true {
1343                if let Some(task) = self.toggle_deafen(cx).ok() {
1344                    task.detach();
1345                }
1346            }
1347
1348            Ok(ret_task)
1349        } else {
1350            Err(anyhow!("LiveKit not started"))
1351        }
1352    }
1353
1354    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1355        if let Some(live_kit) = self.live_kit.as_mut() {
1356            (*live_kit).deafened = !live_kit.deafened;
1357
1358            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1359            // Context notification is sent within set_mute itself.
1360            let mut mute_task = None;
1361            // When deafening, mute user's mic as well.
1362            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1363            if live_kit.deafened || !live_kit.muted_by_user {
1364                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1365            };
1366            for participant in self.remote_participants.values() {
1367                for track in live_kit
1368                    .room
1369                    .remote_audio_track_publications(&participant.user.id.to_string())
1370                {
1371                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1372                }
1373            }
1374
1375            Ok(cx.foreground().spawn(async move {
1376                if let Some(mute_task) = mute_task {
1377                    mute_task.await?;
1378                }
1379                for task in tasks {
1380                    task.await?;
1381                }
1382                Ok(())
1383            }))
1384        } else {
1385            Err(anyhow!("LiveKit not started"))
1386        }
1387    }
1388
1389    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1390        if self.status.is_offline() {
1391            return Err(anyhow!("room is offline"));
1392        }
1393
1394        let live_kit = self
1395            .live_kit
1396            .as_mut()
1397            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1398        match mem::take(&mut live_kit.screen_track) {
1399            LocalTrack::None => Err(anyhow!("screen was not shared")),
1400            LocalTrack::Pending { .. } => {
1401                cx.notify();
1402                Ok(())
1403            }
1404            LocalTrack::Published {
1405                track_publication, ..
1406            } => {
1407                live_kit.room.unpublish_track(track_publication);
1408                cx.notify();
1409
1410                Audio::play_sound(Sound::StopScreenshare, cx);
1411                Ok(())
1412            }
1413        }
1414    }
1415
1416    #[cfg(any(test, feature = "test-support"))]
1417    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1418        self.live_kit
1419            .as_ref()
1420            .unwrap()
1421            .room
1422            .set_display_sources(sources);
1423    }
1424}
1425
/// LiveKit-related state of a [`Room`]: the LiveKit room handle, the local
/// tracks, and the local mute/deafen/speaking flags.
struct LiveKitRoom {
    /// Handle to the LiveKit client room, used to publish and unpublish
    /// tracks and to query remote track publications.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Set to the requested state when the user toggles mute manually and
    /// cleared whenever the microphone is unmuted. Undeafening leaves the
    /// microphone muted while this flag is set.
    muted_by_user: bool,
    /// Whether the local user is deafened; flipped by `Room::toggle_deafen`.
    deafened: bool,
    /// Whether the local user was among the most recently reported active
    /// speakers.
    speaking: bool,
    /// Counter from which each new pending publish takes its id.
    next_publish_id: usize,
    // NOTE(review): the two task fields below are not read in the visible
    // code; presumably they are held so the background work lives exactly as
    // long as this struct — confirm against the code that constructs it.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1438
1439impl LiveKitRoom {
1440    fn set_mute(
1441        self: &mut LiveKitRoom,
1442        should_mute: bool,
1443        cx: &mut ModelContext<Room>,
1444    ) -> Result<(Task<Result<()>>, bool)> {
1445        if !should_mute {
1446            // clear user muting state.
1447            self.muted_by_user = false;
1448        }
1449
1450        let (result, old_muted) = match &mut self.microphone_track {
1451            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1452            LocalTrack::Pending { muted, .. } => {
1453                let old_muted = *muted;
1454                *muted = should_mute;
1455                cx.notify();
1456                Ok((Task::Ready(Some(Ok(()))), old_muted))
1457            }
1458            LocalTrack::Published {
1459                track_publication,
1460                muted,
1461            } => {
1462                let old_muted = *muted;
1463                *muted = should_mute;
1464                cx.notify();
1465                Ok((
1466                    cx.background().spawn(track_publication.set_mute(*muted)),
1467                    old_muted,
1468                ))
1469            }
1470        }?;
1471
1472        if old_muted != should_mute {
1473            if should_mute {
1474                Audio::play_sound(Sound::Mute, cx);
1475            } else {
1476                Audio::play_sound(Sound::Unmute, cx);
1477            }
1478        }
1479
1480        Ok((result, old_muted))
1481    }
1482}
1483
/// Publication state of a local track (microphone or screen share).
enum LocalTrack {
    /// No track is being shared.
    None,
    /// A publish has been started but has not completed yet.
    Pending {
        /// Id handed out when the publish was started; compared on completion
        /// to detect that the attempt has been superseded or canceled.
        publish_id: usize,
        /// Mute state to carry over once the track is published.
        muted: bool,
    },
    /// The track has been published.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Current mute state of the published track.
        muted: bool,
    },
}

/// The default state is `LocalTrack::None`.
impl Default for LocalTrack {
    fn default() -> Self {
        Self::None
    }
}
1501
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        match self {
            Self::Offline => true,
            Self::Online | Self::Rejoining => false,
        }
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        match self {
            Self::Online => true,
            Self::Rejoining | Self::Offline => false,
        }
    }
}