room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
/// Maximum time `Room::maintain_connection` waits for the client to reconnect and rejoin the
/// room after a disconnection is detected; once it elapses the room is left.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Emitted while applying a room update when an already-known remote participant's
    /// location differs from the one previously recorded.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    // NOTE(review): emission site is outside this chunk; presumably raised when a remote
    // participant's video tracks change — confirm against `remote_video_track_updated`.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    // NOTE(review): emission site is outside this chunk; presumably raised when a remote
    // participant's audio tracks change — confirm against `remote_audio_track_updated`.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted while applying a room update for each project in a remote participant's
    /// project list that was not in the list previously recorded for that participant.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// Emitted while applying a room update when a project disappears from a remote
    /// participant's project list, or when the participant owning it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    // NOTE(review): emission site is outside this chunk — confirm when this fires.
    RemoteProjectJoined {
        project_id: u64,
    },
    // NOTE(review): emission site is outside this chunk — confirm when this fires.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
  55
/// Client-side state for a call room: its participants, the projects shared into or joined
/// from it, the optional LiveKit session, and the tasks that keep it in sync with the server.
pub struct Room {
    /// Room id taken from the server's room proto; also sent in `RejoinRoom` requests.
    id: u64,
    /// `None` for rooms made via `create`; otherwise whatever the join response reported.
    channel_id: Option<u64>,
    /// LiveKit session; only present when the server supplied connection info, and dropped
    /// by `clear_state`.
    live_kit: Option<LiveKitRoom>,
    /// `Online` on construction and after a successful rejoin, `Rejoining` while
    /// `maintain_connection` is recovering, `Offline` after `clear_state`.
    status: RoomStatus,
    /// Weak handles to projects that `clear_state` unshares and `rejoin` re-shares.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Weak handles to projects that `clear_state` disconnects and closes and `rejoin` rejoins.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    /// Local participant state; its `projects` are overwritten by each applied room update.
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users resolved from the `pending_participants` of the last applied room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote and pending participants from the last applied room update.
    participant_user_ids: HashSet<u64>,
    /// Must be zero for `should_leave` to return true; it is modified outside this chunk.
    pending_call_count: usize,
    /// Gate for `should_leave`. Set to true by `create`, and by `from_join_response` when the
    /// room has no channel.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers per `(leader peer id, project id)`; rebuilt on each applied room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Holds the `RoomUpdated` message-handler subscription; cleared by `clear_state`.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update; `should_leave` requires it to be `None`.
    pending_room_update: Option<Task<()>>,
    /// Task running `Self::maintain_connection`; dropped by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
  76
  77impl Entity for Room {
  78    type Event = Event;
  79
  80    fn release(&mut self, cx: &mut AppContext) {
  81        if self.status.is_online() {
  82            self.leave_internal(cx).detach_and_log_err(cx);
  83        }
  84    }
  85
  86    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  87        if self.status.is_online() {
  88            let leave = self.leave_internal(cx);
  89            Some(
  90                cx.background()
  91                    .spawn(async move {
  92                        leave.await.log_err();
  93                    })
  94                    .boxed(),
  95            )
  96        } else {
  97            None
  98        }
  99    }
 100}
 101
 102impl Room {
    /// Returns the id of the channel this room belongs to, or `None` if it has no channel.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
 106
 107    #[cfg(any(test, feature = "test-support"))]
 108    pub fn is_connected(&self) -> bool {
 109        if let Some(live_kit) = self.live_kit.as_ref() {
 110            matches!(
 111                *live_kit.room.status().borrow(),
 112                live_kit_client::ConnectionState::Connected { .. }
 113            )
 114        } else {
 115            false
 116        }
 117    }
 118
 119    fn new(
 120        id: u64,
 121        channel_id: Option<u64>,
 122        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 123        client: Arc<Client>,
 124        user_store: ModelHandle<UserStore>,
 125        cx: &mut ModelContext<Self>,
 126    ) -> Self {
 127        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 128            let room = live_kit_client::Room::new();
 129            let mut status = room.status();
 130            // Consume the initial status of the room.
 131            let _ = status.try_recv();
 132            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 133                while let Some(status) = status.next().await {
 134                    let this = if let Some(this) = this.upgrade(&cx) {
 135                        this
 136                    } else {
 137                        break;
 138                    };
 139
 140                    if status == live_kit_client::ConnectionState::Disconnected {
 141                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 142                        break;
 143                    }
 144                }
 145            });
 146
 147            let mut track_video_changes = room.remote_video_track_updates();
 148            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 149                while let Some(track_change) = track_video_changes.next().await {
 150                    let this = if let Some(this) = this.upgrade(&cx) {
 151                        this
 152                    } else {
 153                        break;
 154                    };
 155
 156                    this.update(&mut cx, |this, cx| {
 157                        this.remote_video_track_updated(track_change, cx).log_err()
 158                    });
 159                }
 160            });
 161
 162            let mut track_audio_changes = room.remote_audio_track_updates();
 163            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 164                while let Some(track_change) = track_audio_changes.next().await {
 165                    let this = if let Some(this) = this.upgrade(&cx) {
 166                        this
 167                    } else {
 168                        break;
 169                    };
 170
 171                    this.update(&mut cx, |this, cx| {
 172                        this.remote_audio_track_updated(track_change, cx).log_err()
 173                    });
 174                }
 175            });
 176
 177            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 178            cx.spawn(|this, mut cx| async move {
 179                connect.await?;
 180
 181                if !cx.read(Self::mute_on_join) {
 182                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
 183                        .await?;
 184                }
 185
 186                anyhow::Ok(())
 187            })
 188            .detach_and_log_err(cx);
 189
 190            Some(LiveKitRoom {
 191                room,
 192                screen_track: LocalTrack::None,
 193                microphone_track: LocalTrack::None,
 194                next_publish_id: 0,
 195                muted_by_user: false,
 196                deafened: false,
 197                speaking: false,
 198                _maintain_room,
 199                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 200            })
 201        } else {
 202            None
 203        };
 204
 205        let maintain_connection =
 206            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 207
 208        Audio::play_sound(Sound::Joined, cx);
 209
 210        Self {
 211            id,
 212            channel_id,
 213            live_kit: live_kit_room,
 214            status: RoomStatus::Online,
 215            shared_projects: Default::default(),
 216            joined_projects: Default::default(),
 217            participant_user_ids: Default::default(),
 218            local_participant: Default::default(),
 219            remote_participants: Default::default(),
 220            pending_participants: Default::default(),
 221            pending_call_count: 0,
 222            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 223            leave_when_empty: false,
 224            pending_room_update: None,
 225            client,
 226            user_store,
 227            follows_by_leader_id_project_id: Default::default(),
 228            maintain_connection: Some(maintain_connection),
 229        }
 230    }
 231
 232    pub(crate) fn create(
 233        called_user_id: u64,
 234        initial_project: Option<ModelHandle<Project>>,
 235        client: Arc<Client>,
 236        user_store: ModelHandle<UserStore>,
 237        cx: &mut AppContext,
 238    ) -> Task<Result<ModelHandle<Self>>> {
 239        cx.spawn(|mut cx| async move {
 240            let response = client.request(proto::CreateRoom {}).await?;
 241            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 242            let room = cx.add_model(|cx| {
 243                Self::new(
 244                    room_proto.id,
 245                    None,
 246                    response.live_kit_connection_info,
 247                    client,
 248                    user_store,
 249                    cx,
 250                )
 251            });
 252
 253            let initial_project_id = if let Some(initial_project) = initial_project {
 254                let initial_project_id = room
 255                    .update(&mut cx, |room, cx| {
 256                        room.share_project(initial_project.clone(), cx)
 257                    })
 258                    .await?;
 259                Some(initial_project_id)
 260            } else {
 261                None
 262            };
 263
 264            match room
 265                .update(&mut cx, |room, cx| {
 266                    room.leave_when_empty = true;
 267                    room.call(called_user_id, initial_project_id, cx)
 268                })
 269                .await
 270            {
 271                Ok(()) => Ok(room),
 272                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 273            }
 274        })
 275    }
 276
 277    pub(crate) fn join_channel(
 278        channel_id: u64,
 279        client: Arc<Client>,
 280        user_store: ModelHandle<UserStore>,
 281        cx: &mut AppContext,
 282    ) -> Task<Result<ModelHandle<Self>>> {
 283        cx.spawn(|cx| async move {
 284            Self::from_join_response(
 285                client.request(proto::JoinChannel { channel_id }).await?,
 286                client,
 287                user_store,
 288                cx,
 289            )
 290        })
 291    }
 292
 293    pub(crate) fn join(
 294        call: &IncomingCall,
 295        client: Arc<Client>,
 296        user_store: ModelHandle<UserStore>,
 297        cx: &mut AppContext,
 298    ) -> Task<Result<ModelHandle<Self>>> {
 299        let id = call.room_id;
 300        cx.spawn(|cx| async move {
 301            Self::from_join_response(
 302                client.request(proto::JoinRoom { id }).await?,
 303                client,
 304                user_store,
 305                cx,
 306            )
 307        })
 308    }
 309
 310    pub fn mute_on_join(cx: &AppContext) -> bool {
 311        settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 312    }
 313
 314    fn from_join_response(
 315        response: proto::JoinRoomResponse,
 316        client: Arc<Client>,
 317        user_store: ModelHandle<UserStore>,
 318        mut cx: AsyncAppContext,
 319    ) -> Result<ModelHandle<Self>> {
 320        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 321        let room = cx.add_model(|cx| {
 322            Self::new(
 323                room_proto.id,
 324                response.channel_id,
 325                response.live_kit_connection_info,
 326                client,
 327                user_store,
 328                cx,
 329            )
 330        });
 331        room.update(&mut cx, |room, cx| {
 332            room.leave_when_empty = room.channel_id.is_none();
 333            room.apply_room_update(room_proto, cx)?;
 334            anyhow::Ok(())
 335        })?;
 336        Ok(room)
 337    }
 338
 339    fn should_leave(&self) -> bool {
 340        self.leave_when_empty
 341            && self.pending_room_update.is_none()
 342            && self.pending_participants.is_empty()
 343            && self.remote_participants.is_empty()
 344            && self.pending_call_count == 0
 345    }
 346
    /// Notifies observers, emits [`Event::Left`], and then leaves the room via
    /// `leave_internal`.
    ///
    /// The notification and event are issued unconditionally, even when the room is already
    /// offline; in that case the returned task resolves to the "room is offline" error from
    /// `leave_internal`.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
 352
 353    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 354        if self.status.is_offline() {
 355            return Task::ready(Err(anyhow!("room is offline")));
 356        }
 357
 358        log::info!("leaving room");
 359        Audio::play_sound(Sound::Leave, cx);
 360
 361        self.clear_state(cx);
 362
 363        let leave_room = self.client.request(proto::LeaveRoom {});
 364        cx.background().spawn(async move {
 365            leave_room.await?;
 366            anyhow::Ok(())
 367        })
 368    }
 369
 370    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 371        for project in self.shared_projects.drain() {
 372            if let Some(project) = project.upgrade(cx) {
 373                project.update(cx, |project, cx| {
 374                    project.unshare(cx).log_err();
 375                });
 376            }
 377        }
 378        for project in self.joined_projects.drain() {
 379            if let Some(project) = project.upgrade(cx) {
 380                project.update(cx, |project, cx| {
 381                    project.disconnected_from_host(cx);
 382                    project.close(cx);
 383                });
 384            }
 385        }
 386
 387        self.status = RoomStatus::Offline;
 388        self.remote_participants.clear();
 389        self.pending_participants.clear();
 390        self.participant_user_ids.clear();
 391        self.subscriptions.clear();
 392        self.live_kit.take();
 393        self.pending_room_update.take();
 394        self.maintain_connection.take();
 395    }
 396
    /// Watches the client's connection status and tries to rejoin the room after a
    /// disconnection.
    ///
    /// Whenever the client is disconnected (or its status changes at all), the room is marked
    /// `Rejoining` and up to three rejoin attempts are made, racing against
    /// [`RECONNECT_TIMEOUT`]. A successful rejoin resumes watching. If the attempts are
    /// exhausted, the client signs out, or the timeout fires, the room is left.
    ///
    /// This function never returns `Ok`: it either keeps looping or returns an error —
    /// "room was dropped" if the model is gone when a disconnection is detected, or the
    /// "can't reconnect to room" error after leaving.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // Drain any immediately-available status so `next()` below waits for a new change.
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeds, `false` when giving up.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    // Only failed rejoin attempts consume the budget.
                                    remaining_attempts -= 1;
                                }
                            } else if client_status.borrow().is_signed_out() {
                                // A signed-out client ends the reconnection attempt.
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // The reconnection branch is listed first, ahead of the timeout branch.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade(&cx) {
            log::info!("reconnection failed, leaving room");
            // The task returned by `leave` is intentionally not awaited here.
            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 482
 483    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 484        let mut projects = HashMap::default();
 485        let mut reshared_projects = Vec::new();
 486        let mut rejoined_projects = Vec::new();
 487        self.shared_projects.retain(|project| {
 488            if let Some(handle) = project.upgrade(cx) {
 489                let project = handle.read(cx);
 490                if let Some(project_id) = project.remote_id() {
 491                    projects.insert(project_id, handle.clone());
 492                    reshared_projects.push(proto::UpdateProject {
 493                        project_id,
 494                        worktrees: project.worktree_metadata_protos(cx),
 495                    });
 496                    return true;
 497                }
 498            }
 499            false
 500        });
 501        self.joined_projects.retain(|project| {
 502            if let Some(handle) = project.upgrade(cx) {
 503                let project = handle.read(cx);
 504                if let Some(project_id) = project.remote_id() {
 505                    projects.insert(project_id, handle.clone());
 506                    rejoined_projects.push(proto::RejoinProject {
 507                        id: project_id,
 508                        worktrees: project
 509                            .worktrees(cx)
 510                            .map(|worktree| {
 511                                let worktree = worktree.read(cx);
 512                                proto::RejoinWorktree {
 513                                    id: worktree.id().to_proto(),
 514                                    scan_id: worktree.completed_scan_id() as u64,
 515                                }
 516                            })
 517                            .collect(),
 518                    });
 519                }
 520                return true;
 521            }
 522            false
 523        });
 524
 525        let response = self.client.request_envelope(proto::RejoinRoom {
 526            id: self.id,
 527            reshared_projects,
 528            rejoined_projects,
 529        });
 530
 531        cx.spawn(|this, mut cx| async move {
 532            let response = response.await?;
 533            let message_id = response.message_id;
 534            let response = response.payload;
 535            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 536            this.update(&mut cx, |this, cx| {
 537                this.status = RoomStatus::Online;
 538                this.apply_room_update(room_proto, cx)?;
 539
 540                for reshared_project in response.reshared_projects {
 541                    if let Some(project) = projects.get(&reshared_project.id) {
 542                        project.update(cx, |project, cx| {
 543                            project.reshared(reshared_project, cx).log_err();
 544                        });
 545                    }
 546                }
 547
 548                for rejoined_project in response.rejoined_projects {
 549                    if let Some(project) = projects.get(&rejoined_project.id) {
 550                        project.update(cx, |project, cx| {
 551                            project.rejoined(rejoined_project, message_id, cx).log_err();
 552                        });
 553                    }
 554                }
 555
 556                anyhow::Ok(())
 557            })
 558        })
 559    }
 560
    /// Returns the room's id.
    pub fn id(&self) -> u64 {
        self.id
    }
 564
    /// Returns the room's current status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 568
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 572
    /// Returns the remote participants currently known to the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 576
 577    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 578        self.remote_participants
 579            .values()
 580            .find(|p| p.peer_id == peer_id)
 581    }
 582
    /// Returns the users listed as pending participants in the most recently applied room
    /// update.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 586
    /// Returns whether `user_id` is among the user ids recorded from the remote and pending
    /// participants of the most recently applied room update.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 590
 591    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 592        self.follows_by_leader_id_project_id
 593            .get(&(leader_id, project_id))
 594            .map_or(&[], |v| v.as_slice())
 595    }
 596
 597    async fn handle_room_updated(
 598        this: ModelHandle<Self>,
 599        envelope: TypedEnvelope<proto::RoomUpdated>,
 600        _: Arc<Client>,
 601        mut cx: AsyncAppContext,
 602    ) -> Result<()> {
 603        let room = envelope
 604            .payload
 605            .room
 606            .ok_or_else(|| anyhow!("invalid room"))?;
 607        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 608    }
 609
 610    fn apply_room_update(
 611        &mut self,
 612        mut room: proto::Room,
 613        cx: &mut ModelContext<Self>,
 614    ) -> Result<()> {
 615        // Filter ourselves out from the room's participants.
 616        let local_participant_ix = room
 617            .participants
 618            .iter()
 619            .position(|participant| Some(participant.user_id) == self.client.user_id());
 620        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 621
 622        let pending_participant_user_ids = room
 623            .pending_participants
 624            .iter()
 625            .map(|p| p.user_id)
 626            .collect::<Vec<_>>();
 627
 628        let remote_participant_user_ids = room
 629            .participants
 630            .iter()
 631            .map(|p| p.user_id)
 632            .collect::<Vec<_>>();
 633
 634        let (remote_participants, pending_participants) =
 635            self.user_store.update(cx, move |user_store, cx| {
 636                (
 637                    user_store.get_users(remote_participant_user_ids, cx),
 638                    user_store.get_users(pending_participant_user_ids, cx),
 639                )
 640            });
 641
 642        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 643            let (remote_participants, pending_participants) =
 644                futures::join!(remote_participants, pending_participants);
 645
 646            this.update(&mut cx, |this, cx| {
 647                this.participant_user_ids.clear();
 648
 649                if let Some(participant) = local_participant {
 650                    this.local_participant.projects = participant.projects;
 651                } else {
 652                    this.local_participant.projects.clear();
 653                }
 654
 655                if let Some(participants) = remote_participants.log_err() {
 656                    for (participant, user) in room.participants.into_iter().zip(participants) {
 657                        let Some(peer_id) = participant.peer_id else {
 658                            continue;
 659                        };
 660                        this.participant_user_ids.insert(participant.user_id);
 661
 662                        let old_projects = this
 663                            .remote_participants
 664                            .get(&participant.user_id)
 665                            .into_iter()
 666                            .flat_map(|existing| &existing.projects)
 667                            .map(|project| project.id)
 668                            .collect::<HashSet<_>>();
 669                        let new_projects = participant
 670                            .projects
 671                            .iter()
 672                            .map(|project| project.id)
 673                            .collect::<HashSet<_>>();
 674
 675                        for project in &participant.projects {
 676                            if !old_projects.contains(&project.id) {
 677                                cx.emit(Event::RemoteProjectShared {
 678                                    owner: user.clone(),
 679                                    project_id: project.id,
 680                                    worktree_root_names: project.worktree_root_names.clone(),
 681                                });
 682                            }
 683                        }
 684
 685                        for unshared_project_id in old_projects.difference(&new_projects) {
 686                            this.joined_projects.retain(|project| {
 687                                if let Some(project) = project.upgrade(cx) {
 688                                    project.update(cx, |project, cx| {
 689                                        if project.remote_id() == Some(*unshared_project_id) {
 690                                            project.disconnected_from_host(cx);
 691                                            false
 692                                        } else {
 693                                            true
 694                                        }
 695                                    })
 696                                } else {
 697                                    false
 698                                }
 699                            });
 700                            cx.emit(Event::RemoteProjectUnshared {
 701                                project_id: *unshared_project_id,
 702                            });
 703                        }
 704
 705                        let location = ParticipantLocation::from_proto(participant.location)
 706                            .unwrap_or(ParticipantLocation::External);
 707                        if let Some(remote_participant) =
 708                            this.remote_participants.get_mut(&participant.user_id)
 709                        {
 710                            remote_participant.projects = participant.projects;
 711                            remote_participant.peer_id = peer_id;
 712                            if location != remote_participant.location {
 713                                remote_participant.location = location;
 714                                cx.emit(Event::ParticipantLocationChanged {
 715                                    participant_id: peer_id,
 716                                });
 717                            }
 718                        } else {
 719                            this.remote_participants.insert(
 720                                participant.user_id,
 721                                RemoteParticipant {
 722                                    user: user.clone(),
 723                                    participant_index: ParticipantIndex(
 724                                        participant.participant_index,
 725                                    ),
 726                                    peer_id,
 727                                    projects: participant.projects,
 728                                    location,
 729                                    muted: true,
 730                                    speaking: false,
 731                                    video_tracks: Default::default(),
 732                                    audio_tracks: Default::default(),
 733                                },
 734                            );
 735
 736                            Audio::play_sound(Sound::Joined, cx);
 737
 738                            if let Some(live_kit) = this.live_kit.as_ref() {
 739                                let video_tracks =
 740                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 741                                let audio_tracks =
 742                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 743                                let publications = live_kit
 744                                    .room
 745                                    .remote_audio_track_publications(&user.id.to_string());
 746
 747                                for track in video_tracks {
 748                                    this.remote_video_track_updated(
 749                                        RemoteVideoTrackUpdate::Subscribed(track),
 750                                        cx,
 751                                    )
 752                                    .log_err();
 753                                }
 754
 755                                for (track, publication) in
 756                                    audio_tracks.iter().zip(publications.iter())
 757                                {
 758                                    this.remote_audio_track_updated(
 759                                        RemoteAudioTrackUpdate::Subscribed(
 760                                            track.clone(),
 761                                            publication.clone(),
 762                                        ),
 763                                        cx,
 764                                    )
 765                                    .log_err();
 766                                }
 767                            }
 768                        }
 769                    }
 770
 771                    this.remote_participants.retain(|user_id, participant| {
 772                        if this.participant_user_ids.contains(user_id) {
 773                            true
 774                        } else {
 775                            for project in &participant.projects {
 776                                cx.emit(Event::RemoteProjectUnshared {
 777                                    project_id: project.id,
 778                                });
 779                            }
 780                            false
 781                        }
 782                    });
 783                }
 784
 785                if let Some(pending_participants) = pending_participants.log_err() {
 786                    this.pending_participants = pending_participants;
 787                    for participant in &this.pending_participants {
 788                        this.participant_user_ids.insert(participant.id);
 789                    }
 790                }
 791
 792                this.follows_by_leader_id_project_id.clear();
 793                for follower in room.followers {
 794                    let project_id = follower.project_id;
 795                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 796                        (Some(leader), Some(follower)) => (leader, follower),
 797
 798                        _ => {
 799                            log::error!("Follower message {follower:?} missing some state");
 800                            continue;
 801                        }
 802                    };
 803
 804                    let list = this
 805                        .follows_by_leader_id_project_id
 806                        .entry((leader, project_id))
 807                        .or_insert(Vec::new());
 808                    if !list.contains(&follower) {
 809                        list.push(follower);
 810                    }
 811                }
 812
 813                this.pending_room_update.take();
 814                if this.should_leave() {
 815                    log::info!("room is empty, leaving");
 816                    let _ = this.leave(cx);
 817                }
 818
 819                this.user_store.update(cx, |user_store, cx| {
 820                    let participant_indices_by_user_id = this
 821                        .remote_participants
 822                        .iter()
 823                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 824                        .collect();
 825                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 826                });
 827
 828                this.check_invariants();
 829                cx.notify();
 830            });
 831        }));
 832
 833        cx.notify();
 834        Ok(())
 835    }
 836
 837    fn remote_video_track_updated(
 838        &mut self,
 839        change: RemoteVideoTrackUpdate,
 840        cx: &mut ModelContext<Self>,
 841    ) -> Result<()> {
 842        match change {
 843            RemoteVideoTrackUpdate::Subscribed(track) => {
 844                let user_id = track.publisher_id().parse()?;
 845                let track_id = track.sid().to_string();
 846                let participant = self
 847                    .remote_participants
 848                    .get_mut(&user_id)
 849                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 850                participant.video_tracks.insert(
 851                    track_id.clone(),
 852                    Arc::new(RemoteVideoTrack {
 853                        live_kit_track: track,
 854                    }),
 855                );
 856                cx.emit(Event::RemoteVideoTracksChanged {
 857                    participant_id: participant.peer_id,
 858                });
 859            }
 860            RemoteVideoTrackUpdate::Unsubscribed {
 861                publisher_id,
 862                track_id,
 863            } => {
 864                let user_id = publisher_id.parse()?;
 865                let participant = self
 866                    .remote_participants
 867                    .get_mut(&user_id)
 868                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 869                participant.video_tracks.remove(&track_id);
 870                cx.emit(Event::RemoteVideoTracksChanged {
 871                    participant_id: participant.peer_id,
 872                });
 873            }
 874        }
 875
 876        cx.notify();
 877        Ok(())
 878    }
 879
 880    fn remote_audio_track_updated(
 881        &mut self,
 882        change: RemoteAudioTrackUpdate,
 883        cx: &mut ModelContext<Self>,
 884    ) -> Result<()> {
 885        match change {
 886            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 887                let mut speaker_ids = speakers
 888                    .into_iter()
 889                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 890                    .collect::<Vec<u64>>();
 891                speaker_ids.sort_unstable();
 892                for (sid, participant) in &mut self.remote_participants {
 893                    if let Ok(_) = speaker_ids.binary_search(sid) {
 894                        participant.speaking = true;
 895                    } else {
 896                        participant.speaking = false;
 897                    }
 898                }
 899                if let Some(id) = self.client.user_id() {
 900                    if let Some(room) = &mut self.live_kit {
 901                        if let Ok(_) = speaker_ids.binary_search(&id) {
 902                            room.speaking = true;
 903                        } else {
 904                            room.speaking = false;
 905                        }
 906                    }
 907                }
 908                cx.notify();
 909            }
 910            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 911                let mut found = false;
 912                for participant in &mut self.remote_participants.values_mut() {
 913                    for track in participant.audio_tracks.values() {
 914                        if track.sid() == track_id {
 915                            found = true;
 916                            break;
 917                        }
 918                    }
 919                    if found {
 920                        participant.muted = muted;
 921                        break;
 922                    }
 923                }
 924
 925                cx.notify();
 926            }
 927            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 928                let user_id = track.publisher_id().parse()?;
 929                let track_id = track.sid().to_string();
 930                let participant = self
 931                    .remote_participants
 932                    .get_mut(&user_id)
 933                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 934
 935                participant.audio_tracks.insert(track_id.clone(), track);
 936                participant.muted = publication.is_muted();
 937
 938                cx.emit(Event::RemoteAudioTracksChanged {
 939                    participant_id: participant.peer_id,
 940                });
 941            }
 942            RemoteAudioTrackUpdate::Unsubscribed {
 943                publisher_id,
 944                track_id,
 945            } => {
 946                let user_id = publisher_id.parse()?;
 947                let participant = self
 948                    .remote_participants
 949                    .get_mut(&user_id)
 950                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 951                participant.audio_tracks.remove(&track_id);
 952                cx.emit(Event::RemoteAudioTracksChanged {
 953                    participant_id: participant.peer_id,
 954                });
 955            }
 956        }
 957
 958        cx.notify();
 959        Ok(())
 960    }
 961
 962    fn check_invariants(&self) {
 963        #[cfg(any(test, feature = "test-support"))]
 964        {
 965            for participant in self.remote_participants.values() {
 966                assert!(self.participant_user_ids.contains(&participant.user.id));
 967                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 968            }
 969
 970            for participant in &self.pending_participants {
 971                assert!(self.participant_user_ids.contains(&participant.id));
 972                assert_ne!(participant.id, self.client.user_id().unwrap());
 973            }
 974
 975            assert_eq!(
 976                self.participant_user_ids.len(),
 977                self.remote_participants.len() + self.pending_participants.len()
 978            );
 979        }
 980    }
 981
 982    pub(crate) fn call(
 983        &mut self,
 984        called_user_id: u64,
 985        initial_project_id: Option<u64>,
 986        cx: &mut ModelContext<Self>,
 987    ) -> Task<Result<()>> {
 988        if self.status.is_offline() {
 989            return Task::ready(Err(anyhow!("room is offline")));
 990        }
 991
 992        cx.notify();
 993        let client = self.client.clone();
 994        let room_id = self.id;
 995        self.pending_call_count += 1;
 996        cx.spawn(|this, mut cx| async move {
 997            let result = client
 998                .request(proto::Call {
 999                    room_id,
1000                    called_user_id,
1001                    initial_project_id,
1002                })
1003                .await;
1004            this.update(&mut cx, |this, cx| {
1005                this.pending_call_count -= 1;
1006                if this.should_leave() {
1007                    this.leave(cx).detach_and_log_err(cx);
1008                }
1009            });
1010            result?;
1011            Ok(())
1012        })
1013    }
1014
1015    pub fn join_project(
1016        &mut self,
1017        id: u64,
1018        language_registry: Arc<LanguageRegistry>,
1019        fs: Arc<dyn Fs>,
1020        cx: &mut ModelContext<Self>,
1021    ) -> Task<Result<ModelHandle<Project>>> {
1022        let client = self.client.clone();
1023        let user_store = self.user_store.clone();
1024        cx.emit(Event::RemoteProjectJoined { project_id: id });
1025        cx.spawn(|this, mut cx| async move {
1026            let project =
1027                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1028
1029            this.update(&mut cx, |this, cx| {
1030                this.joined_projects.retain(|project| {
1031                    if let Some(project) = project.upgrade(cx) {
1032                        !project.read(cx).is_read_only()
1033                    } else {
1034                        false
1035                    }
1036                });
1037                this.joined_projects.insert(project.downgrade());
1038            });
1039            Ok(project)
1040        })
1041    }
1042
1043    pub(crate) fn share_project(
1044        &mut self,
1045        project: ModelHandle<Project>,
1046        cx: &mut ModelContext<Self>,
1047    ) -> Task<Result<u64>> {
1048        if let Some(project_id) = project.read(cx).remote_id() {
1049            return Task::ready(Ok(project_id));
1050        }
1051
1052        let request = self.client.request(proto::ShareProject {
1053            room_id: self.id(),
1054            worktrees: project.read(cx).worktree_metadata_protos(cx),
1055        });
1056        cx.spawn(|this, mut cx| async move {
1057            let response = request.await?;
1058
1059            project.update(&mut cx, |project, cx| {
1060                project.shared(response.project_id, cx)
1061            })?;
1062
1063            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1064            this.update(&mut cx, |this, cx| {
1065                this.shared_projects.insert(project.downgrade());
1066                let active_project = this.local_participant.active_project.as_ref();
1067                if active_project.map_or(false, |location| *location == project) {
1068                    this.set_location(Some(&project), cx)
1069                } else {
1070                    Task::ready(Ok(()))
1071                }
1072            })
1073            .await?;
1074
1075            Ok(response.project_id)
1076        })
1077    }
1078
1079    pub(crate) fn unshare_project(
1080        &mut self,
1081        project: ModelHandle<Project>,
1082        cx: &mut ModelContext<Self>,
1083    ) -> Result<()> {
1084        let project_id = match project.read(cx).remote_id() {
1085            Some(project_id) => project_id,
1086            None => return Ok(()),
1087        };
1088
1089        self.client.send(proto::UnshareProject { project_id })?;
1090        project.update(cx, |this, cx| this.unshare(cx))
1091    }
1092
1093    pub(crate) fn set_location(
1094        &mut self,
1095        project: Option<&ModelHandle<Project>>,
1096        cx: &mut ModelContext<Self>,
1097    ) -> Task<Result<()>> {
1098        if self.status.is_offline() {
1099            return Task::ready(Err(anyhow!("room is offline")));
1100        }
1101
1102        let client = self.client.clone();
1103        let room_id = self.id;
1104        let location = if let Some(project) = project {
1105            self.local_participant.active_project = Some(project.downgrade());
1106            if let Some(project_id) = project.read(cx).remote_id() {
1107                proto::participant_location::Variant::SharedProject(
1108                    proto::participant_location::SharedProject { id: project_id },
1109                )
1110            } else {
1111                proto::participant_location::Variant::UnsharedProject(
1112                    proto::participant_location::UnsharedProject {},
1113                )
1114            }
1115        } else {
1116            self.local_participant.active_project = None;
1117            proto::participant_location::Variant::External(proto::participant_location::External {})
1118        };
1119
1120        cx.notify();
1121        cx.foreground().spawn(async move {
1122            client
1123                .request(proto::UpdateParticipantLocation {
1124                    room_id,
1125                    location: Some(proto::ParticipantLocation {
1126                        variant: Some(location),
1127                    }),
1128                })
1129                .await?;
1130            Ok(())
1131        })
1132    }
1133
1134    pub fn is_screen_sharing(&self) -> bool {
1135        self.live_kit.as_ref().map_or(false, |live_kit| {
1136            !matches!(live_kit.screen_track, LocalTrack::None)
1137        })
1138    }
1139
1140    pub fn is_sharing_mic(&self) -> bool {
1141        self.live_kit.as_ref().map_or(false, |live_kit| {
1142            !matches!(live_kit.microphone_track, LocalTrack::None)
1143        })
1144    }
1145
1146    pub fn is_muted(&self, cx: &AppContext) -> bool {
1147        self.live_kit
1148            .as_ref()
1149            .and_then(|live_kit| match &live_kit.microphone_track {
1150                LocalTrack::None => Some(Self::mute_on_join(cx)),
1151                LocalTrack::Pending { muted, .. } => Some(*muted),
1152                LocalTrack::Published { muted, .. } => Some(*muted),
1153            })
1154            .unwrap_or(false)
1155    }
1156
1157    pub fn is_speaking(&self) -> bool {
1158        self.live_kit
1159            .as_ref()
1160            .map_or(false, |live_kit| live_kit.speaking)
1161    }
1162
1163    pub fn is_deafened(&self) -> Option<bool> {
1164        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1165    }
1166
1167    #[track_caller]
1168    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1169        if self.status.is_offline() {
1170            return Task::ready(Err(anyhow!("room is offline")));
1171        } else if self.is_sharing_mic() {
1172            return Task::ready(Err(anyhow!("microphone was already shared")));
1173        }
1174
1175        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1176            let publish_id = post_inc(&mut live_kit.next_publish_id);
1177            live_kit.microphone_track = LocalTrack::Pending {
1178                publish_id,
1179                muted: false,
1180            };
1181            cx.notify();
1182            publish_id
1183        } else {
1184            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1185        };
1186
1187        cx.spawn_weak(|this, mut cx| async move {
1188            let publish_track = async {
1189                let track = LocalAudioTrack::create();
1190                this.upgrade(&cx)
1191                    .ok_or_else(|| anyhow!("room was dropped"))?
1192                    .read_with(&cx, |this, _| {
1193                        this.live_kit
1194                            .as_ref()
1195                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1196                    })
1197                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1198                    .await
1199            };
1200
1201            let publication = publish_track.await;
1202            this.upgrade(&cx)
1203                .ok_or_else(|| anyhow!("room was dropped"))?
1204                .update(&mut cx, |this, cx| {
1205                    let live_kit = this
1206                        .live_kit
1207                        .as_mut()
1208                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1209
1210                    let (canceled, muted) = if let LocalTrack::Pending {
1211                        publish_id: cur_publish_id,
1212                        muted,
1213                    } = &live_kit.microphone_track
1214                    {
1215                        (*cur_publish_id != publish_id, *muted)
1216                    } else {
1217                        (true, false)
1218                    };
1219
1220                    match publication {
1221                        Ok(publication) => {
1222                            if canceled {
1223                                live_kit.room.unpublish_track(publication);
1224                            } else {
1225                                if muted {
1226                                    cx.background().spawn(publication.set_mute(muted)).detach();
1227                                }
1228                                live_kit.microphone_track = LocalTrack::Published {
1229                                    track_publication: publication,
1230                                    muted,
1231                                };
1232                                cx.notify();
1233                            }
1234                            Ok(())
1235                        }
1236                        Err(error) => {
1237                            if canceled {
1238                                Ok(())
1239                            } else {
1240                                live_kit.microphone_track = LocalTrack::None;
1241                                cx.notify();
1242                                Err(error)
1243                            }
1244                        }
1245                    }
1246                })
1247        })
1248    }
1249
1250    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1251        if self.status.is_offline() {
1252            return Task::ready(Err(anyhow!("room is offline")));
1253        } else if self.is_screen_sharing() {
1254            return Task::ready(Err(anyhow!("screen was already shared")));
1255        }
1256
1257        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1258            let publish_id = post_inc(&mut live_kit.next_publish_id);
1259            live_kit.screen_track = LocalTrack::Pending {
1260                publish_id,
1261                muted: false,
1262            };
1263            cx.notify();
1264            (live_kit.room.display_sources(), publish_id)
1265        } else {
1266            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1267        };
1268
1269        cx.spawn_weak(|this, mut cx| async move {
1270            let publish_track = async {
1271                let displays = displays.await?;
1272                let display = displays
1273                    .first()
1274                    .ok_or_else(|| anyhow!("no display found"))?;
1275                let track = LocalVideoTrack::screen_share_for_display(&display);
1276                this.upgrade(&cx)
1277                    .ok_or_else(|| anyhow!("room was dropped"))?
1278                    .read_with(&cx, |this, _| {
1279                        this.live_kit
1280                            .as_ref()
1281                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1282                    })
1283                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1284                    .await
1285            };
1286
1287            let publication = publish_track.await;
1288            this.upgrade(&cx)
1289                .ok_or_else(|| anyhow!("room was dropped"))?
1290                .update(&mut cx, |this, cx| {
1291                    let live_kit = this
1292                        .live_kit
1293                        .as_mut()
1294                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1295
1296                    let (canceled, muted) = if let LocalTrack::Pending {
1297                        publish_id: cur_publish_id,
1298                        muted,
1299                    } = &live_kit.screen_track
1300                    {
1301                        (*cur_publish_id != publish_id, *muted)
1302                    } else {
1303                        (true, false)
1304                    };
1305
1306                    match publication {
1307                        Ok(publication) => {
1308                            if canceled {
1309                                live_kit.room.unpublish_track(publication);
1310                            } else {
1311                                if muted {
1312                                    cx.background().spawn(publication.set_mute(muted)).detach();
1313                                }
1314                                live_kit.screen_track = LocalTrack::Published {
1315                                    track_publication: publication,
1316                                    muted,
1317                                };
1318                                cx.notify();
1319                            }
1320
1321                            Audio::play_sound(Sound::StartScreenshare, cx);
1322
1323                            Ok(())
1324                        }
1325                        Err(error) => {
1326                            if canceled {
1327                                Ok(())
1328                            } else {
1329                                live_kit.screen_track = LocalTrack::None;
1330                                cx.notify();
1331                                Err(error)
1332                            }
1333                        }
1334                    }
1335                })
1336        })
1337    }
1338
1339    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1340        let should_mute = !self.is_muted(cx);
1341        if let Some(live_kit) = self.live_kit.as_mut() {
1342            if matches!(live_kit.microphone_track, LocalTrack::None) {
1343                return Ok(self.share_microphone(cx));
1344            }
1345
1346            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1347            live_kit.muted_by_user = should_mute;
1348
1349            if old_muted == true && live_kit.deafened == true {
1350                if let Some(task) = self.toggle_deafen(cx).ok() {
1351                    task.detach();
1352                }
1353            }
1354
1355            Ok(ret_task)
1356        } else {
1357            Err(anyhow!("LiveKit not started"))
1358        }
1359    }
1360
1361    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1362        if let Some(live_kit) = self.live_kit.as_mut() {
1363            (*live_kit).deafened = !live_kit.deafened;
1364
1365            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1366            // Context notification is sent within set_mute itself.
1367            let mut mute_task = None;
1368            // When deafening, mute user's mic as well.
1369            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1370            if live_kit.deafened || !live_kit.muted_by_user {
1371                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1372            };
1373            for participant in self.remote_participants.values() {
1374                for track in live_kit
1375                    .room
1376                    .remote_audio_track_publications(&participant.user.id.to_string())
1377                {
1378                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1379                }
1380            }
1381
1382            Ok(cx.foreground().spawn(async move {
1383                if let Some(mute_task) = mute_task {
1384                    mute_task.await?;
1385                }
1386                for task in tasks {
1387                    task.await?;
1388                }
1389                Ok(())
1390            }))
1391        } else {
1392            Err(anyhow!("LiveKit not started"))
1393        }
1394    }
1395
1396    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1397        if self.status.is_offline() {
1398            return Err(anyhow!("room is offline"));
1399        }
1400
1401        let live_kit = self
1402            .live_kit
1403            .as_mut()
1404            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1405        match mem::take(&mut live_kit.screen_track) {
1406            LocalTrack::None => Err(anyhow!("screen was not shared")),
1407            LocalTrack::Pending { .. } => {
1408                cx.notify();
1409                Ok(())
1410            }
1411            LocalTrack::Published {
1412                track_publication, ..
1413            } => {
1414                live_kit.room.unpublish_track(track_publication);
1415                cx.notify();
1416
1417                Audio::play_sound(Sound::StopScreenshare, cx);
1418                Ok(())
1419            }
1420        }
1421    }
1422
/// Test helper that hands `sources` to the LiveKit room as its display sources.
///
/// # Panics
///
/// Panics when LiveKit has not been set up.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1431}
1432
1433struct LiveKitRoom {
1434    room: Arc<live_kit_client::Room>,
1435    screen_track: LocalTrack,
1436    microphone_track: LocalTrack,
1437    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1438    muted_by_user: bool,
1439    deafened: bool,
1440    speaking: bool,
1441    next_publish_id: usize,
1442    _maintain_room: Task<()>,
1443    _maintain_tracks: [Task<()>; 2],
1444}
1445
1446impl LiveKitRoom {
1447    fn set_mute(
1448        self: &mut LiveKitRoom,
1449        should_mute: bool,
1450        cx: &mut ModelContext<Room>,
1451    ) -> Result<(Task<Result<()>>, bool)> {
1452        if !should_mute {
1453            // clear user muting state.
1454            self.muted_by_user = false;
1455        }
1456
1457        let (result, old_muted) = match &mut self.microphone_track {
1458            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1459            LocalTrack::Pending { muted, .. } => {
1460                let old_muted = *muted;
1461                *muted = should_mute;
1462                cx.notify();
1463                Ok((Task::Ready(Some(Ok(()))), old_muted))
1464            }
1465            LocalTrack::Published {
1466                track_publication,
1467                muted,
1468            } => {
1469                let old_muted = *muted;
1470                *muted = should_mute;
1471                cx.notify();
1472                Ok((
1473                    cx.background().spawn(track_publication.set_mute(*muted)),
1474                    old_muted,
1475                ))
1476            }
1477        }?;
1478
1479        if old_muted != should_mute {
1480            if should_mute {
1481                Audio::play_sound(Sound::Mute, cx);
1482            } else {
1483                Audio::play_sound(Sound::Unmute, cx);
1484            }
1485        }
1486
1487        Ok((result, old_muted))
1488    }
1489}
1490
1491enum LocalTrack {
1492    None,
1493    Pending {
1494        publish_id: usize,
1495        muted: bool,
1496    },
1497    Published {
1498        track_publication: LocalTrackPublication,
1499        muted: bool,
1500    },
1501}
1502
1503impl Default for LocalTrack {
1504    fn default() -> Self {
1505        Self::None
1506    }
1507}
1508
/// Connection status of a room.
///
/// `Rejoining` is a third state for which both predicates below return `false`.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}