room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{
  16    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  17};
  18use language::LanguageRegistry;
  19use live_kit_client::{
  20    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  21    RemoteVideoTrackUpdate,
  22};
  23use postage::{sink::Sink, stream::Stream, watch};
  24use project::Project;
  25use settings::Settings;
  26use std::{future::Future, mem, sync::Arc, time::Duration};
  27use util::{post_inc, ResultExt, TryFutureExt};
  28
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  30
/// Events emitted by a [`Room`] (see its `EventEmitter` implementation).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// An already-known remote participant reported a location different from
    /// the one previously stored for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): presumably emitted when a remote participant's video tracks
    /// change; the emitting code is not visible in this part of the file.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): presumably emitted when a remote participant's audio tracks
    /// change; the emitting code is not visible in this part of the file.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not in
    /// that participant's previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously listed for a remote participant is missing from the
    /// latest room update.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// NOTE(review): emitter not visible here; confirm when this is sent.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// NOTE(review): emitter not visible here; confirm when this is sent.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
  58
/// Client-side state of a call room: its participants, the projects shared into
/// or joined from it, and the tasks that keep it connected.
pub struct Room {
    /// Id taken from the room proto returned by the server.
    id: u64,
    /// Channel id from the join response; `None` for rooms made via `Room::create`.
    channel_id: Option<u64>,
    /// LiveKit state; only present when connection info was passed to `Room::new`.
    live_kit: Option<LiveKitRoom>,
    /// Current connectivity status (`Online`, `Rejoining`, `Offline`, ...).
    status: RoomStatus,
    /// Local projects shared into the room; unshared again by `clear_state`.
    shared_projects: HashSet<WeakModel<Project>>,
    /// Remote projects joined through the room; closed by `clear_state`.
    joined_projects: HashSet<WeakModel<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by their user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    pending_participants: Vec<Arc<User>>,
    /// User ids recorded while applying the most recent room update.
    participant_user_ids: HashSet<u64>,
    /// Consulted by `should_leave`: the room is not left while this is non-zero.
    pending_call_count: usize,
    /// Whether the room may be left once nobody else is in it (see `should_leave`).
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Followers, keyed by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions on `client`; dropped by `clear_state`.
    client_subscriptions: Vec<client::Subscription>,
    /// Release and app-quit callbacks registered in `Room::new`.
    _subscriptions: Vec<gpui::Subscription>,
    // NOTE(review): the watch channel below is created in `Room::new`; its use
    // is not visible in this part of the file.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// Task applying the most recent room update, if one has been started.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; dropped by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
  82
// Lets a `Room` model emit `Event` values via `cx.emit(...)`.
impl EventEmitter for Room {
    type Event = Event;
}
  86
  87impl Room {
    /// Returns the id of the channel this room is associated with, if any.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
  91
    /// Returns `true` while at least one project is recorded as shared into the room.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
  95
  96    #[cfg(any(test, feature = "test-support"))]
  97    pub fn is_connected(&self) -> bool {
  98        if let Some(live_kit) = self.live_kit.as_ref() {
  99            matches!(
 100                *live_kit.room.status().borrow(),
 101                live_kit_client::ConnectionState::Connected { .. }
 102            )
 103        } else {
 104            false
 105        }
 106    }
 107
 108    fn new(
 109        id: u64,
 110        channel_id: Option<u64>,
 111        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 112        client: Arc<Client>,
 113        user_store: Model<UserStore>,
 114        cx: &mut ModelContext<Self>,
 115    ) -> Self {
 116        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 117            let room = live_kit_client::Room::new();
 118            let mut status = room.status();
 119            // Consume the initial status of the room.
 120            let _ = status.try_recv();
 121            let _maintain_room = cx.spawn(|this, mut cx| async move {
 122                while let Some(status) = status.next().await {
 123                    let this = if let Some(this) = this.upgrade() {
 124                        this
 125                    } else {
 126                        break;
 127                    };
 128
 129                    if status == live_kit_client::ConnectionState::Disconnected {
 130                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 131                            .ok();
 132                        break;
 133                    }
 134                }
 135            });
 136
 137            let _maintain_video_tracks = cx.spawn({
 138                let room = room.clone();
 139                move |this, mut cx| async move {
 140                    let mut track_video_changes = room.remote_video_track_updates();
 141                    while let Some(track_change) = track_video_changes.next().await {
 142                        let this = if let Some(this) = this.upgrade() {
 143                            this
 144                        } else {
 145                            break;
 146                        };
 147
 148                        this.update(&mut cx, |this, cx| {
 149                            this.remote_video_track_updated(track_change, cx).log_err()
 150                        })
 151                        .ok();
 152                    }
 153                }
 154            });
 155
 156            let _maintain_audio_tracks = cx.spawn({
 157                let room = room.clone();
 158                |this, mut cx| async move {
 159                    let mut track_audio_changes = room.remote_audio_track_updates();
 160                    while let Some(track_change) = track_audio_changes.next().await {
 161                        let this = if let Some(this) = this.upgrade() {
 162                            this
 163                        } else {
 164                            break;
 165                        };
 166
 167                        this.update(&mut cx, |this, cx| {
 168                            this.remote_audio_track_updated(track_change, cx).log_err()
 169                        })
 170                        .ok();
 171                    }
 172                }
 173            });
 174
 175            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 176            cx.spawn(|this, mut cx| async move {
 177                connect.await?;
 178
 179                if !cx.update(|cx| Self::mute_on_join(cx))? {
 180                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 181                        .await?;
 182                }
 183
 184                anyhow::Ok(())
 185            })
 186            .detach_and_log_err(cx);
 187
 188            Some(LiveKitRoom {
 189                room,
 190                screen_track: LocalTrack::None,
 191                microphone_track: LocalTrack::None,
 192                next_publish_id: 0,
 193                muted_by_user: false,
 194                deafened: false,
 195                speaking: false,
 196                _maintain_room,
 197                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 198            })
 199        } else {
 200            None
 201        };
 202
 203        let maintain_connection = cx.spawn({
 204            let client = client.clone();
 205            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 206        });
 207
 208        Audio::play_sound(Sound::Joined, cx);
 209
 210        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 211
 212        Self {
 213            id,
 214            channel_id,
 215            live_kit: live_kit_room,
 216            status: RoomStatus::Online,
 217            shared_projects: Default::default(),
 218            joined_projects: Default::default(),
 219            participant_user_ids: Default::default(),
 220            local_participant: Default::default(),
 221            remote_participants: Default::default(),
 222            pending_participants: Default::default(),
 223            pending_call_count: 0,
 224            client_subscriptions: vec![
 225                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 226            ],
 227            _subscriptions: vec![
 228                cx.on_release(Self::released),
 229                cx.on_app_quit(Self::app_will_quit),
 230            ],
 231            leave_when_empty: false,
 232            pending_room_update: None,
 233            client,
 234            user_store,
 235            follows_by_leader_id_project_id: Default::default(),
 236            maintain_connection: Some(maintain_connection),
 237            room_update_completed_tx,
 238            room_update_completed_rx,
 239        }
 240    }
 241
 242    pub(crate) fn create(
 243        called_user_id: u64,
 244        initial_project: Option<Model<Project>>,
 245        client: Arc<Client>,
 246        user_store: Model<UserStore>,
 247        cx: &mut AppContext,
 248    ) -> Task<Result<Model<Self>>> {
 249        cx.spawn(move |mut cx| async move {
 250            let response = client.request(proto::CreateRoom {}).await?;
 251            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 252            let room = cx.build_model(|cx| {
 253                Self::new(
 254                    room_proto.id,
 255                    None,
 256                    response.live_kit_connection_info,
 257                    client,
 258                    user_store,
 259                    cx,
 260                )
 261            })?;
 262
 263            let initial_project_id = if let Some(initial_project) = initial_project {
 264                let initial_project_id = room
 265                    .update(&mut cx, |room, cx| {
 266                        room.share_project(initial_project.clone(), cx)
 267                    })?
 268                    .await?;
 269                Some(initial_project_id)
 270            } else {
 271                None
 272            };
 273
 274            match room
 275                .update(&mut cx, |room, cx| {
 276                    room.leave_when_empty = true;
 277                    room.call(called_user_id, initial_project_id, cx)
 278                })?
 279                .await
 280            {
 281                Ok(()) => Ok(room),
 282                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 283            }
 284        })
 285    }
 286
 287    pub(crate) fn join_channel(
 288        channel_id: u64,
 289        client: Arc<Client>,
 290        user_store: Model<UserStore>,
 291        cx: &mut AppContext,
 292    ) -> Task<Result<Model<Self>>> {
 293        cx.spawn(move |cx| async move {
 294            Self::from_join_response(
 295                client.request(proto::JoinChannel { channel_id }).await?,
 296                client,
 297                user_store,
 298                cx,
 299            )
 300        })
 301    }
 302
 303    pub(crate) fn join(
 304        call: &IncomingCall,
 305        client: Arc<Client>,
 306        user_store: Model<UserStore>,
 307        cx: &mut AppContext,
 308    ) -> Task<Result<Model<Self>>> {
 309        let id = call.room_id;
 310        cx.spawn(move |cx| async move {
 311            Self::from_join_response(
 312                client.request(proto::JoinRoom { id }).await?,
 313                client,
 314                user_store,
 315                cx,
 316            )
 317        })
 318    }
 319
 320    fn released(&mut self, cx: &mut AppContext) {
 321        if self.status.is_online() {
 322            self.leave_internal(cx).detach_and_log_err(cx);
 323        }
 324    }
 325
 326    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 327        let task = if self.status.is_online() {
 328            let leave = self.leave_internal(cx);
 329            Some(cx.background_executor().spawn(async move {
 330                leave.await.log_err();
 331            }))
 332        } else {
 333            None
 334        };
 335
 336        async move {
 337            if let Some(task) = task {
 338                task.await;
 339            }
 340        }
 341    }
 342
 343    pub fn mute_on_join(cx: &AppContext) -> bool {
 344        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 345    }
 346
 347    fn from_join_response(
 348        response: proto::JoinRoomResponse,
 349        client: Arc<Client>,
 350        user_store: Model<UserStore>,
 351        mut cx: AsyncAppContext,
 352    ) -> Result<Model<Self>> {
 353        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 354        let room = cx.build_model(|cx| {
 355            Self::new(
 356                room_proto.id,
 357                response.channel_id,
 358                response.live_kit_connection_info,
 359                client,
 360                user_store,
 361                cx,
 362            )
 363        })?;
 364        room.update(&mut cx, |room, cx| {
 365            room.leave_when_empty = room.channel_id.is_none();
 366            room.apply_room_update(room_proto, cx)?;
 367            anyhow::Ok(())
 368        })??;
 369        Ok(room)
 370    }
 371
 372    fn should_leave(&self) -> bool {
 373        self.leave_when_empty
 374            && self.pending_room_update.is_none()
 375            && self.pending_participants.is_empty()
 376            && self.remote_participants.is_empty()
 377            && self.pending_call_count == 0
 378    }
 379
    /// Leaves the room, first notifying observers and emitting [`Event::Left`].
    ///
    /// The notification and event are sent even if the room is already
    /// offline; in that case the returned task resolves to an error (see
    /// `leave_internal`).
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
 385
 386    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 387        if self.status.is_offline() {
 388            return Task::ready(Err(anyhow!("room is offline")));
 389        }
 390
 391        log::info!("leaving room");
 392        Audio::play_sound(Sound::Leave, cx);
 393
 394        self.clear_state(cx);
 395
 396        let leave_room = self.client.request(proto::LeaveRoom {});
 397        cx.background_executor().spawn(async move {
 398            leave_room.await?;
 399            anyhow::Ok(())
 400        })
 401    }
 402
 403    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 404        for project in self.shared_projects.drain() {
 405            if let Some(project) = project.upgrade() {
 406                project.update(cx, |project, cx| {
 407                    project.unshare(cx).log_err();
 408                });
 409            }
 410        }
 411        for project in self.joined_projects.drain() {
 412            if let Some(project) = project.upgrade() {
 413                project.update(cx, |project, cx| {
 414                    project.disconnected_from_host(cx);
 415                    project.close(cx);
 416                });
 417            }
 418        }
 419
 420        self.status = RoomStatus::Offline;
 421        self.remote_participants.clear();
 422        self.pending_participants.clear();
 423        self.participant_user_ids.clear();
 424        self.client_subscriptions.clear();
 425        self.live_kit.take();
 426        self.pending_room_update.take();
 427        self.maintain_connection.take();
 428    }
 429
    /// Watches the client's connection status and tries to rejoin the room after
    /// a disconnection.
    ///
    /// On every detected disconnection the room is marked `Rejoining`, and the
    /// function then waits up to [`RECONNECT_TIMEOUT`] for the client to
    /// reconnect and for `rejoin` to succeed (at most three rejoin attempts).
    /// On success it resumes watching. Otherwise it leaves the room.
    ///
    /// # Errors
    ///
    /// This function never returns `Ok`: it only finishes once the room was
    /// dropped, a model update failed, or reconnection failed, and each of
    /// those paths yields an error.
    async fn maintain_connection(
        this: WeakModel<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // Drain any value that is already queued so that `next()` below only
            // resolves on a subsequent status change.
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade()
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    })?;

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout =
                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeded, and to `false` when
                    // the attempts are used up, the client signed out, or the
                    // room/app went away.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade() else { break };
                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                    Ok(task) => {
                                        if task.await.log_err().is_some() {
                                            return true;
                                        } else {
                                            remaining_attempts -= 1;
                                        }
                                    }
                                    Err(_app_dropped) => return false,
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Biased select: the reconnection future is polled before the
                    // timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade() {
            log::info!("reconnection failed, leaving room");
            // The task returned by `leave` is discarded without being awaited.
            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 516
 517    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 518        let mut projects = HashMap::default();
 519        let mut reshared_projects = Vec::new();
 520        let mut rejoined_projects = Vec::new();
 521        self.shared_projects.retain(|project| {
 522            if let Some(handle) = project.upgrade() {
 523                let project = handle.read(cx);
 524                if let Some(project_id) = project.remote_id() {
 525                    projects.insert(project_id, handle.clone());
 526                    reshared_projects.push(proto::UpdateProject {
 527                        project_id,
 528                        worktrees: project.worktree_metadata_protos(cx),
 529                    });
 530                    return true;
 531                }
 532            }
 533            false
 534        });
 535        self.joined_projects.retain(|project| {
 536            if let Some(handle) = project.upgrade() {
 537                let project = handle.read(cx);
 538                if let Some(project_id) = project.remote_id() {
 539                    projects.insert(project_id, handle.clone());
 540                    rejoined_projects.push(proto::RejoinProject {
 541                        id: project_id,
 542                        worktrees: project
 543                            .worktrees()
 544                            .map(|worktree| {
 545                                let worktree = worktree.read(cx);
 546                                proto::RejoinWorktree {
 547                                    id: worktree.id().to_proto(),
 548                                    scan_id: worktree.completed_scan_id() as u64,
 549                                }
 550                            })
 551                            .collect(),
 552                    });
 553                }
 554                return true;
 555            }
 556            false
 557        });
 558
 559        let response = self.client.request_envelope(proto::RejoinRoom {
 560            id: self.id,
 561            reshared_projects,
 562            rejoined_projects,
 563        });
 564
 565        cx.spawn(|this, mut cx| async move {
 566            let response = response.await?;
 567            let message_id = response.message_id;
 568            let response = response.payload;
 569            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 570            this.update(&mut cx, |this, cx| {
 571                this.status = RoomStatus::Online;
 572                this.apply_room_update(room_proto, cx)?;
 573
 574                for reshared_project in response.reshared_projects {
 575                    if let Some(project) = projects.get(&reshared_project.id) {
 576                        project.update(cx, |project, cx| {
 577                            project.reshared(reshared_project, cx).log_err();
 578                        });
 579                    }
 580                }
 581
 582                for rejoined_project in response.rejoined_projects {
 583                    if let Some(project) = projects.get(&rejoined_project.id) {
 584                        project.update(cx, |project, cx| {
 585                            project.rejoined(rejoined_project, message_id, cx).log_err();
 586                        });
 587                    }
 588                }
 589
 590                anyhow::Ok(())
 591            })?
 592        })
 593    }
 594
    /// Returns the room's id.
    pub fn id(&self) -> u64 {
        self.id
    }
 598
    /// Returns the room's current connectivity status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 602
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 606
    /// Returns the remote participants, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 610
    /// Finds the remote participant whose peer id equals `peer_id`, if any.
    ///
    /// Participants are stored by user id, so this scans all of them.
    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
        self.remote_participants
            .values()
            .find(|p| p.peer_id == peer_id)
    }
 616
    /// Returns the users currently listed as pending participants.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 620
    /// Returns whether `user_id` is among the participant user ids recorded by
    /// the most recently applied room update.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 624
 625    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 626        self.follows_by_leader_id_project_id
 627            .get(&(leader_id, project_id))
 628            .map_or(&[], |v| v.as_slice())
 629    }
 630
 631    /// Returns the most 'active' projects, defined as most people in the project
 632    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 633        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 634        for participant in self.remote_participants.values() {
 635            match participant.location {
 636                ParticipantLocation::SharedProject { project_id } => {
 637                    project_hosts_and_guest_counts
 638                        .entry(project_id)
 639                        .or_default()
 640                        .1 += 1;
 641                }
 642                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 643            }
 644            for project in &participant.projects {
 645                project_hosts_and_guest_counts
 646                    .entry(project.id)
 647                    .or_default()
 648                    .0 = Some(participant.user.id);
 649            }
 650        }
 651
 652        if let Some(user) = self.user_store.read(cx).current_user() {
 653            for project in &self.local_participant.projects {
 654                project_hosts_and_guest_counts
 655                    .entry(project.id)
 656                    .or_default()
 657                    .0 = Some(user.id);
 658            }
 659        }
 660
 661        project_hosts_and_guest_counts
 662            .into_iter()
 663            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 664            .max_by_key(|(_, _, guest_count)| *guest_count)
 665            .map(|(id, host, _)| (id, host))
 666    }
 667
 668    async fn handle_room_updated(
 669        this: Model<Self>,
 670        envelope: TypedEnvelope<proto::RoomUpdated>,
 671        _: Arc<Client>,
 672        mut cx: AsyncAppContext,
 673    ) -> Result<()> {
 674        let room = envelope
 675            .payload
 676            .room
 677            .ok_or_else(|| anyhow!("invalid room"))?;
 678        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 679    }
 680
 681    fn apply_room_update(
 682        &mut self,
 683        mut room: proto::Room,
 684        cx: &mut ModelContext<Self>,
 685    ) -> Result<()> {
 686        // Filter ourselves out from the room's participants.
 687        let local_participant_ix = room
 688            .participants
 689            .iter()
 690            .position(|participant| Some(participant.user_id) == self.client.user_id());
 691        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 692
 693        let pending_participant_user_ids = room
 694            .pending_participants
 695            .iter()
 696            .map(|p| p.user_id)
 697            .collect::<Vec<_>>();
 698
 699        let remote_participant_user_ids = room
 700            .participants
 701            .iter()
 702            .map(|p| p.user_id)
 703            .collect::<Vec<_>>();
 704
 705        let (remote_participants, pending_participants) =
 706            self.user_store.update(cx, move |user_store, cx| {
 707                (
 708                    user_store.get_users(remote_participant_user_ids, cx),
 709                    user_store.get_users(pending_participant_user_ids, cx),
 710                )
 711            });
 712
 713        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 714            let (remote_participants, pending_participants) =
 715                futures::join!(remote_participants, pending_participants);
 716
 717            this.update(&mut cx, |this, cx| {
 718                this.participant_user_ids.clear();
 719
 720                if let Some(participant) = local_participant {
 721                    this.local_participant.projects = participant.projects;
 722                } else {
 723                    this.local_participant.projects.clear();
 724                }
 725
 726                if let Some(participants) = remote_participants.log_err() {
 727                    for (participant, user) in room.participants.into_iter().zip(participants) {
 728                        let Some(peer_id) = participant.peer_id else {
 729                            continue;
 730                        };
 731                        let participant_index = ParticipantIndex(participant.participant_index);
 732                        this.participant_user_ids.insert(participant.user_id);
 733
 734                        let old_projects = this
 735                            .remote_participants
 736                            .get(&participant.user_id)
 737                            .into_iter()
 738                            .flat_map(|existing| &existing.projects)
 739                            .map(|project| project.id)
 740                            .collect::<HashSet<_>>();
 741                        let new_projects = participant
 742                            .projects
 743                            .iter()
 744                            .map(|project| project.id)
 745                            .collect::<HashSet<_>>();
 746
 747                        for project in &participant.projects {
 748                            if !old_projects.contains(&project.id) {
 749                                cx.emit(Event::RemoteProjectShared {
 750                                    owner: user.clone(),
 751                                    project_id: project.id,
 752                                    worktree_root_names: project.worktree_root_names.clone(),
 753                                });
 754                            }
 755                        }
 756
 757                        for unshared_project_id in old_projects.difference(&new_projects) {
 758                            this.joined_projects.retain(|project| {
 759                                if let Some(project) = project.upgrade() {
 760                                    project.update(cx, |project, cx| {
 761                                        if project.remote_id() == Some(*unshared_project_id) {
 762                                            project.disconnected_from_host(cx);
 763                                            false
 764                                        } else {
 765                                            true
 766                                        }
 767                                    })
 768                                } else {
 769                                    false
 770                                }
 771                            });
 772                            cx.emit(Event::RemoteProjectUnshared {
 773                                project_id: *unshared_project_id,
 774                            });
 775                        }
 776
 777                        let location = ParticipantLocation::from_proto(participant.location)
 778                            .unwrap_or(ParticipantLocation::External);
 779                        if let Some(remote_participant) =
 780                            this.remote_participants.get_mut(&participant.user_id)
 781                        {
 782                            remote_participant.peer_id = peer_id;
 783                            remote_participant.projects = participant.projects;
 784                            remote_participant.participant_index = participant_index;
 785                            if location != remote_participant.location {
 786                                remote_participant.location = location;
 787                                cx.emit(Event::ParticipantLocationChanged {
 788                                    participant_id: peer_id,
 789                                });
 790                            }
 791                        } else {
 792                            this.remote_participants.insert(
 793                                participant.user_id,
 794                                RemoteParticipant {
 795                                    user: user.clone(),
 796                                    participant_index,
 797                                    peer_id,
 798                                    projects: participant.projects,
 799                                    location,
 800                                    muted: true,
 801                                    speaking: false,
 802                                    video_tracks: Default::default(),
 803                                    audio_tracks: Default::default(),
 804                                },
 805                            );
 806
 807                            Audio::play_sound(Sound::Joined, cx);
 808
 809                            if let Some(live_kit) = this.live_kit.as_ref() {
 810                                let video_tracks =
 811                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 812                                let audio_tracks =
 813                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 814                                let publications = live_kit
 815                                    .room
 816                                    .remote_audio_track_publications(&user.id.to_string());
 817
 818                                for track in video_tracks {
 819                                    this.remote_video_track_updated(
 820                                        RemoteVideoTrackUpdate::Subscribed(track),
 821                                        cx,
 822                                    )
 823                                    .log_err();
 824                                }
 825
 826                                for (track, publication) in
 827                                    audio_tracks.iter().zip(publications.iter())
 828                                {
 829                                    this.remote_audio_track_updated(
 830                                        RemoteAudioTrackUpdate::Subscribed(
 831                                            track.clone(),
 832                                            publication.clone(),
 833                                        ),
 834                                        cx,
 835                                    )
 836                                    .log_err();
 837                                }
 838                            }
 839                        }
 840                    }
 841
 842                    this.remote_participants.retain(|user_id, participant| {
 843                        if this.participant_user_ids.contains(user_id) {
 844                            true
 845                        } else {
 846                            for project in &participant.projects {
 847                                cx.emit(Event::RemoteProjectUnshared {
 848                                    project_id: project.id,
 849                                });
 850                            }
 851                            false
 852                        }
 853                    });
 854                }
 855
 856                if let Some(pending_participants) = pending_participants.log_err() {
 857                    this.pending_participants = pending_participants;
 858                    for participant in &this.pending_participants {
 859                        this.participant_user_ids.insert(participant.id);
 860                    }
 861                }
 862
 863                this.follows_by_leader_id_project_id.clear();
 864                for follower in room.followers {
 865                    let project_id = follower.project_id;
 866                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 867                        (Some(leader), Some(follower)) => (leader, follower),
 868
 869                        _ => {
 870                            log::error!("Follower message {follower:?} missing some state");
 871                            continue;
 872                        }
 873                    };
 874
 875                    let list = this
 876                        .follows_by_leader_id_project_id
 877                        .entry((leader, project_id))
 878                        .or_insert(Vec::new());
 879                    if !list.contains(&follower) {
 880                        list.push(follower);
 881                    }
 882                }
 883
 884                this.pending_room_update.take();
 885                if this.should_leave() {
 886                    log::info!("room is empty, leaving");
 887                    let _ = this.leave(cx);
 888                }
 889
 890                this.user_store.update(cx, |user_store, cx| {
 891                    let participant_indices_by_user_id = this
 892                        .remote_participants
 893                        .iter()
 894                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 895                        .collect();
 896                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 897                });
 898
 899                this.check_invariants();
 900                this.room_update_completed_tx.try_send(Some(())).ok();
 901                cx.notify();
 902            })
 903            .ok();
 904        }));
 905
 906        cx.notify();
 907        Ok(())
 908    }
 909
 910    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 911        let mut done_rx = self.room_update_completed_rx.clone();
 912        async move {
 913            while let Some(result) = done_rx.next().await {
 914                if result.is_some() {
 915                    break;
 916                }
 917            }
 918        }
 919    }
 920
 921    fn remote_video_track_updated(
 922        &mut self,
 923        change: RemoteVideoTrackUpdate,
 924        cx: &mut ModelContext<Self>,
 925    ) -> Result<()> {
 926        match change {
 927            RemoteVideoTrackUpdate::Subscribed(track) => {
 928                let user_id = track.publisher_id().parse()?;
 929                let track_id = track.sid().to_string();
 930                let participant = self
 931                    .remote_participants
 932                    .get_mut(&user_id)
 933                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 934                participant.video_tracks.insert(track_id.clone(), track);
 935                cx.emit(Event::RemoteVideoTracksChanged {
 936                    participant_id: participant.peer_id,
 937                });
 938            }
 939            RemoteVideoTrackUpdate::Unsubscribed {
 940                publisher_id,
 941                track_id,
 942            } => {
 943                let user_id = publisher_id.parse()?;
 944                let participant = self
 945                    .remote_participants
 946                    .get_mut(&user_id)
 947                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 948                participant.video_tracks.remove(&track_id);
 949                cx.emit(Event::RemoteVideoTracksChanged {
 950                    participant_id: participant.peer_id,
 951                });
 952            }
 953        }
 954
 955        cx.notify();
 956        Ok(())
 957    }
 958
 959    fn remote_audio_track_updated(
 960        &mut self,
 961        change: RemoteAudioTrackUpdate,
 962        cx: &mut ModelContext<Self>,
 963    ) -> Result<()> {
 964        match change {
 965            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 966                let mut speaker_ids = speakers
 967                    .into_iter()
 968                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 969                    .collect::<Vec<u64>>();
 970                speaker_ids.sort_unstable();
 971                for (sid, participant) in &mut self.remote_participants {
 972                    if let Ok(_) = speaker_ids.binary_search(sid) {
 973                        participant.speaking = true;
 974                    } else {
 975                        participant.speaking = false;
 976                    }
 977                }
 978                if let Some(id) = self.client.user_id() {
 979                    if let Some(room) = &mut self.live_kit {
 980                        if let Ok(_) = speaker_ids.binary_search(&id) {
 981                            room.speaking = true;
 982                        } else {
 983                            room.speaking = false;
 984                        }
 985                    }
 986                }
 987                cx.notify();
 988            }
 989            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 990                let mut found = false;
 991                for participant in &mut self.remote_participants.values_mut() {
 992                    for track in participant.audio_tracks.values() {
 993                        if track.sid() == track_id {
 994                            found = true;
 995                            break;
 996                        }
 997                    }
 998                    if found {
 999                        participant.muted = muted;
1000                        break;
1001                    }
1002                }
1003
1004                cx.notify();
1005            }
1006            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1007                let user_id = track.publisher_id().parse()?;
1008                let track_id = track.sid().to_string();
1009                let participant = self
1010                    .remote_participants
1011                    .get_mut(&user_id)
1012                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1013                participant.audio_tracks.insert(track_id.clone(), track);
1014                participant.muted = publication.is_muted();
1015
1016                cx.emit(Event::RemoteAudioTracksChanged {
1017                    participant_id: participant.peer_id,
1018                });
1019            }
1020            RemoteAudioTrackUpdate::Unsubscribed {
1021                publisher_id,
1022                track_id,
1023            } => {
1024                let user_id = publisher_id.parse()?;
1025                let participant = self
1026                    .remote_participants
1027                    .get_mut(&user_id)
1028                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1029                participant.audio_tracks.remove(&track_id);
1030                cx.emit(Event::RemoteAudioTracksChanged {
1031                    participant_id: participant.peer_id,
1032                });
1033            }
1034        }
1035
1036        cx.notify();
1037        Ok(())
1038    }
1039
1040    fn check_invariants(&self) {
1041        #[cfg(any(test, feature = "test-support"))]
1042        {
1043            for participant in self.remote_participants.values() {
1044                assert!(self.participant_user_ids.contains(&participant.user.id));
1045                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1046            }
1047
1048            for participant in &self.pending_participants {
1049                assert!(self.participant_user_ids.contains(&participant.id));
1050                assert_ne!(participant.id, self.client.user_id().unwrap());
1051            }
1052
1053            assert_eq!(
1054                self.participant_user_ids.len(),
1055                self.remote_participants.len() + self.pending_participants.len()
1056            );
1057        }
1058    }
1059
1060    pub(crate) fn call(
1061        &mut self,
1062        called_user_id: u64,
1063        initial_project_id: Option<u64>,
1064        cx: &mut ModelContext<Self>,
1065    ) -> Task<Result<()>> {
1066        if self.status.is_offline() {
1067            return Task::ready(Err(anyhow!("room is offline")));
1068        }
1069
1070        cx.notify();
1071        let client = self.client.clone();
1072        let room_id = self.id;
1073        self.pending_call_count += 1;
1074        cx.spawn(move |this, mut cx| async move {
1075            let result = client
1076                .request(proto::Call {
1077                    room_id,
1078                    called_user_id,
1079                    initial_project_id,
1080                })
1081                .await;
1082            this.update(&mut cx, |this, cx| {
1083                this.pending_call_count -= 1;
1084                if this.should_leave() {
1085                    this.leave(cx).detach_and_log_err(cx);
1086                }
1087            })?;
1088            result?;
1089            Ok(())
1090        })
1091    }
1092
1093    pub fn join_project(
1094        &mut self,
1095        id: u64,
1096        language_registry: Arc<LanguageRegistry>,
1097        fs: Arc<dyn Fs>,
1098        cx: &mut ModelContext<Self>,
1099    ) -> Task<Result<Model<Project>>> {
1100        let client = self.client.clone();
1101        let user_store = self.user_store.clone();
1102        cx.emit(Event::RemoteProjectJoined { project_id: id });
1103        cx.spawn(move |this, mut cx| async move {
1104            let project =
1105                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1106
1107            this.update(&mut cx, |this, cx| {
1108                this.joined_projects.retain(|project| {
1109                    if let Some(project) = project.upgrade() {
1110                        !project.read(cx).is_read_only()
1111                    } else {
1112                        false
1113                    }
1114                });
1115                this.joined_projects.insert(project.downgrade());
1116            })?;
1117            Ok(project)
1118        })
1119    }
1120
1121    pub(crate) fn share_project(
1122        &mut self,
1123        project: Model<Project>,
1124        cx: &mut ModelContext<Self>,
1125    ) -> Task<Result<u64>> {
1126        if let Some(project_id) = project.read(cx).remote_id() {
1127            return Task::ready(Ok(project_id));
1128        }
1129
1130        let request = self.client.request(proto::ShareProject {
1131            room_id: self.id(),
1132            worktrees: project.read(cx).worktree_metadata_protos(cx),
1133        });
1134        cx.spawn(|this, mut cx| async move {
1135            let response = request.await?;
1136
1137            project.update(&mut cx, |project, cx| {
1138                project.shared(response.project_id, cx)
1139            })??;
1140
1141            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1142            this.update(&mut cx, |this, cx| {
1143                this.shared_projects.insert(project.downgrade());
1144                let active_project = this.local_participant.active_project.as_ref();
1145                if active_project.map_or(false, |location| *location == project) {
1146                    this.set_location(Some(&project), cx)
1147                } else {
1148                    Task::ready(Ok(()))
1149                }
1150            })?
1151            .await?;
1152
1153            Ok(response.project_id)
1154        })
1155    }
1156
1157    pub(crate) fn unshare_project(
1158        &mut self,
1159        project: Model<Project>,
1160        cx: &mut ModelContext<Self>,
1161    ) -> Result<()> {
1162        let project_id = match project.read(cx).remote_id() {
1163            Some(project_id) => project_id,
1164            None => return Ok(()),
1165        };
1166
1167        self.client.send(proto::UnshareProject { project_id })?;
1168        project.update(cx, |this, cx| this.unshare(cx))
1169    }
1170
1171    pub(crate) fn set_location(
1172        &mut self,
1173        project: Option<&Model<Project>>,
1174        cx: &mut ModelContext<Self>,
1175    ) -> Task<Result<()>> {
1176        if self.status.is_offline() {
1177            return Task::ready(Err(anyhow!("room is offline")));
1178        }
1179
1180        let client = self.client.clone();
1181        let room_id = self.id;
1182        let location = if let Some(project) = project {
1183            self.local_participant.active_project = Some(project.downgrade());
1184            if let Some(project_id) = project.read(cx).remote_id() {
1185                proto::participant_location::Variant::SharedProject(
1186                    proto::participant_location::SharedProject { id: project_id },
1187                )
1188            } else {
1189                proto::participant_location::Variant::UnsharedProject(
1190                    proto::participant_location::UnsharedProject {},
1191                )
1192            }
1193        } else {
1194            self.local_participant.active_project = None;
1195            proto::participant_location::Variant::External(proto::participant_location::External {})
1196        };
1197
1198        cx.notify();
1199        cx.background_executor().spawn(async move {
1200            client
1201                .request(proto::UpdateParticipantLocation {
1202                    room_id,
1203                    location: Some(proto::ParticipantLocation {
1204                        variant: Some(location),
1205                    }),
1206                })
1207                .await?;
1208            Ok(())
1209        })
1210    }
1211
1212    pub fn is_screen_sharing(&self) -> bool {
1213        self.live_kit.as_ref().map_or(false, |live_kit| {
1214            !matches!(live_kit.screen_track, LocalTrack::None)
1215        })
1216    }
1217
1218    pub fn is_sharing_mic(&self) -> bool {
1219        self.live_kit.as_ref().map_or(false, |live_kit| {
1220            !matches!(live_kit.microphone_track, LocalTrack::None)
1221        })
1222    }
1223
1224    pub fn is_muted(&self, cx: &AppContext) -> bool {
1225        self.live_kit
1226            .as_ref()
1227            .and_then(|live_kit| match &live_kit.microphone_track {
1228                LocalTrack::None => Some(Self::mute_on_join(cx)),
1229                LocalTrack::Pending { muted, .. } => Some(*muted),
1230                LocalTrack::Published { muted, .. } => Some(*muted),
1231            })
1232            .unwrap_or(false)
1233    }
1234
1235    pub fn is_speaking(&self) -> bool {
1236        self.live_kit
1237            .as_ref()
1238            .map_or(false, |live_kit| live_kit.speaking)
1239    }
1240
1241    pub fn is_deafened(&self) -> Option<bool> {
1242        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1243    }
1244
1245    #[track_caller]
1246    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1247        if self.status.is_offline() {
1248            return Task::ready(Err(anyhow!("room is offline")));
1249        } else if self.is_sharing_mic() {
1250            return Task::ready(Err(anyhow!("microphone was already shared")));
1251        }
1252
1253        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1254            let publish_id = post_inc(&mut live_kit.next_publish_id);
1255            live_kit.microphone_track = LocalTrack::Pending {
1256                publish_id,
1257                muted: false,
1258            };
1259            cx.notify();
1260            publish_id
1261        } else {
1262            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1263        };
1264
1265        cx.spawn(move |this, mut cx| async move {
1266            let publish_track = async {
1267                let track = LocalAudioTrack::create();
1268                this.upgrade()
1269                    .ok_or_else(|| anyhow!("room was dropped"))?
1270                    .update(&mut cx, |this, _| {
1271                        this.live_kit
1272                            .as_ref()
1273                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1274                    })?
1275                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1276                    .await
1277            };
1278
1279            let publication = publish_track.await;
1280            this.upgrade()
1281                .ok_or_else(|| anyhow!("room was dropped"))?
1282                .update(&mut cx, |this, cx| {
1283                    let live_kit = this
1284                        .live_kit
1285                        .as_mut()
1286                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1287
1288                    let (canceled, muted) = if let LocalTrack::Pending {
1289                        publish_id: cur_publish_id,
1290                        muted,
1291                    } = &live_kit.microphone_track
1292                    {
1293                        (*cur_publish_id != publish_id, *muted)
1294                    } else {
1295                        (true, false)
1296                    };
1297
1298                    match publication {
1299                        Ok(publication) => {
1300                            if canceled {
1301                                live_kit.room.unpublish_track(publication);
1302                            } else {
1303                                if muted {
1304                                    cx.background_executor()
1305                                        .spawn(publication.set_mute(muted))
1306                                        .detach();
1307                                }
1308                                live_kit.microphone_track = LocalTrack::Published {
1309                                    track_publication: publication,
1310                                    muted,
1311                                };
1312                                cx.notify();
1313                            }
1314                            Ok(())
1315                        }
1316                        Err(error) => {
1317                            if canceled {
1318                                Ok(())
1319                            } else {
1320                                live_kit.microphone_track = LocalTrack::None;
1321                                cx.notify();
1322                                Err(error)
1323                            }
1324                        }
1325                    }
1326                })?
1327        })
1328    }
1329
1330    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1331        if self.status.is_offline() {
1332            return Task::ready(Err(anyhow!("room is offline")));
1333        } else if self.is_screen_sharing() {
1334            return Task::ready(Err(anyhow!("screen was already shared")));
1335        }
1336
1337        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1338            let publish_id = post_inc(&mut live_kit.next_publish_id);
1339            live_kit.screen_track = LocalTrack::Pending {
1340                publish_id,
1341                muted: false,
1342            };
1343            cx.notify();
1344            (live_kit.room.display_sources(), publish_id)
1345        } else {
1346            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1347        };
1348
1349        cx.spawn(move |this, mut cx| async move {
1350            let publish_track = async {
1351                let displays = displays.await?;
1352                let display = displays
1353                    .first()
1354                    .ok_or_else(|| anyhow!("no display found"))?;
1355                let track = LocalVideoTrack::screen_share_for_display(&display);
1356                this.upgrade()
1357                    .ok_or_else(|| anyhow!("room was dropped"))?
1358                    .update(&mut cx, |this, _| {
1359                        this.live_kit
1360                            .as_ref()
1361                            .map(|live_kit| live_kit.room.publish_video_track(track))
1362                    })?
1363                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1364                    .await
1365            };
1366
1367            let publication = publish_track.await;
1368            this.upgrade()
1369                .ok_or_else(|| anyhow!("room was dropped"))?
1370                .update(&mut cx, |this, cx| {
1371                    let live_kit = this
1372                        .live_kit
1373                        .as_mut()
1374                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1375
1376                    let (canceled, muted) = if let LocalTrack::Pending {
1377                        publish_id: cur_publish_id,
1378                        muted,
1379                    } = &live_kit.screen_track
1380                    {
1381                        (*cur_publish_id != publish_id, *muted)
1382                    } else {
1383                        (true, false)
1384                    };
1385
1386                    match publication {
1387                        Ok(publication) => {
1388                            if canceled {
1389                                live_kit.room.unpublish_track(publication);
1390                            } else {
1391                                if muted {
1392                                    cx.background_executor()
1393                                        .spawn(publication.set_mute(muted))
1394                                        .detach();
1395                                }
1396                                live_kit.screen_track = LocalTrack::Published {
1397                                    track_publication: publication,
1398                                    muted,
1399                                };
1400                                cx.notify();
1401                            }
1402
1403                            Audio::play_sound(Sound::StartScreenshare, cx);
1404
1405                            Ok(())
1406                        }
1407                        Err(error) => {
1408                            if canceled {
1409                                Ok(())
1410                            } else {
1411                                live_kit.screen_track = LocalTrack::None;
1412                                cx.notify();
1413                                Err(error)
1414                            }
1415                        }
1416                    }
1417                })?
1418        })
1419    }
1420
1421    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1422        let should_mute = !self.is_muted(cx);
1423        if let Some(live_kit) = self.live_kit.as_mut() {
1424            if matches!(live_kit.microphone_track, LocalTrack::None) {
1425                return Ok(self.share_microphone(cx));
1426            }
1427
1428            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1429            live_kit.muted_by_user = should_mute;
1430
1431            if old_muted == true && live_kit.deafened == true {
1432                if let Some(task) = self.toggle_deafen(cx).ok() {
1433                    task.detach();
1434                }
1435            }
1436
1437            Ok(ret_task)
1438        } else {
1439            Err(anyhow!("LiveKit not started"))
1440        }
1441    }
1442
1443    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1444        if let Some(live_kit) = self.live_kit.as_mut() {
1445            (*live_kit).deafened = !live_kit.deafened;
1446
1447            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1448            // Context notification is sent within set_mute itself.
1449            let mut mute_task = None;
1450            // When deafening, mute user's mic as well.
1451            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1452            if live_kit.deafened || !live_kit.muted_by_user {
1453                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1454            };
1455            for participant in self.remote_participants.values() {
1456                for track in live_kit
1457                    .room
1458                    .remote_audio_track_publications(&participant.user.id.to_string())
1459                {
1460                    let deafened = live_kit.deafened;
1461                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1462                }
1463            }
1464
1465            Ok(cx.foreground_executor().spawn(async move {
1466                if let Some(mute_task) = mute_task {
1467                    mute_task.await?;
1468                }
1469                for task in tasks {
1470                    task.await?;
1471                }
1472                Ok(())
1473            }))
1474        } else {
1475            Err(anyhow!("LiveKit not started"))
1476        }
1477    }
1478
1479    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1480        if self.status.is_offline() {
1481            return Err(anyhow!("room is offline"));
1482        }
1483
1484        let live_kit = self
1485            .live_kit
1486            .as_mut()
1487            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1488        match mem::take(&mut live_kit.screen_track) {
1489            LocalTrack::None => Err(anyhow!("screen was not shared")),
1490            LocalTrack::Pending { .. } => {
1491                cx.notify();
1492                Ok(())
1493            }
1494            LocalTrack::Published {
1495                track_publication, ..
1496            } => {
1497                live_kit.room.unpublish_track(track_publication);
1498                cx.notify();
1499
1500                Audio::play_sound(Sound::StopScreenshare, cx);
1501                Ok(())
1502            }
1503        }
1504    }
1505
1506    #[cfg(any(test, feature = "test-support"))]
1507    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1508        self.live_kit
1509            .as_ref()
1510            .unwrap()
1511            .room
1512            .set_display_sources(sources);
1513    }
1514}
1515
/// LiveKit-related state held by `Room` in its `live_kit` field.
struct LiveKitRoom {
    /// Handle to the LiveKit room, used to publish and unpublish local tracks and
    /// to look up remote tracks and publications.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    muted_by_user: bool,
    /// Whether the local user is deafened; toggled by `Room::toggle_deafen`.
    deafened: bool,
    /// Whether the local user appeared in the most recent active-speakers update.
    speaking: bool,
    /// Counter that is post-incremented for every publish attempt to tag the
    /// resulting `LocalTrack::Pending` state.
    next_publish_id: usize,
    // NOTE(review): presumably tasks kept alive for as long as this struct exists;
    // they are created outside this section — confirm.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1528
1529impl LiveKitRoom {
1530    fn set_mute(
1531        self: &mut LiveKitRoom,
1532        should_mute: bool,
1533        cx: &mut ModelContext<Room>,
1534    ) -> Result<(Task<Result<()>>, bool)> {
1535        if !should_mute {
1536            // clear user muting state.
1537            self.muted_by_user = false;
1538        }
1539
1540        let (result, old_muted) = match &mut self.microphone_track {
1541            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1542            LocalTrack::Pending { muted, .. } => {
1543                let old_muted = *muted;
1544                *muted = should_mute;
1545                cx.notify();
1546                Ok((Task::Ready(Some(Ok(()))), old_muted))
1547            }
1548            LocalTrack::Published {
1549                track_publication,
1550                muted,
1551            } => {
1552                let old_muted = *muted;
1553                *muted = should_mute;
1554                cx.notify();
1555                Ok((
1556                    cx.background_executor()
1557                        .spawn(track_publication.set_mute(*muted)),
1558                    old_muted,
1559                ))
1560            }
1561        }?;
1562
1563        if old_muted != should_mute {
1564            if should_mute {
1565                Audio::play_sound(Sound::Mute, cx);
1566            } else {
1567                Audio::play_sound(Sound::Unmute, cx);
1568            }
1569        }
1570
1571        Ok((result, old_muted))
1572    }
1573}
1574
/// Publication state of a local track (microphone or screen share).
enum LocalTrack {
    /// Nothing is being shared.
    None,
    /// A publish attempt has been started but has not completed yet.
    Pending {
        /// Identifies the publish attempt; compared on completion to detect an
        /// attempt that was canceled or superseded in the meantime.
        publish_id: usize,
        /// Mute state requested while pending, applied once the track is published.
        muted: bool,
    },
    /// The track has been published.
    Published {
        /// Publication handle, used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Mute state last applied to the published track.
        muted: bool,
    },
}
1586
1587impl Default for LocalTrack {
1588    fn default() -> Self {
1589        Self::None
1590    }
1591}
1592
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` is neither
    /// online nor offline.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}