room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
/// Upper bound on how long `Room::maintain_connection` waits for the client to reconnect and
/// rejoin the room after a disconnection is detected; once it expires the room is left.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A remote participant's location differs from the one previously recorded for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): presumably emitted when a participant's remote video tracks change;
    /// confirm at the emission site.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): presumably emitted when a participant's remote audio tracks change;
    /// confirm at the emission site.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not in that
    /// participant's previously known projects.
    RemoteProjectShared {
        /// The remote participant whose project list contains the project.
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously listed for a remote participant is no longer available, either
    /// because it was dropped from their project list or because they left the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// NOTE(review): presumably emitted after joining a remote project; confirm at the
    /// emission site.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// NOTE(review): presumably emitted when an invitation to a remote project is dismissed;
    /// confirm at the emission site.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave` just before the room is torn down.
    Left,
}
  55
/// Client-side model of a call room: its participants, the projects shared in it, and the
/// tasks that keep it connected.
pub struct Room {
    /// Room id assigned by the server; sent back when rejoining.
    id: u64,
    /// Channel id reported by the join response; `None` for rooms made through `create`.
    channel_id: Option<u64>,
    /// LiveKit state; `None` when no LiveKit connection info was supplied, and after
    /// `clear_state`.
    live_kit: Option<LiveKitRoom>,
    /// `Online` at construction, `Rejoining` while reconnecting, `Offline` after `clear_state`.
    status: RoomStatus,
    /// Weak handles to projects tracked as shared; unshared in `clear_state` and re-announced
    /// in `rejoin`.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Weak handles to projects tracked as joined; disconnected from their host and closed in
    /// `clear_state`.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// NOTE(review): presumably the users the server lists as pending (called but not yet
    /// joined); the room is not considered empty while this is non-empty.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote participants seen in the most recently applied room update.
    participant_user_ids: HashSet<u64>,
    /// NOTE(review): presumably the number of outgoing calls still in flight; must be zero
    /// for `should_leave` to return `true`.
    pending_call_count: usize,
    /// Enables the `should_leave` check; set for rooms made through `create` and for joined
    /// rooms that have no channel.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers keyed by `(leader peer id, project id)`; read through `followers_for`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions (room updates); cleared in `clear_state`.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update; `should_leave` is `false` while it is set.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; dropped in `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
  76
  77impl Entity for Room {
  78    type Event = Event;
  79
  80    fn release(&mut self, cx: &mut AppContext) {
  81        if self.status.is_online() {
  82            self.leave_internal(cx).detach_and_log_err(cx);
  83        }
  84    }
  85
  86    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  87        if self.status.is_online() {
  88            let leave = self.leave_internal(cx);
  89            Some(
  90                cx.background()
  91                    .spawn(async move {
  92                        leave.await.log_err();
  93                    })
  94                    .boxed(),
  95            )
  96        } else {
  97            None
  98        }
  99    }
 100}
 101
 102impl Room {
    /// Returns the id of the channel associated with this room, if any.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
 106
    /// Returns `true` while the room tracks at least one shared project, i.e. while
    /// `shared_projects` is non-empty.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
 110
 111    #[cfg(any(test, feature = "test-support"))]
 112    pub fn is_connected(&self) -> bool {
 113        if let Some(live_kit) = self.live_kit.as_ref() {
 114            matches!(
 115                *live_kit.room.status().borrow(),
 116                live_kit_client::ConnectionState::Connected { .. }
 117            )
 118        } else {
 119            false
 120        }
 121    }
 122
    /// Builds the room model and spawns the background tasks that keep it alive.
    ///
    /// When `live_kit_connection_info` is provided, a LiveKit room is created, tasks are
    /// spawned to watch its connection status and its remote video/audio track updates, and a
    /// connection attempt is started. Once that connection succeeds the microphone is shared,
    /// unless `mute_on_join` returns `true`. Without connection info, `live_kit` is `None`.
    ///
    /// The returned room starts in `RoomStatus::Online`, registers `handle_room_updated` as a
    /// message handler on `client`, starts `maintain_connection`, and plays the "joined" sound.
    fn new(
        id: u64,
        channel_id: Option<u64>,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Leave the room as soon as LiveKit reports a disconnection. The task only holds a
            // weak handle, so it ends once the room model can no longer be upgraded.
            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    if status == live_kit_client::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                        break;
                    }
                }
            });

            // Forward remote video track updates to the room for as long as it is alive.
            let mut track_video_changes = room.remote_video_track_updates();
            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_video_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_video_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Forward remote audio track updates to the room for as long as it is alive.
            let mut track_audio_changes = room.remote_audio_track_updates();
            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_audio_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_audio_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Connect in a detached task; a connection or microphone-sharing failure is logged
            // rather than surfaced to the caller of `new`.
            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            cx.spawn(|this, mut cx| async move {
                connect.await?;

                if !cx.read(Self::mute_on_join) {
                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
                        .await?;
                }

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            // The maintenance tasks are stored on the LiveKit state, so they live exactly as
            // long as `live_kit` does.
            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: false,
                deafened: false,
                speaking: false,
                _maintain_room,
                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
            })
        } else {
            None
        };

        // Watch the client's connection status and try to rejoin after a disconnection.
        let maintain_connection =
            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

        Audio::play_sound(Sound::Joined, cx);

        Self {
            id,
            channel_id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
        }
    }
 235
 236    pub(crate) fn create(
 237        called_user_id: u64,
 238        initial_project: Option<ModelHandle<Project>>,
 239        client: Arc<Client>,
 240        user_store: ModelHandle<UserStore>,
 241        cx: &mut AppContext,
 242    ) -> Task<Result<ModelHandle<Self>>> {
 243        cx.spawn(|mut cx| async move {
 244            let response = client.request(proto::CreateRoom {}).await?;
 245            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 246            let room = cx.add_model(|cx| {
 247                Self::new(
 248                    room_proto.id,
 249                    None,
 250                    response.live_kit_connection_info,
 251                    client,
 252                    user_store,
 253                    cx,
 254                )
 255            });
 256
 257            let initial_project_id = if let Some(initial_project) = initial_project {
 258                let initial_project_id = room
 259                    .update(&mut cx, |room, cx| {
 260                        room.share_project(initial_project.clone(), cx)
 261                    })
 262                    .await?;
 263                Some(initial_project_id)
 264            } else {
 265                None
 266            };
 267
 268            match room
 269                .update(&mut cx, |room, cx| {
 270                    room.leave_when_empty = true;
 271                    room.call(called_user_id, initial_project_id, cx)
 272                })
 273                .await
 274            {
 275                Ok(()) => Ok(room),
 276                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 277            }
 278        })
 279    }
 280
 281    pub(crate) fn join_channel(
 282        channel_id: u64,
 283        client: Arc<Client>,
 284        user_store: ModelHandle<UserStore>,
 285        cx: &mut AppContext,
 286    ) -> Task<Result<ModelHandle<Self>>> {
 287        cx.spawn(|cx| async move {
 288            Self::from_join_response(
 289                client.request(proto::JoinChannel { channel_id }).await?,
 290                client,
 291                user_store,
 292                cx,
 293            )
 294        })
 295    }
 296
 297    pub(crate) fn join(
 298        call: &IncomingCall,
 299        client: Arc<Client>,
 300        user_store: ModelHandle<UserStore>,
 301        cx: &mut AppContext,
 302    ) -> Task<Result<ModelHandle<Self>>> {
 303        let id = call.room_id;
 304        cx.spawn(|cx| async move {
 305            Self::from_join_response(
 306                client.request(proto::JoinRoom { id }).await?,
 307                client,
 308                user_store,
 309                cx,
 310            )
 311        })
 312    }
 313
 314    pub fn mute_on_join(cx: &AppContext) -> bool {
 315        settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 316    }
 317
 318    fn from_join_response(
 319        response: proto::JoinRoomResponse,
 320        client: Arc<Client>,
 321        user_store: ModelHandle<UserStore>,
 322        mut cx: AsyncAppContext,
 323    ) -> Result<ModelHandle<Self>> {
 324        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 325        let room = cx.add_model(|cx| {
 326            Self::new(
 327                room_proto.id,
 328                response.channel_id,
 329                response.live_kit_connection_info,
 330                client,
 331                user_store,
 332                cx,
 333            )
 334        });
 335        room.update(&mut cx, |room, cx| {
 336            room.leave_when_empty = room.channel_id.is_none();
 337            room.apply_room_update(room_proto, cx)?;
 338            anyhow::Ok(())
 339        })?;
 340        Ok(room)
 341    }
 342
 343    fn should_leave(&self) -> bool {
 344        self.leave_when_empty
 345            && self.pending_room_update.is_none()
 346            && self.pending_participants.is_empty()
 347            && self.remote_participants.is_empty()
 348            && self.pending_call_count == 0
 349    }
 350
    /// Leaves the room, notifying observers and emitting [`Event::Left`] first.
    ///
    /// The notification and the event are sent before `leave_internal` checks whether the room
    /// is already offline, so they fire even when the returned task resolves to the
    /// "room is offline" error.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
 356
 357    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 358        if self.status.is_offline() {
 359            return Task::ready(Err(anyhow!("room is offline")));
 360        }
 361
 362        log::info!("leaving room");
 363        Audio::play_sound(Sound::Leave, cx);
 364
 365        self.clear_state(cx);
 366
 367        let leave_room = self.client.request(proto::LeaveRoom {});
 368        cx.background().spawn(async move {
 369            leave_room.await?;
 370            anyhow::Ok(())
 371        })
 372    }
 373
 374    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 375        for project in self.shared_projects.drain() {
 376            if let Some(project) = project.upgrade(cx) {
 377                project.update(cx, |project, cx| {
 378                    project.unshare(cx).log_err();
 379                });
 380            }
 381        }
 382        for project in self.joined_projects.drain() {
 383            if let Some(project) = project.upgrade(cx) {
 384                project.update(cx, |project, cx| {
 385                    project.disconnected_from_host(cx);
 386                    project.close(cx);
 387                });
 388            }
 389        }
 390
 391        self.status = RoomStatus::Offline;
 392        self.remote_participants.clear();
 393        self.pending_participants.clear();
 394        self.participant_user_ids.clear();
 395        self.subscriptions.clear();
 396        self.live_kit.take();
 397        self.pending_room_update.take();
 398        self.maintain_connection.take();
 399    }
 400
    /// Watches the client's connection status and tries to rejoin the room after a disconnect.
    ///
    /// Whenever the status changes (or the client is not connected to begin with), the room is
    /// marked `RoomStatus::Rejoining` and `rejoin` is attempted each time the client is seen
    /// connected, allowing up to three failed attempts, all bounded by `RECONNECT_TIMEOUT`.
    /// A successful rejoin resumes watching.
    ///
    /// # Errors
    ///
    /// This function only returns by failing: with "room was dropped" if the room no longer
    /// exists when a disconnect is detected, or — after the timeout expires, the client signs
    /// out, or the attempts are exhausted — with a "can't reconnect to room" error, after
    /// first leaving the room if it still exists.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // Consume the currently pending status value, if any.
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we
            // momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeds, and to `false` if the client
                    // signs out, the room is dropped, or the attempts run out.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    // Only failed rejoin attempts count against the budget.
                                    remaining_attempts -= 1;
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // `select_biased!` polls its branches in order, so a reconnection result
                    // that is ready takes precedence over the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade(&cx) {
            log::info!("reconnection failed, leaving room");
            // The task returned by `leave` is intentionally not awaited here.
            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 486
 487    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 488        let mut projects = HashMap::default();
 489        let mut reshared_projects = Vec::new();
 490        let mut rejoined_projects = Vec::new();
 491        self.shared_projects.retain(|project| {
 492            if let Some(handle) = project.upgrade(cx) {
 493                let project = handle.read(cx);
 494                if let Some(project_id) = project.remote_id() {
 495                    projects.insert(project_id, handle.clone());
 496                    reshared_projects.push(proto::UpdateProject {
 497                        project_id,
 498                        worktrees: project.worktree_metadata_protos(cx),
 499                    });
 500                    return true;
 501                }
 502            }
 503            false
 504        });
 505        self.joined_projects.retain(|project| {
 506            if let Some(handle) = project.upgrade(cx) {
 507                let project = handle.read(cx);
 508                if let Some(project_id) = project.remote_id() {
 509                    projects.insert(project_id, handle.clone());
 510                    rejoined_projects.push(proto::RejoinProject {
 511                        id: project_id,
 512                        worktrees: project
 513                            .worktrees(cx)
 514                            .map(|worktree| {
 515                                let worktree = worktree.read(cx);
 516                                proto::RejoinWorktree {
 517                                    id: worktree.id().to_proto(),
 518                                    scan_id: worktree.completed_scan_id() as u64,
 519                                }
 520                            })
 521                            .collect(),
 522                    });
 523                }
 524                return true;
 525            }
 526            false
 527        });
 528
 529        let response = self.client.request_envelope(proto::RejoinRoom {
 530            id: self.id,
 531            reshared_projects,
 532            rejoined_projects,
 533        });
 534
 535        cx.spawn(|this, mut cx| async move {
 536            let response = response.await?;
 537            let message_id = response.message_id;
 538            let response = response.payload;
 539            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 540            this.update(&mut cx, |this, cx| {
 541                this.status = RoomStatus::Online;
 542                this.apply_room_update(room_proto, cx)?;
 543
 544                for reshared_project in response.reshared_projects {
 545                    if let Some(project) = projects.get(&reshared_project.id) {
 546                        project.update(cx, |project, cx| {
 547                            project.reshared(reshared_project, cx).log_err();
 548                        });
 549                    }
 550                }
 551
 552                for rejoined_project in response.rejoined_projects {
 553                    if let Some(project) = projects.get(&rejoined_project.id) {
 554                        project.update(cx, |project, cx| {
 555                            project.rejoined(rejoined_project, message_id, cx).log_err();
 556                        });
 557                    }
 558                }
 559
 560                anyhow::Ok(())
 561            })
 562        })
 563    }
 564
    /// Returns the room's id.
    pub fn id(&self) -> u64 {
        self.id
    }
 568
    /// Returns the room's current status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 572
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 576
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 580
 581    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 582        self.remote_participants
 583            .values()
 584            .find(|p| p.peer_id == peer_id)
 585    }
 586
    /// Returns the users currently held in the room's pending-participant list.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 590
    /// Returns `true` if `user_id` belongs to one of the remote participants recorded by the
    /// most recently applied room update. The local user is filtered out of that set.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 594
 595    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 596        self.follows_by_leader_id_project_id
 597            .get(&(leader_id, project_id))
 598            .map_or(&[], |v| v.as_slice())
 599    }
 600
 601    /// Returns the most 'active' projects, defined as most people in the project
 602    pub fn most_active_project(&self) -> Option<(u64, u64)> {
 603        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 604        for participant in self.remote_participants.values() {
 605            match participant.location {
 606                ParticipantLocation::SharedProject { project_id } => {
 607                    project_hosts_and_guest_counts
 608                        .entry(project_id)
 609                        .or_default()
 610                        .1 += 1;
 611                }
 612                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 613            }
 614            for project in &participant.projects {
 615                project_hosts_and_guest_counts
 616                    .entry(project.id)
 617                    .or_default()
 618                    .0 = Some(participant.user.id);
 619            }
 620        }
 621
 622        project_hosts_and_guest_counts
 623            .into_iter()
 624            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 625            .max_by_key(|(_, _, guest_count)| *guest_count)
 626            .map(|(id, host, _)| (id, host))
 627    }
 628
 629    async fn handle_room_updated(
 630        this: ModelHandle<Self>,
 631        envelope: TypedEnvelope<proto::RoomUpdated>,
 632        _: Arc<Client>,
 633        mut cx: AsyncAppContext,
 634    ) -> Result<()> {
 635        let room = envelope
 636            .payload
 637            .room
 638            .ok_or_else(|| anyhow!("invalid room"))?;
 639        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 640    }
 641
 642    fn apply_room_update(
 643        &mut self,
 644        mut room: proto::Room,
 645        cx: &mut ModelContext<Self>,
 646    ) -> Result<()> {
 647        // Filter ourselves out from the room's participants.
 648        let local_participant_ix = room
 649            .participants
 650            .iter()
 651            .position(|participant| Some(participant.user_id) == self.client.user_id());
 652        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 653
 654        let pending_participant_user_ids = room
 655            .pending_participants
 656            .iter()
 657            .map(|p| p.user_id)
 658            .collect::<Vec<_>>();
 659
 660        let remote_participant_user_ids = room
 661            .participants
 662            .iter()
 663            .map(|p| p.user_id)
 664            .collect::<Vec<_>>();
 665
 666        let (remote_participants, pending_participants) =
 667            self.user_store.update(cx, move |user_store, cx| {
 668                (
 669                    user_store.get_users(remote_participant_user_ids, cx),
 670                    user_store.get_users(pending_participant_user_ids, cx),
 671                )
 672            });
 673
 674        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 675            let (remote_participants, pending_participants) =
 676                futures::join!(remote_participants, pending_participants);
 677
 678            this.update(&mut cx, |this, cx| {
 679                this.participant_user_ids.clear();
 680
 681                if let Some(participant) = local_participant {
 682                    this.local_participant.projects = participant.projects;
 683                } else {
 684                    this.local_participant.projects.clear();
 685                }
 686
 687                if let Some(participants) = remote_participants.log_err() {
 688                    for (participant, user) in room.participants.into_iter().zip(participants) {
 689                        let Some(peer_id) = participant.peer_id else {
 690                            continue;
 691                        };
 692                        let participant_index = ParticipantIndex(participant.participant_index);
 693                        this.participant_user_ids.insert(participant.user_id);
 694
 695                        let old_projects = this
 696                            .remote_participants
 697                            .get(&participant.user_id)
 698                            .into_iter()
 699                            .flat_map(|existing| &existing.projects)
 700                            .map(|project| project.id)
 701                            .collect::<HashSet<_>>();
 702                        let new_projects = participant
 703                            .projects
 704                            .iter()
 705                            .map(|project| project.id)
 706                            .collect::<HashSet<_>>();
 707
 708                        for project in &participant.projects {
 709                            if !old_projects.contains(&project.id) {
 710                                cx.emit(Event::RemoteProjectShared {
 711                                    owner: user.clone(),
 712                                    project_id: project.id,
 713                                    worktree_root_names: project.worktree_root_names.clone(),
 714                                });
 715                            }
 716                        }
 717
 718                        for unshared_project_id in old_projects.difference(&new_projects) {
 719                            this.joined_projects.retain(|project| {
 720                                if let Some(project) = project.upgrade(cx) {
 721                                    project.update(cx, |project, cx| {
 722                                        if project.remote_id() == Some(*unshared_project_id) {
 723                                            project.disconnected_from_host(cx);
 724                                            false
 725                                        } else {
 726                                            true
 727                                        }
 728                                    })
 729                                } else {
 730                                    false
 731                                }
 732                            });
 733                            cx.emit(Event::RemoteProjectUnshared {
 734                                project_id: *unshared_project_id,
 735                            });
 736                        }
 737
 738                        let location = ParticipantLocation::from_proto(participant.location)
 739                            .unwrap_or(ParticipantLocation::External);
 740                        if let Some(remote_participant) =
 741                            this.remote_participants.get_mut(&participant.user_id)
 742                        {
 743                            remote_participant.peer_id = peer_id;
 744                            remote_participant.projects = participant.projects;
 745                            remote_participant.participant_index = participant_index;
 746                            if location != remote_participant.location {
 747                                remote_participant.location = location;
 748                                cx.emit(Event::ParticipantLocationChanged {
 749                                    participant_id: peer_id,
 750                                });
 751                            }
 752                        } else {
 753                            this.remote_participants.insert(
 754                                participant.user_id,
 755                                RemoteParticipant {
 756                                    user: user.clone(),
 757                                    participant_index,
 758                                    peer_id,
 759                                    projects: participant.projects,
 760                                    location,
 761                                    muted: true,
 762                                    speaking: false,
 763                                    video_tracks: Default::default(),
 764                                    audio_tracks: Default::default(),
 765                                },
 766                            );
 767
 768                            Audio::play_sound(Sound::Joined, cx);
 769
 770                            if let Some(live_kit) = this.live_kit.as_ref() {
 771                                let video_tracks =
 772                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 773                                let audio_tracks =
 774                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 775                                let publications = live_kit
 776                                    .room
 777                                    .remote_audio_track_publications(&user.id.to_string());
 778
 779                                for track in video_tracks {
 780                                    this.remote_video_track_updated(
 781                                        RemoteVideoTrackUpdate::Subscribed(track),
 782                                        cx,
 783                                    )
 784                                    .log_err();
 785                                }
 786
 787                                for (track, publication) in
 788                                    audio_tracks.iter().zip(publications.iter())
 789                                {
 790                                    this.remote_audio_track_updated(
 791                                        RemoteAudioTrackUpdate::Subscribed(
 792                                            track.clone(),
 793                                            publication.clone(),
 794                                        ),
 795                                        cx,
 796                                    )
 797                                    .log_err();
 798                                }
 799                            }
 800                        }
 801                    }
 802
 803                    this.remote_participants.retain(|user_id, participant| {
 804                        if this.participant_user_ids.contains(user_id) {
 805                            true
 806                        } else {
 807                            for project in &participant.projects {
 808                                cx.emit(Event::RemoteProjectUnshared {
 809                                    project_id: project.id,
 810                                });
 811                            }
 812                            false
 813                        }
 814                    });
 815                }
 816
 817                if let Some(pending_participants) = pending_participants.log_err() {
 818                    this.pending_participants = pending_participants;
 819                    for participant in &this.pending_participants {
 820                        this.participant_user_ids.insert(participant.id);
 821                    }
 822                }
 823
 824                this.follows_by_leader_id_project_id.clear();
 825                for follower in room.followers {
 826                    let project_id = follower.project_id;
 827                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 828                        (Some(leader), Some(follower)) => (leader, follower),
 829
 830                        _ => {
 831                            log::error!("Follower message {follower:?} missing some state");
 832                            continue;
 833                        }
 834                    };
 835
 836                    let list = this
 837                        .follows_by_leader_id_project_id
 838                        .entry((leader, project_id))
 839                        .or_insert(Vec::new());
 840                    if !list.contains(&follower) {
 841                        list.push(follower);
 842                    }
 843                }
 844
 845                this.pending_room_update.take();
 846                if this.should_leave() {
 847                    log::info!("room is empty, leaving");
 848                    let _ = this.leave(cx);
 849                }
 850
 851                this.user_store.update(cx, |user_store, cx| {
 852                    let participant_indices_by_user_id = this
 853                        .remote_participants
 854                        .iter()
 855                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 856                        .collect();
 857                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 858                });
 859
 860                this.check_invariants();
 861                cx.notify();
 862            });
 863        }));
 864
 865        cx.notify();
 866        Ok(())
 867    }
 868
 869    fn remote_video_track_updated(
 870        &mut self,
 871        change: RemoteVideoTrackUpdate,
 872        cx: &mut ModelContext<Self>,
 873    ) -> Result<()> {
 874        match change {
 875            RemoteVideoTrackUpdate::Subscribed(track) => {
 876                let user_id = track.publisher_id().parse()?;
 877                let track_id = track.sid().to_string();
 878                let participant = self
 879                    .remote_participants
 880                    .get_mut(&user_id)
 881                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 882                participant.video_tracks.insert(
 883                    track_id.clone(),
 884                    Arc::new(RemoteVideoTrack {
 885                        live_kit_track: track,
 886                    }),
 887                );
 888                cx.emit(Event::RemoteVideoTracksChanged {
 889                    participant_id: participant.peer_id,
 890                });
 891            }
 892            RemoteVideoTrackUpdate::Unsubscribed {
 893                publisher_id,
 894                track_id,
 895            } => {
 896                let user_id = publisher_id.parse()?;
 897                let participant = self
 898                    .remote_participants
 899                    .get_mut(&user_id)
 900                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 901                participant.video_tracks.remove(&track_id);
 902                cx.emit(Event::RemoteVideoTracksChanged {
 903                    participant_id: participant.peer_id,
 904                });
 905            }
 906        }
 907
 908        cx.notify();
 909        Ok(())
 910    }
 911
 912    fn remote_audio_track_updated(
 913        &mut self,
 914        change: RemoteAudioTrackUpdate,
 915        cx: &mut ModelContext<Self>,
 916    ) -> Result<()> {
 917        match change {
 918            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 919                let mut speaker_ids = speakers
 920                    .into_iter()
 921                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 922                    .collect::<Vec<u64>>();
 923                speaker_ids.sort_unstable();
 924                for (sid, participant) in &mut self.remote_participants {
 925                    if let Ok(_) = speaker_ids.binary_search(sid) {
 926                        participant.speaking = true;
 927                    } else {
 928                        participant.speaking = false;
 929                    }
 930                }
 931                if let Some(id) = self.client.user_id() {
 932                    if let Some(room) = &mut self.live_kit {
 933                        if let Ok(_) = speaker_ids.binary_search(&id) {
 934                            room.speaking = true;
 935                        } else {
 936                            room.speaking = false;
 937                        }
 938                    }
 939                }
 940                cx.notify();
 941            }
 942            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 943                let mut found = false;
 944                for participant in &mut self.remote_participants.values_mut() {
 945                    for track in participant.audio_tracks.values() {
 946                        if track.sid() == track_id {
 947                            found = true;
 948                            break;
 949                        }
 950                    }
 951                    if found {
 952                        participant.muted = muted;
 953                        break;
 954                    }
 955                }
 956
 957                cx.notify();
 958            }
 959            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 960                let user_id = track.publisher_id().parse()?;
 961                let track_id = track.sid().to_string();
 962                let participant = self
 963                    .remote_participants
 964                    .get_mut(&user_id)
 965                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 966
 967                participant.audio_tracks.insert(track_id.clone(), track);
 968                participant.muted = publication.is_muted();
 969
 970                cx.emit(Event::RemoteAudioTracksChanged {
 971                    participant_id: participant.peer_id,
 972                });
 973            }
 974            RemoteAudioTrackUpdate::Unsubscribed {
 975                publisher_id,
 976                track_id,
 977            } => {
 978                let user_id = publisher_id.parse()?;
 979                let participant = self
 980                    .remote_participants
 981                    .get_mut(&user_id)
 982                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 983                participant.audio_tracks.remove(&track_id);
 984                cx.emit(Event::RemoteAudioTracksChanged {
 985                    participant_id: participant.peer_id,
 986                });
 987            }
 988        }
 989
 990        cx.notify();
 991        Ok(())
 992    }
 993
 994    fn check_invariants(&self) {
 995        #[cfg(any(test, feature = "test-support"))]
 996        {
 997            for participant in self.remote_participants.values() {
 998                assert!(self.participant_user_ids.contains(&participant.user.id));
 999                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1000            }
1001
1002            for participant in &self.pending_participants {
1003                assert!(self.participant_user_ids.contains(&participant.id));
1004                assert_ne!(participant.id, self.client.user_id().unwrap());
1005            }
1006
1007            assert_eq!(
1008                self.participant_user_ids.len(),
1009                self.remote_participants.len() + self.pending_participants.len()
1010            );
1011        }
1012    }
1013
1014    pub(crate) fn call(
1015        &mut self,
1016        called_user_id: u64,
1017        initial_project_id: Option<u64>,
1018        cx: &mut ModelContext<Self>,
1019    ) -> Task<Result<()>> {
1020        if self.status.is_offline() {
1021            return Task::ready(Err(anyhow!("room is offline")));
1022        }
1023
1024        cx.notify();
1025        let client = self.client.clone();
1026        let room_id = self.id;
1027        self.pending_call_count += 1;
1028        cx.spawn(|this, mut cx| async move {
1029            let result = client
1030                .request(proto::Call {
1031                    room_id,
1032                    called_user_id,
1033                    initial_project_id,
1034                })
1035                .await;
1036            this.update(&mut cx, |this, cx| {
1037                this.pending_call_count -= 1;
1038                if this.should_leave() {
1039                    this.leave(cx).detach_and_log_err(cx);
1040                }
1041            });
1042            result?;
1043            Ok(())
1044        })
1045    }
1046
1047    pub fn join_project(
1048        &mut self,
1049        id: u64,
1050        language_registry: Arc<LanguageRegistry>,
1051        fs: Arc<dyn Fs>,
1052        cx: &mut ModelContext<Self>,
1053    ) -> Task<Result<ModelHandle<Project>>> {
1054        let client = self.client.clone();
1055        let user_store = self.user_store.clone();
1056        cx.emit(Event::RemoteProjectJoined { project_id: id });
1057        cx.spawn(|this, mut cx| async move {
1058            let project =
1059                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1060
1061            this.update(&mut cx, |this, cx| {
1062                this.joined_projects.retain(|project| {
1063                    if let Some(project) = project.upgrade(cx) {
1064                        !project.read(cx).is_read_only()
1065                    } else {
1066                        false
1067                    }
1068                });
1069                this.joined_projects.insert(project.downgrade());
1070            });
1071            Ok(project)
1072        })
1073    }
1074
1075    pub(crate) fn share_project(
1076        &mut self,
1077        project: ModelHandle<Project>,
1078        cx: &mut ModelContext<Self>,
1079    ) -> Task<Result<u64>> {
1080        if let Some(project_id) = project.read(cx).remote_id() {
1081            return Task::ready(Ok(project_id));
1082        }
1083
1084        let request = self.client.request(proto::ShareProject {
1085            room_id: self.id(),
1086            worktrees: project.read(cx).worktree_metadata_protos(cx),
1087        });
1088        cx.spawn(|this, mut cx| async move {
1089            let response = request.await?;
1090
1091            project.update(&mut cx, |project, cx| {
1092                project.shared(response.project_id, cx)
1093            })?;
1094
1095            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1096            this.update(&mut cx, |this, cx| {
1097                this.shared_projects.insert(project.downgrade());
1098                let active_project = this.local_participant.active_project.as_ref();
1099                if active_project.map_or(false, |location| *location == project) {
1100                    this.set_location(Some(&project), cx)
1101                } else {
1102                    Task::ready(Ok(()))
1103                }
1104            })
1105            .await?;
1106
1107            Ok(response.project_id)
1108        })
1109    }
1110
1111    pub(crate) fn unshare_project(
1112        &mut self,
1113        project: ModelHandle<Project>,
1114        cx: &mut ModelContext<Self>,
1115    ) -> Result<()> {
1116        let project_id = match project.read(cx).remote_id() {
1117            Some(project_id) => project_id,
1118            None => return Ok(()),
1119        };
1120
1121        self.client.send(proto::UnshareProject { project_id })?;
1122        project.update(cx, |this, cx| this.unshare(cx))
1123    }
1124
1125    pub(crate) fn set_location(
1126        &mut self,
1127        project: Option<&ModelHandle<Project>>,
1128        cx: &mut ModelContext<Self>,
1129    ) -> Task<Result<()>> {
1130        if self.status.is_offline() {
1131            return Task::ready(Err(anyhow!("room is offline")));
1132        }
1133
1134        let client = self.client.clone();
1135        let room_id = self.id;
1136        let location = if let Some(project) = project {
1137            self.local_participant.active_project = Some(project.downgrade());
1138            if let Some(project_id) = project.read(cx).remote_id() {
1139                proto::participant_location::Variant::SharedProject(
1140                    proto::participant_location::SharedProject { id: project_id },
1141                )
1142            } else {
1143                proto::participant_location::Variant::UnsharedProject(
1144                    proto::participant_location::UnsharedProject {},
1145                )
1146            }
1147        } else {
1148            self.local_participant.active_project = None;
1149            proto::participant_location::Variant::External(proto::participant_location::External {})
1150        };
1151
1152        cx.notify();
1153        cx.foreground().spawn(async move {
1154            client
1155                .request(proto::UpdateParticipantLocation {
1156                    room_id,
1157                    location: Some(proto::ParticipantLocation {
1158                        variant: Some(location),
1159                    }),
1160                })
1161                .await?;
1162            Ok(())
1163        })
1164    }
1165
1166    pub fn is_screen_sharing(&self) -> bool {
1167        self.live_kit.as_ref().map_or(false, |live_kit| {
1168            !matches!(live_kit.screen_track, LocalTrack::None)
1169        })
1170    }
1171
1172    pub fn is_sharing_mic(&self) -> bool {
1173        self.live_kit.as_ref().map_or(false, |live_kit| {
1174            !matches!(live_kit.microphone_track, LocalTrack::None)
1175        })
1176    }
1177
1178    pub fn is_muted(&self, cx: &AppContext) -> bool {
1179        self.live_kit
1180            .as_ref()
1181            .and_then(|live_kit| match &live_kit.microphone_track {
1182                LocalTrack::None => Some(Self::mute_on_join(cx)),
1183                LocalTrack::Pending { muted, .. } => Some(*muted),
1184                LocalTrack::Published { muted, .. } => Some(*muted),
1185            })
1186            .unwrap_or(false)
1187    }
1188
1189    pub fn is_speaking(&self) -> bool {
1190        self.live_kit
1191            .as_ref()
1192            .map_or(false, |live_kit| live_kit.speaking)
1193    }
1194
1195    pub fn is_deafened(&self) -> Option<bool> {
1196        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1197    }
1198
1199    #[track_caller]
1200    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1201        if self.status.is_offline() {
1202            return Task::ready(Err(anyhow!("room is offline")));
1203        } else if self.is_sharing_mic() {
1204            return Task::ready(Err(anyhow!("microphone was already shared")));
1205        }
1206
1207        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1208            let publish_id = post_inc(&mut live_kit.next_publish_id);
1209            live_kit.microphone_track = LocalTrack::Pending {
1210                publish_id,
1211                muted: false,
1212            };
1213            cx.notify();
1214            publish_id
1215        } else {
1216            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1217        };
1218
1219        cx.spawn_weak(|this, mut cx| async move {
1220            let publish_track = async {
1221                let track = LocalAudioTrack::create();
1222                this.upgrade(&cx)
1223                    .ok_or_else(|| anyhow!("room was dropped"))?
1224                    .read_with(&cx, |this, _| {
1225                        this.live_kit
1226                            .as_ref()
1227                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1228                    })
1229                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1230                    .await
1231            };
1232
1233            let publication = publish_track.await;
1234            this.upgrade(&cx)
1235                .ok_or_else(|| anyhow!("room was dropped"))?
1236                .update(&mut cx, |this, cx| {
1237                    let live_kit = this
1238                        .live_kit
1239                        .as_mut()
1240                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1241
1242                    let (canceled, muted) = if let LocalTrack::Pending {
1243                        publish_id: cur_publish_id,
1244                        muted,
1245                    } = &live_kit.microphone_track
1246                    {
1247                        (*cur_publish_id != publish_id, *muted)
1248                    } else {
1249                        (true, false)
1250                    };
1251
1252                    match publication {
1253                        Ok(publication) => {
1254                            if canceled {
1255                                live_kit.room.unpublish_track(publication);
1256                            } else {
1257                                if muted {
1258                                    cx.background().spawn(publication.set_mute(muted)).detach();
1259                                }
1260                                live_kit.microphone_track = LocalTrack::Published {
1261                                    track_publication: publication,
1262                                    muted,
1263                                };
1264                                cx.notify();
1265                            }
1266                            Ok(())
1267                        }
1268                        Err(error) => {
1269                            if canceled {
1270                                Ok(())
1271                            } else {
1272                                live_kit.microphone_track = LocalTrack::None;
1273                                cx.notify();
1274                                Err(error)
1275                            }
1276                        }
1277                    }
1278                })
1279        })
1280    }
1281
1282    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1283        if self.status.is_offline() {
1284            return Task::ready(Err(anyhow!("room is offline")));
1285        } else if self.is_screen_sharing() {
1286            return Task::ready(Err(anyhow!("screen was already shared")));
1287        }
1288
1289        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1290            let publish_id = post_inc(&mut live_kit.next_publish_id);
1291            live_kit.screen_track = LocalTrack::Pending {
1292                publish_id,
1293                muted: false,
1294            };
1295            cx.notify();
1296            (live_kit.room.display_sources(), publish_id)
1297        } else {
1298            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1299        };
1300
1301        cx.spawn_weak(|this, mut cx| async move {
1302            let publish_track = async {
1303                let displays = displays.await?;
1304                let display = displays
1305                    .first()
1306                    .ok_or_else(|| anyhow!("no display found"))?;
1307                let track = LocalVideoTrack::screen_share_for_display(&display);
1308                this.upgrade(&cx)
1309                    .ok_or_else(|| anyhow!("room was dropped"))?
1310                    .read_with(&cx, |this, _| {
1311                        this.live_kit
1312                            .as_ref()
1313                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1314                    })
1315                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1316                    .await
1317            };
1318
1319            let publication = publish_track.await;
1320            this.upgrade(&cx)
1321                .ok_or_else(|| anyhow!("room was dropped"))?
1322                .update(&mut cx, |this, cx| {
1323                    let live_kit = this
1324                        .live_kit
1325                        .as_mut()
1326                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1327
1328                    let (canceled, muted) = if let LocalTrack::Pending {
1329                        publish_id: cur_publish_id,
1330                        muted,
1331                    } = &live_kit.screen_track
1332                    {
1333                        (*cur_publish_id != publish_id, *muted)
1334                    } else {
1335                        (true, false)
1336                    };
1337
1338                    match publication {
1339                        Ok(publication) => {
1340                            if canceled {
1341                                live_kit.room.unpublish_track(publication);
1342                            } else {
1343                                if muted {
1344                                    cx.background().spawn(publication.set_mute(muted)).detach();
1345                                }
1346                                live_kit.screen_track = LocalTrack::Published {
1347                                    track_publication: publication,
1348                                    muted,
1349                                };
1350                                cx.notify();
1351                            }
1352
1353                            Audio::play_sound(Sound::StartScreenshare, cx);
1354
1355                            Ok(())
1356                        }
1357                        Err(error) => {
1358                            if canceled {
1359                                Ok(())
1360                            } else {
1361                                live_kit.screen_track = LocalTrack::None;
1362                                cx.notify();
1363                                Err(error)
1364                            }
1365                        }
1366                    }
1367                })
1368        })
1369    }
1370
1371    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1372        let should_mute = !self.is_muted(cx);
1373        if let Some(live_kit) = self.live_kit.as_mut() {
1374            if matches!(live_kit.microphone_track, LocalTrack::None) {
1375                return Ok(self.share_microphone(cx));
1376            }
1377
1378            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1379            live_kit.muted_by_user = should_mute;
1380
1381            if old_muted == true && live_kit.deafened == true {
1382                if let Some(task) = self.toggle_deafen(cx).ok() {
1383                    task.detach();
1384                }
1385            }
1386
1387            Ok(ret_task)
1388        } else {
1389            Err(anyhow!("LiveKit not started"))
1390        }
1391    }
1392
1393    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1394        if let Some(live_kit) = self.live_kit.as_mut() {
1395            (*live_kit).deafened = !live_kit.deafened;
1396
1397            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1398            // Context notification is sent within set_mute itself.
1399            let mut mute_task = None;
1400            // When deafening, mute user's mic as well.
1401            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1402            if live_kit.deafened || !live_kit.muted_by_user {
1403                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1404            };
1405            for participant in self.remote_participants.values() {
1406                for track in live_kit
1407                    .room
1408                    .remote_audio_track_publications(&participant.user.id.to_string())
1409                {
1410                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1411                }
1412            }
1413
1414            Ok(cx.foreground().spawn(async move {
1415                if let Some(mute_task) = mute_task {
1416                    mute_task.await?;
1417                }
1418                for task in tasks {
1419                    task.await?;
1420                }
1421                Ok(())
1422            }))
1423        } else {
1424            Err(anyhow!("LiveKit not started"))
1425        }
1426    }
1427
1428    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1429        if self.status.is_offline() {
1430            return Err(anyhow!("room is offline"));
1431        }
1432
1433        let live_kit = self
1434            .live_kit
1435            .as_mut()
1436            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1437        match mem::take(&mut live_kit.screen_track) {
1438            LocalTrack::None => Err(anyhow!("screen was not shared")),
1439            LocalTrack::Pending { .. } => {
1440                cx.notify();
1441                Ok(())
1442            }
1443            LocalTrack::Published {
1444                track_publication, ..
1445            } => {
1446                live_kit.room.unpublish_track(track_publication);
1447                cx.notify();
1448
1449                Audio::play_sound(Sound::StopScreenshare, cx);
1450                Ok(())
1451            }
1452        }
1453    }
1454
1455    #[cfg(any(test, feature = "test-support"))]
1456    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1457        self.live_kit
1458            .as_ref()
1459            .unwrap()
1460            .room
1461            .set_display_sources(sources);
1462    }
1463}
1464
/// Per-room LiveKit state: the underlying LiveKit room handle together with
/// the local user's track and audio flags.
struct LiveKitRoom {
    /// Handle to the LiveKit room; used to publish and unpublish local tracks
    /// and to look up remote audio track publications.
    room: Arc<live_kit_client::Room>,
    /// State of the local screen-share track.
    screen_track: LocalTrack,
    /// State of the local microphone track.
    microphone_track: LocalTrack,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    muted_by_user: bool,
    /// Whether the local user is deafened; flipped by `Room::toggle_deafen`,
    /// which also enables or disables remote audio publications accordingly.
    deafened: bool,
    /// Whether the local user is among the active speakers, as last reported
    /// by an `ActiveSpeakersChanged` update.
    speaking: bool,
    /// Counter that hands out the id stored in `LocalTrack::Pending` for each
    /// publish attempt.
    next_publish_id: usize,
    // NOTE(review): these tasks are created outside this section; presumably
    // they are held here so they live exactly as long as the LiveKit room —
    // confirm at the construction site.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1477
1478impl LiveKitRoom {
1479    fn set_mute(
1480        self: &mut LiveKitRoom,
1481        should_mute: bool,
1482        cx: &mut ModelContext<Room>,
1483    ) -> Result<(Task<Result<()>>, bool)> {
1484        if !should_mute {
1485            // clear user muting state.
1486            self.muted_by_user = false;
1487        }
1488
1489        let (result, old_muted) = match &mut self.microphone_track {
1490            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1491            LocalTrack::Pending { muted, .. } => {
1492                let old_muted = *muted;
1493                *muted = should_mute;
1494                cx.notify();
1495                Ok((Task::Ready(Some(Ok(()))), old_muted))
1496            }
1497            LocalTrack::Published {
1498                track_publication,
1499                muted,
1500            } => {
1501                let old_muted = *muted;
1502                *muted = should_mute;
1503                cx.notify();
1504                Ok((
1505                    cx.background().spawn(track_publication.set_mute(*muted)),
1506                    old_muted,
1507                ))
1508            }
1509        }?;
1510
1511        if old_muted != should_mute {
1512            if should_mute {
1513                Audio::play_sound(Sound::Mute, cx);
1514            } else {
1515                Audio::play_sound(Sound::Unmute, cx);
1516            }
1517        }
1518
1519        Ok((result, old_muted))
1520    }
1521}
1522
/// Publication state of a local track (microphone or screen).
enum LocalTrack {
    /// No track is being published.
    None,
    /// A publish has been started but has not completed yet.
    Pending {
        /// Id of the publish attempt; a completed publish is applied only if
        /// this id still matches the one it was started with.
        publish_id: usize,
        /// Mute state to apply once the track is published.
        muted: bool,
    },
    /// The track is published to the LiveKit room.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Current mute state of the track.
        muted: bool,
    },
}
1534
1535impl Default for LocalTrack {
1536    fn default() -> Self {
1537        Self::None
1538    }
1539}
1540
/// Connection state of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == RoomStatus::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; a rejoining room is
    /// neither online nor offline.
    pub fn is_online(&self) -> bool {
        *self == RoomStatus::Online
    }
}