room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio2::{Audio, Sound};
   8use client2::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs2::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui2::{
  16    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  17};
  18use language2::LanguageRegistry;
  19use live_kit_client2::{
  20    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  21    RemoteVideoTrackUpdate,
  22};
  23use postage::{sink::Sink, stream::Stream, watch};
  24use project2::Project;
  25use settings2::Settings;
  26use std::{future::Future, mem, sync::Arc, time::Duration};
  27use util::{post_inc, ResultExt, TryFutureExt};
  28
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  30
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// An already-known remote participant reported a location different from
    /// the one previously recorded for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): emit site is outside this view; presumably the set of
    /// remote video tracks for this participant changed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): emit site is outside this view; presumably the set of
    /// remote audio tracks for this participant changed.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not in
    /// that participant's previously known project list.
    RemoteProjectShared {
        /// The user the project is listed under.
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously listed for a remote participant is missing from
    /// the latest room update.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// NOTE(review): emit site is outside this view — confirm exact semantics.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// NOTE(review): emit site is outside this view — confirm exact semantics.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave` when the local user leaves the room.
    Left,
}
  58
/// State of a call room as seen by the local client.
pub struct Room {
    /// Server-side room id; sent back in `RejoinRoom` requests.
    id: u64,
    /// Channel id taken from the join response; `None` for rooms made via `create`.
    channel_id: Option<u64>,
    /// LiveKit state; only present when the server supplied connection info,
    /// and dropped again by `clear_state`.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Projects that `clear_state` unshares and `rejoin` re-shares.
    shared_projects: HashSet<WeakModel<Project>>,
    /// Projects that `clear_state` disconnects and closes, and `rejoin` re-joins.
    joined_projects: HashSet<WeakModel<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    pending_participants: Vec<Arc<User>>,
    /// User ids recorded while applying room updates; backs `contains_participant`.
    participant_user_ids: HashSet<u64>,
    /// Must be zero for `should_leave` to return true.
    pending_call_count: usize,
    /// Enables the `should_leave` check; set for non-channel rooms.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Follower peer ids, keyed by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message handler registrations on `client`; cleared by `clear_state`.
    client_subscriptions: Vec<client2::Subscription>,
    /// Release and app-quit callbacks registered in `new`.
    _subscriptions: Vec<gpui2::Subscription>,
    // NOTE(review): the watch channel's users are outside this view; presumably
    // signalled whenever a room update finishes being applied.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// Task applying the most recent room update, if one is in flight.
    pending_room_update: Option<Task<()>>,
    /// Background task running `Room::maintain_connection`.
    maintain_connection: Option<Task<Option<()>>>,
}
  82
// Lets `Room` models emit `Event` values through `cx.emit`.
impl EventEmitter for Room {
    type Event = Event;
}
  86
  87impl Room {
    /// Returns the channel id this room was created with, if any.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
  91
    /// Returns `true` while at least one entry is tracked in `shared_projects`.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
  95
  96    #[cfg(any(test, feature = "test-support"))]
  97    pub fn is_connected(&self) -> bool {
  98        if let Some(live_kit) = self.live_kit.as_ref() {
  99            matches!(
 100                *live_kit.room.status().borrow(),
 101                live_kit_client2::ConnectionState::Connected { .. }
 102            )
 103        } else {
 104            false
 105        }
 106    }
 107
    /// Builds the room state and starts its background tasks.
    ///
    /// When `live_kit_connection_info` is present, a LiveKit room is created,
    /// tasks are spawned to watch its connection status and remote track
    /// updates, and a connection attempt is started. A task running
    /// `maintain_connection` is always spawned, the "joined" sound is played,
    /// and a `RoomUpdated` message handler plus release/app-quit callbacks are
    /// registered. The returned room starts out `Online` with
    /// `leave_when_empty` disabled.
    fn new(
        id: u64,
        channel_id: Option<u64>,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client2::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Leave the room as soon as LiveKit reports a disconnection; stop
            // watching once the room model has been dropped.
            let _maintain_room = cx.spawn(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade() {
                        this
                    } else {
                        break;
                    };

                    if status == live_kit_client2::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
                            .ok();
                        break;
                    }
                }
            });

            // Forward every remote video track update to the room model.
            let _maintain_video_tracks = cx.spawn_on_main({
                let room = room.clone();
                move |this, mut cx| async move {
                    let mut track_video_changes = room.remote_video_track_updates();
                    while let Some(track_change) = track_video_changes.next().await {
                        let this = if let Some(this) = this.upgrade() {
                            this
                        } else {
                            break;
                        };

                        this.update(&mut cx, |this, cx| {
                            this.remote_video_track_updated(track_change, cx).log_err()
                        })
                        .ok();
                    }
                }
            });

            // Forward every remote audio track update to the room model.
            let _maintain_audio_tracks = cx.spawn_on_main({
                let room = room.clone();
                |this, mut cx| async move {
                    let mut track_audio_changes = room.remote_audio_track_updates();
                    while let Some(track_change) = track_audio_changes.next().await {
                        let this = if let Some(this) = this.upgrade() {
                            this
                        } else {
                            break;
                        };

                        this.update(&mut cx, |this, cx| {
                            this.remote_audio_track_updated(track_change, cx).log_err()
                        })
                        .ok();
                    }
                }
            });

            // Connect in the background; once connected, publish the
            // microphone unless `mute_on_join` says to stay muted.
            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            cx.spawn(|this, mut cx| async move {
                connect.await?;

                if !cx.update(|cx| Self::mute_on_join(cx))? {
                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
                        .await?;
                }

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: false,
                deafened: false,
                speaking: false,
                _maintain_room,
                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
            })
        } else {
            None
        };

        // Watch the client connection for the lifetime of the room.
        let maintain_connection = cx.spawn({
            let client = client.clone();
            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
        });

        Audio::play_sound(Sound::Joined, cx);

        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();

        Self {
            id,
            channel_id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            // `client` is borrowed here and only moved into the struct below.
            client_subscriptions: vec![
                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
            ],
            _subscriptions: vec![
                cx.on_release(Self::released),
                cx.on_app_quit(Self::app_will_quit),
            ],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
            room_update_completed_tx,
            room_update_completed_rx,
        }
    }
 241
 242    pub(crate) fn create(
 243        called_user_id: u64,
 244        initial_project: Option<Model<Project>>,
 245        client: Arc<Client>,
 246        user_store: Model<UserStore>,
 247        cx: &mut AppContext,
 248    ) -> Task<Result<Model<Self>>> {
 249        cx.spawn(move |mut cx| async move {
 250            let response = client.request(proto::CreateRoom {}).await?;
 251            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 252            let room = cx.build_model(|cx| {
 253                Self::new(
 254                    room_proto.id,
 255                    None,
 256                    response.live_kit_connection_info,
 257                    client,
 258                    user_store,
 259                    cx,
 260                )
 261            })?;
 262
 263            let initial_project_id = if let Some(initial_project) = initial_project {
 264                let initial_project_id = room
 265                    .update(&mut cx, |room, cx| {
 266                        room.share_project(initial_project.clone(), cx)
 267                    })?
 268                    .await?;
 269                Some(initial_project_id)
 270            } else {
 271                None
 272            };
 273
 274            match room
 275                .update(&mut cx, |room, cx| {
 276                    room.leave_when_empty = true;
 277                    room.call(called_user_id, initial_project_id, cx)
 278                })?
 279                .await
 280            {
 281                Ok(()) => Ok(room),
 282                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 283            }
 284        })
 285    }
 286
 287    pub(crate) fn join_channel(
 288        channel_id: u64,
 289        client: Arc<Client>,
 290        user_store: Model<UserStore>,
 291        cx: &mut AppContext,
 292    ) -> Task<Result<Model<Self>>> {
 293        cx.spawn(move |cx| async move {
 294            Self::from_join_response(
 295                client.request(proto::JoinChannel { channel_id }).await?,
 296                client,
 297                user_store,
 298                cx,
 299            )
 300        })
 301    }
 302
 303    pub(crate) fn join(
 304        call: &IncomingCall,
 305        client: Arc<Client>,
 306        user_store: Model<UserStore>,
 307        cx: &mut AppContext,
 308    ) -> Task<Result<Model<Self>>> {
 309        let id = call.room_id;
 310        cx.spawn(move |cx| async move {
 311            Self::from_join_response(
 312                client.request(proto::JoinRoom { id }).await?,
 313                client,
 314                user_store,
 315                cx,
 316            )
 317        })
 318    }
 319
 320    fn released(&mut self, cx: &mut AppContext) {
 321        if self.status.is_online() {
 322            self.leave_internal(cx).detach_and_log_err(cx);
 323        }
 324    }
 325
 326    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 327        let task = if self.status.is_online() {
 328            let leave = self.leave_internal(cx);
 329            Some(cx.executor().spawn(async move {
 330                leave.await.log_err();
 331            }))
 332        } else {
 333            None
 334        };
 335
 336        async move {
 337            if let Some(task) = task {
 338                task.await;
 339            }
 340        }
 341    }
 342
 343    pub fn mute_on_join(cx: &AppContext) -> bool {
 344        CallSettings::get_global(cx).mute_on_join || client2::IMPERSONATE_LOGIN.is_some()
 345    }
 346
 347    fn from_join_response(
 348        response: proto::JoinRoomResponse,
 349        client: Arc<Client>,
 350        user_store: Model<UserStore>,
 351        mut cx: AsyncAppContext,
 352    ) -> Result<Model<Self>> {
 353        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 354        let room = cx.build_model(|cx| {
 355            Self::new(
 356                room_proto.id,
 357                response.channel_id,
 358                response.live_kit_connection_info,
 359                client,
 360                user_store,
 361                cx,
 362            )
 363        })?;
 364        room.update(&mut cx, |room, cx| {
 365            room.leave_when_empty = room.channel_id.is_none();
 366            room.apply_room_update(room_proto, cx)?;
 367            anyhow::Ok(())
 368        })??;
 369        Ok(room)
 370    }
 371
 372    fn should_leave(&self) -> bool {
 373        self.leave_when_empty
 374            && self.pending_room_update.is_none()
 375            && self.pending_participants.is_empty()
 376            && self.remote_participants.is_empty()
 377            && self.pending_call_count == 0
 378    }
 379
    /// Leaves the room: notifies observers, emits [`Event::Left`], and then
    /// delegates to `leave_internal`, whose task is returned.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
 385
    /// Tears down local room state and sends a `LeaveRoom` request.
    ///
    /// Returns an already-failed task ("room is offline") if the room is
    /// offline. Otherwise plays the leave sound, clears local state (which
    /// marks the room offline), and returns a task that resolves once the
    /// `LeaveRoom` request completes.
    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
        if self.status.is_offline() {
            return Task::ready(Err(anyhow!("room is offline")));
        }

        log::info!("leaving room");
        Audio::play_sound(Sound::Leave, cx);

        self.clear_state(cx);

        let leave_room = self.client.request(proto::LeaveRoom {});
        cx.executor().spawn(async move {
            leave_room.await?;
            anyhow::Ok(())
        })
    }
 402
 403    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 404        for project in self.shared_projects.drain() {
 405            if let Some(project) = project.upgrade() {
 406                project.update(cx, |project, cx| {
 407                    project.unshare(cx).log_err();
 408                });
 409            }
 410        }
 411        for project in self.joined_projects.drain() {
 412            if let Some(project) = project.upgrade() {
 413                project.update(cx, |project, cx| {
 414                    project.disconnected_from_host(cx);
 415                    project.close(cx);
 416                });
 417            }
 418        }
 419
 420        self.status = RoomStatus::Offline;
 421        self.remote_participants.clear();
 422        self.pending_participants.clear();
 423        self.participant_user_ids.clear();
 424        self.client_subscriptions.clear();
 425        self.live_kit.take();
 426        self.pending_room_update.take();
 427        self.maintain_connection.take();
 428    }
 429
    /// Watches the client's connection status and tries to rejoin the room
    /// after a disconnection.
    ///
    /// On a disconnection (or any status change) the room is marked
    /// `Rejoining`. The function then waits up to [`RECONNECT_TIMEOUT`] for
    /// the client to reconnect, calling `rejoin` each time the client is seen
    /// connected and giving up after three failed rejoin attempts or when the
    /// client is signed out. A successful rejoin resumes watching. Otherwise
    /// the room is left (if it still exists) and an error is returned, so this
    /// function never returns `Ok`.
    async fn maintain_connection(
        this: WeakModel<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade()
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    })?;

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.executor().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeded, `false` when
                    // rejoining should be abandoned.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade() else { break };
                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                    Ok(task) => {
                                        if task.await.log_err().is_some() {
                                            return true;
                                        } else {
                                            // Only failed rejoin requests use up an attempt.
                                            remaining_attempts -= 1;
                                        }
                                    }
                                    Err(_app_dropped) => return false,
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // The reconnection branch is listed first, so `select_biased!`
                    // polls it before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade() {
            log::info!("reconnection failed, leaving room");
            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 515
 516    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 517        let mut projects = HashMap::default();
 518        let mut reshared_projects = Vec::new();
 519        let mut rejoined_projects = Vec::new();
 520        self.shared_projects.retain(|project| {
 521            if let Some(handle) = project.upgrade() {
 522                let project = handle.read(cx);
 523                if let Some(project_id) = project.remote_id() {
 524                    projects.insert(project_id, handle.clone());
 525                    reshared_projects.push(proto::UpdateProject {
 526                        project_id,
 527                        worktrees: project.worktree_metadata_protos(cx),
 528                    });
 529                    return true;
 530                }
 531            }
 532            false
 533        });
 534        self.joined_projects.retain(|project| {
 535            if let Some(handle) = project.upgrade() {
 536                let project = handle.read(cx);
 537                if let Some(project_id) = project.remote_id() {
 538                    projects.insert(project_id, handle.clone());
 539                    rejoined_projects.push(proto::RejoinProject {
 540                        id: project_id,
 541                        worktrees: project
 542                            .worktrees()
 543                            .map(|worktree| {
 544                                let worktree = worktree.read(cx);
 545                                proto::RejoinWorktree {
 546                                    id: worktree.id().to_proto(),
 547                                    scan_id: worktree.completed_scan_id() as u64,
 548                                }
 549                            })
 550                            .collect(),
 551                    });
 552                }
 553                return true;
 554            }
 555            false
 556        });
 557
 558        let response = self.client.request_envelope(proto::RejoinRoom {
 559            id: self.id,
 560            reshared_projects,
 561            rejoined_projects,
 562        });
 563
 564        cx.spawn(|this, mut cx| async move {
 565            let response = response.await?;
 566            let message_id = response.message_id;
 567            let response = response.payload;
 568            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 569            this.update(&mut cx, |this, cx| {
 570                this.status = RoomStatus::Online;
 571                this.apply_room_update(room_proto, cx)?;
 572
 573                for reshared_project in response.reshared_projects {
 574                    if let Some(project) = projects.get(&reshared_project.id) {
 575                        project.update(cx, |project, cx| {
 576                            project.reshared(reshared_project, cx).log_err();
 577                        });
 578                    }
 579                }
 580
 581                for rejoined_project in response.rejoined_projects {
 582                    if let Some(project) = projects.get(&rejoined_project.id) {
 583                        project.update(cx, |project, cx| {
 584                            project.rejoined(rejoined_project, message_id, cx).log_err();
 585                        });
 586                    }
 587                }
 588
 589                anyhow::Ok(())
 590            })?
 591        })
 592    }
 593
    /// Returns the room's id.
    pub fn id(&self) -> u64 {
        self.id
    }
 597
    /// Returns the room's current status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 601
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 605
    /// Returns the remote participants, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 609
    /// Finds the remote participant whose `peer_id` equals the given one by
    /// scanning all remote participants; returns `None` if there is no match.
    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
        self.remote_participants
            .values()
            .find(|p| p.peer_id == peer_id)
    }
 615
    /// Returns the users currently recorded as pending participants.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 619
    /// Returns `true` if `user_id` is in the set of participant user ids
    /// recorded from the most recently applied room update.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 623
 624    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 625        self.follows_by_leader_id_project_id
 626            .get(&(leader_id, project_id))
 627            .map_or(&[], |v| v.as_slice())
 628    }
 629
 630    /// Returns the most 'active' projects, defined as most people in the project
 631    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 632        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 633        for participant in self.remote_participants.values() {
 634            match participant.location {
 635                ParticipantLocation::SharedProject { project_id } => {
 636                    project_hosts_and_guest_counts
 637                        .entry(project_id)
 638                        .or_default()
 639                        .1 += 1;
 640                }
 641                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 642            }
 643            for project in &participant.projects {
 644                project_hosts_and_guest_counts
 645                    .entry(project.id)
 646                    .or_default()
 647                    .0 = Some(participant.user.id);
 648            }
 649        }
 650
 651        if let Some(user) = self.user_store.read(cx).current_user() {
 652            for project in &self.local_participant.projects {
 653                project_hosts_and_guest_counts
 654                    .entry(project.id)
 655                    .or_default()
 656                    .0 = Some(user.id);
 657            }
 658        }
 659
 660        project_hosts_and_guest_counts
 661            .into_iter()
 662            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 663            .max_by_key(|(_, _, guest_count)| *guest_count)
 664            .map(|(id, host, _)| (id, host))
 665    }
 666
 667    async fn handle_room_updated(
 668        this: Model<Self>,
 669        envelope: TypedEnvelope<proto::RoomUpdated>,
 670        _: Arc<Client>,
 671        mut cx: AsyncAppContext,
 672    ) -> Result<()> {
 673        let room = envelope
 674            .payload
 675            .room
 676            .ok_or_else(|| anyhow!("invalid room"))?;
 677        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 678    }
 679
 680    fn apply_room_update(
 681        &mut self,
 682        mut room: proto::Room,
 683        cx: &mut ModelContext<Self>,
 684    ) -> Result<()> {
 685        // Filter ourselves out from the room's participants.
 686        let local_participant_ix = room
 687            .participants
 688            .iter()
 689            .position(|participant| Some(participant.user_id) == self.client.user_id());
 690        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 691
 692        let pending_participant_user_ids = room
 693            .pending_participants
 694            .iter()
 695            .map(|p| p.user_id)
 696            .collect::<Vec<_>>();
 697
 698        let remote_participant_user_ids = room
 699            .participants
 700            .iter()
 701            .map(|p| p.user_id)
 702            .collect::<Vec<_>>();
 703
 704        let (remote_participants, pending_participants) =
 705            self.user_store.update(cx, move |user_store, cx| {
 706                (
 707                    user_store.get_users(remote_participant_user_ids, cx),
 708                    user_store.get_users(pending_participant_user_ids, cx),
 709                )
 710            });
 711
 712        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 713            let (remote_participants, pending_participants) =
 714                futures::join!(remote_participants, pending_participants);
 715
 716            this.update(&mut cx, |this, cx| {
 717                this.participant_user_ids.clear();
 718
 719                if let Some(participant) = local_participant {
 720                    this.local_participant.projects = participant.projects;
 721                } else {
 722                    this.local_participant.projects.clear();
 723                }
 724
 725                if let Some(participants) = remote_participants.log_err() {
 726                    for (participant, user) in room.participants.into_iter().zip(participants) {
 727                        let Some(peer_id) = participant.peer_id else {
 728                            continue;
 729                        };
 730                        let participant_index = ParticipantIndex(participant.participant_index);
 731                        this.participant_user_ids.insert(participant.user_id);
 732
 733                        let old_projects = this
 734                            .remote_participants
 735                            .get(&participant.user_id)
 736                            .into_iter()
 737                            .flat_map(|existing| &existing.projects)
 738                            .map(|project| project.id)
 739                            .collect::<HashSet<_>>();
 740                        let new_projects = participant
 741                            .projects
 742                            .iter()
 743                            .map(|project| project.id)
 744                            .collect::<HashSet<_>>();
 745
 746                        for project in &participant.projects {
 747                            if !old_projects.contains(&project.id) {
 748                                cx.emit(Event::RemoteProjectShared {
 749                                    owner: user.clone(),
 750                                    project_id: project.id,
 751                                    worktree_root_names: project.worktree_root_names.clone(),
 752                                });
 753                            }
 754                        }
 755
 756                        for unshared_project_id in old_projects.difference(&new_projects) {
 757                            this.joined_projects.retain(|project| {
 758                                if let Some(project) = project.upgrade() {
 759                                    project.update(cx, |project, cx| {
 760                                        if project.remote_id() == Some(*unshared_project_id) {
 761                                            project.disconnected_from_host(cx);
 762                                            false
 763                                        } else {
 764                                            true
 765                                        }
 766                                    })
 767                                } else {
 768                                    false
 769                                }
 770                            });
 771                            cx.emit(Event::RemoteProjectUnshared {
 772                                project_id: *unshared_project_id,
 773                            });
 774                        }
 775
 776                        let location = ParticipantLocation::from_proto(participant.location)
 777                            .unwrap_or(ParticipantLocation::External);
 778                        if let Some(remote_participant) =
 779                            this.remote_participants.get_mut(&participant.user_id)
 780                        {
 781                            remote_participant.peer_id = peer_id;
 782                            remote_participant.projects = participant.projects;
 783                            remote_participant.participant_index = participant_index;
 784                            if location != remote_participant.location {
 785                                remote_participant.location = location;
 786                                cx.emit(Event::ParticipantLocationChanged {
 787                                    participant_id: peer_id,
 788                                });
 789                            }
 790                        } else {
 791                            this.remote_participants.insert(
 792                                participant.user_id,
 793                                RemoteParticipant {
 794                                    user: user.clone(),
 795                                    participant_index,
 796                                    peer_id,
 797                                    projects: participant.projects,
 798                                    location,
 799                                    muted: true,
 800                                    speaking: false,
 801                                    video_tracks: Default::default(),
 802                                    audio_tracks: Default::default(),
 803                                },
 804                            );
 805
 806                            Audio::play_sound(Sound::Joined, cx);
 807
 808                            if let Some(live_kit) = this.live_kit.as_ref() {
 809                                let video_tracks =
 810                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 811                                let audio_tracks =
 812                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 813                                let publications = live_kit
 814                                    .room
 815                                    .remote_audio_track_publications(&user.id.to_string());
 816
 817                                for track in video_tracks {
 818                                    this.remote_video_track_updated(
 819                                        RemoteVideoTrackUpdate::Subscribed(track),
 820                                        cx,
 821                                    )
 822                                    .log_err();
 823                                }
 824
 825                                for (track, publication) in
 826                                    audio_tracks.iter().zip(publications.iter())
 827                                {
 828                                    this.remote_audio_track_updated(
 829                                        RemoteAudioTrackUpdate::Subscribed(
 830                                            track.clone(),
 831                                            publication.clone(),
 832                                        ),
 833                                        cx,
 834                                    )
 835                                    .log_err();
 836                                }
 837                            }
 838                        }
 839                    }
 840
 841                    this.remote_participants.retain(|user_id, participant| {
 842                        if this.participant_user_ids.contains(user_id) {
 843                            true
 844                        } else {
 845                            for project in &participant.projects {
 846                                cx.emit(Event::RemoteProjectUnshared {
 847                                    project_id: project.id,
 848                                });
 849                            }
 850                            false
 851                        }
 852                    });
 853                }
 854
 855                if let Some(pending_participants) = pending_participants.log_err() {
 856                    this.pending_participants = pending_participants;
 857                    for participant in &this.pending_participants {
 858                        this.participant_user_ids.insert(participant.id);
 859                    }
 860                }
 861
 862                this.follows_by_leader_id_project_id.clear();
 863                for follower in room.followers {
 864                    let project_id = follower.project_id;
 865                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 866                        (Some(leader), Some(follower)) => (leader, follower),
 867
 868                        _ => {
 869                            log::error!("Follower message {follower:?} missing some state");
 870                            continue;
 871                        }
 872                    };
 873
 874                    let list = this
 875                        .follows_by_leader_id_project_id
 876                        .entry((leader, project_id))
 877                        .or_insert(Vec::new());
 878                    if !list.contains(&follower) {
 879                        list.push(follower);
 880                    }
 881                }
 882
 883                this.pending_room_update.take();
 884                if this.should_leave() {
 885                    log::info!("room is empty, leaving");
 886                    let _ = this.leave(cx);
 887                }
 888
 889                this.user_store.update(cx, |user_store, cx| {
 890                    let participant_indices_by_user_id = this
 891                        .remote_participants
 892                        .iter()
 893                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 894                        .collect();
 895                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 896                });
 897
 898                this.check_invariants();
 899                this.room_update_completed_tx.try_send(Some(())).ok();
 900                cx.notify();
 901            })
 902            .ok();
 903        }));
 904
 905        cx.notify();
 906        Ok(())
 907    }
 908
 909    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 910        let mut done_rx = self.room_update_completed_rx.clone();
 911        async move {
 912            while let Some(result) = done_rx.next().await {
 913                if result.is_some() {
 914                    break;
 915                }
 916            }
 917        }
 918    }
 919
 920    fn remote_video_track_updated(
 921        &mut self,
 922        change: RemoteVideoTrackUpdate,
 923        cx: &mut ModelContext<Self>,
 924    ) -> Result<()> {
 925        match change {
 926            RemoteVideoTrackUpdate::Subscribed(track) => {
 927                let user_id = track.publisher_id().parse()?;
 928                let track_id = track.sid().to_string();
 929                let participant = self
 930                    .remote_participants
 931                    .get_mut(&user_id)
 932                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 933                participant.video_tracks.insert(track_id.clone(), track);
 934                cx.emit(Event::RemoteVideoTracksChanged {
 935                    participant_id: participant.peer_id,
 936                });
 937            }
 938            RemoteVideoTrackUpdate::Unsubscribed {
 939                publisher_id,
 940                track_id,
 941            } => {
 942                let user_id = publisher_id.parse()?;
 943                let participant = self
 944                    .remote_participants
 945                    .get_mut(&user_id)
 946                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 947                participant.video_tracks.remove(&track_id);
 948                cx.emit(Event::RemoteVideoTracksChanged {
 949                    participant_id: participant.peer_id,
 950                });
 951            }
 952        }
 953
 954        cx.notify();
 955        Ok(())
 956    }
 957
 958    fn remote_audio_track_updated(
 959        &mut self,
 960        change: RemoteAudioTrackUpdate,
 961        cx: &mut ModelContext<Self>,
 962    ) -> Result<()> {
 963        match change {
 964            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 965                let mut speaker_ids = speakers
 966                    .into_iter()
 967                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 968                    .collect::<Vec<u64>>();
 969                speaker_ids.sort_unstable();
 970                for (sid, participant) in &mut self.remote_participants {
 971                    if let Ok(_) = speaker_ids.binary_search(sid) {
 972                        participant.speaking = true;
 973                    } else {
 974                        participant.speaking = false;
 975                    }
 976                }
 977                if let Some(id) = self.client.user_id() {
 978                    if let Some(room) = &mut self.live_kit {
 979                        if let Ok(_) = speaker_ids.binary_search(&id) {
 980                            room.speaking = true;
 981                        } else {
 982                            room.speaking = false;
 983                        }
 984                    }
 985                }
 986                cx.notify();
 987            }
 988            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 989                let mut found = false;
 990                for participant in &mut self.remote_participants.values_mut() {
 991                    for track in participant.audio_tracks.values() {
 992                        if track.sid() == track_id {
 993                            found = true;
 994                            break;
 995                        }
 996                    }
 997                    if found {
 998                        participant.muted = muted;
 999                        break;
1000                    }
1001                }
1002
1003                cx.notify();
1004            }
1005            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1006                let user_id = track.publisher_id().parse()?;
1007                let track_id = track.sid().to_string();
1008                let participant = self
1009                    .remote_participants
1010                    .get_mut(&user_id)
1011                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1012                participant.audio_tracks.insert(track_id.clone(), track);
1013                participant.muted = publication.is_muted();
1014
1015                cx.emit(Event::RemoteAudioTracksChanged {
1016                    participant_id: participant.peer_id,
1017                });
1018            }
1019            RemoteAudioTrackUpdate::Unsubscribed {
1020                publisher_id,
1021                track_id,
1022            } => {
1023                let user_id = publisher_id.parse()?;
1024                let participant = self
1025                    .remote_participants
1026                    .get_mut(&user_id)
1027                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1028                participant.audio_tracks.remove(&track_id);
1029                cx.emit(Event::RemoteAudioTracksChanged {
1030                    participant_id: participant.peer_id,
1031                });
1032            }
1033        }
1034
1035        cx.notify();
1036        Ok(())
1037    }
1038
1039    fn check_invariants(&self) {
1040        #[cfg(any(test, feature = "test-support"))]
1041        {
1042            for participant in self.remote_participants.values() {
1043                assert!(self.participant_user_ids.contains(&participant.user.id));
1044                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1045            }
1046
1047            for participant in &self.pending_participants {
1048                assert!(self.participant_user_ids.contains(&participant.id));
1049                assert_ne!(participant.id, self.client.user_id().unwrap());
1050            }
1051
1052            assert_eq!(
1053                self.participant_user_ids.len(),
1054                self.remote_participants.len() + self.pending_participants.len()
1055            );
1056        }
1057    }
1058
1059    pub(crate) fn call(
1060        &mut self,
1061        called_user_id: u64,
1062        initial_project_id: Option<u64>,
1063        cx: &mut ModelContext<Self>,
1064    ) -> Task<Result<()>> {
1065        if self.status.is_offline() {
1066            return Task::ready(Err(anyhow!("room is offline")));
1067        }
1068
1069        cx.notify();
1070        let client = self.client.clone();
1071        let room_id = self.id;
1072        self.pending_call_count += 1;
1073        cx.spawn(move |this, mut cx| async move {
1074            let result = client
1075                .request(proto::Call {
1076                    room_id,
1077                    called_user_id,
1078                    initial_project_id,
1079                })
1080                .await;
1081            this.update(&mut cx, |this, cx| {
1082                this.pending_call_count -= 1;
1083                if this.should_leave() {
1084                    this.leave(cx).detach_and_log_err(cx);
1085                }
1086            })?;
1087            result?;
1088            Ok(())
1089        })
1090    }
1091
1092    pub fn join_project(
1093        &mut self,
1094        id: u64,
1095        language_registry: Arc<LanguageRegistry>,
1096        fs: Arc<dyn Fs>,
1097        cx: &mut ModelContext<Self>,
1098    ) -> Task<Result<Model<Project>>> {
1099        let client = self.client.clone();
1100        let user_store = self.user_store.clone();
1101        cx.emit(Event::RemoteProjectJoined { project_id: id });
1102        cx.spawn(move |this, mut cx| async move {
1103            let project =
1104                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1105
1106            this.update(&mut cx, |this, cx| {
1107                this.joined_projects.retain(|project| {
1108                    if let Some(project) = project.upgrade() {
1109                        !project.read(cx).is_read_only()
1110                    } else {
1111                        false
1112                    }
1113                });
1114                this.joined_projects.insert(project.downgrade());
1115            })?;
1116            Ok(project)
1117        })
1118    }
1119
1120    pub(crate) fn share_project(
1121        &mut self,
1122        project: Model<Project>,
1123        cx: &mut ModelContext<Self>,
1124    ) -> Task<Result<u64>> {
1125        if let Some(project_id) = project.read(cx).remote_id() {
1126            return Task::ready(Ok(project_id));
1127        }
1128
1129        let request = self.client.request(proto::ShareProject {
1130            room_id: self.id(),
1131            worktrees: project.read(cx).worktree_metadata_protos(cx),
1132        });
1133        cx.spawn(|this, mut cx| async move {
1134            let response = request.await?;
1135
1136            project.update(&mut cx, |project, cx| {
1137                project.shared(response.project_id, cx)
1138            })??;
1139
1140            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1141            this.update(&mut cx, |this, cx| {
1142                this.shared_projects.insert(project.downgrade());
1143                let active_project = this.local_participant.active_project.as_ref();
1144                if active_project.map_or(false, |location| *location == project) {
1145                    this.set_location(Some(&project), cx)
1146                } else {
1147                    Task::ready(Ok(()))
1148                }
1149            })?
1150            .await?;
1151
1152            Ok(response.project_id)
1153        })
1154    }
1155
1156    pub(crate) fn unshare_project(
1157        &mut self,
1158        project: Model<Project>,
1159        cx: &mut ModelContext<Self>,
1160    ) -> Result<()> {
1161        let project_id = match project.read(cx).remote_id() {
1162            Some(project_id) => project_id,
1163            None => return Ok(()),
1164        };
1165
1166        self.client.send(proto::UnshareProject { project_id })?;
1167        project.update(cx, |this, cx| this.unshare(cx))
1168    }
1169
1170    pub(crate) fn set_location(
1171        &mut self,
1172        project: Option<&Model<Project>>,
1173        cx: &mut ModelContext<Self>,
1174    ) -> Task<Result<()>> {
1175        if self.status.is_offline() {
1176            return Task::ready(Err(anyhow!("room is offline")));
1177        }
1178
1179        let client = self.client.clone();
1180        let room_id = self.id;
1181        let location = if let Some(project) = project {
1182            self.local_participant.active_project = Some(project.downgrade());
1183            if let Some(project_id) = project.read(cx).remote_id() {
1184                proto::participant_location::Variant::SharedProject(
1185                    proto::participant_location::SharedProject { id: project_id },
1186                )
1187            } else {
1188                proto::participant_location::Variant::UnsharedProject(
1189                    proto::participant_location::UnsharedProject {},
1190                )
1191            }
1192        } else {
1193            self.local_participant.active_project = None;
1194            proto::participant_location::Variant::External(proto::participant_location::External {})
1195        };
1196
1197        cx.notify();
1198        cx.executor().spawn_on_main(move || async move {
1199            client
1200                .request(proto::UpdateParticipantLocation {
1201                    room_id,
1202                    location: Some(proto::ParticipantLocation {
1203                        variant: Some(location),
1204                    }),
1205                })
1206                .await?;
1207            Ok(())
1208        })
1209    }
1210
1211    pub fn is_screen_sharing(&self) -> bool {
1212        self.live_kit.as_ref().map_or(false, |live_kit| {
1213            !matches!(live_kit.screen_track, LocalTrack::None)
1214        })
1215    }
1216
1217    pub fn is_sharing_mic(&self) -> bool {
1218        self.live_kit.as_ref().map_or(false, |live_kit| {
1219            !matches!(live_kit.microphone_track, LocalTrack::None)
1220        })
1221    }
1222
1223    pub fn is_muted(&self, cx: &AppContext) -> bool {
1224        self.live_kit
1225            .as_ref()
1226            .and_then(|live_kit| match &live_kit.microphone_track {
1227                LocalTrack::None => Some(Self::mute_on_join(cx)),
1228                LocalTrack::Pending { muted, .. } => Some(*muted),
1229                LocalTrack::Published { muted, .. } => Some(*muted),
1230            })
1231            .unwrap_or(false)
1232    }
1233
1234    pub fn is_speaking(&self) -> bool {
1235        self.live_kit
1236            .as_ref()
1237            .map_or(false, |live_kit| live_kit.speaking)
1238    }
1239
1240    pub fn is_deafened(&self) -> Option<bool> {
1241        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1242    }
1243
1244    #[track_caller]
1245    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1246        if self.status.is_offline() {
1247            return Task::ready(Err(anyhow!("room is offline")));
1248        } else if self.is_sharing_mic() {
1249            return Task::ready(Err(anyhow!("microphone was already shared")));
1250        }
1251
1252        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1253            let publish_id = post_inc(&mut live_kit.next_publish_id);
1254            live_kit.microphone_track = LocalTrack::Pending {
1255                publish_id,
1256                muted: false,
1257            };
1258            cx.notify();
1259            publish_id
1260        } else {
1261            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1262        };
1263
1264        cx.spawn(move |this, mut cx| async move {
1265            let publish_track = async {
1266                let track = LocalAudioTrack::create();
1267                this.upgrade()
1268                    .ok_or_else(|| anyhow!("room was dropped"))?
1269                    .update(&mut cx, |this, _| {
1270                        this.live_kit
1271                            .as_ref()
1272                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1273                    })?
1274                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1275                    .await
1276            };
1277
1278            let publication = publish_track.await;
1279            this.upgrade()
1280                .ok_or_else(|| anyhow!("room was dropped"))?
1281                .update(&mut cx, |this, cx| {
1282                    let live_kit = this
1283                        .live_kit
1284                        .as_mut()
1285                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1286
1287                    let (canceled, muted) = if let LocalTrack::Pending {
1288                        publish_id: cur_publish_id,
1289                        muted,
1290                    } = &live_kit.microphone_track
1291                    {
1292                        (*cur_publish_id != publish_id, *muted)
1293                    } else {
1294                        (true, false)
1295                    };
1296
1297                    match publication {
1298                        Ok(publication) => {
1299                            if canceled {
1300                                live_kit.room.unpublish_track(publication);
1301                            } else {
1302                                if muted {
1303                                    cx.executor().spawn(publication.set_mute(muted)).detach();
1304                                }
1305                                live_kit.microphone_track = LocalTrack::Published {
1306                                    track_publication: publication,
1307                                    muted,
1308                                };
1309                                cx.notify();
1310                            }
1311                            Ok(())
1312                        }
1313                        Err(error) => {
1314                            if canceled {
1315                                Ok(())
1316                            } else {
1317                                live_kit.microphone_track = LocalTrack::None;
1318                                cx.notify();
1319                                Err(error)
1320                            }
1321                        }
1322                    }
1323                })?
1324        })
1325    }
1326
1327    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1328        if self.status.is_offline() {
1329            return Task::ready(Err(anyhow!("room is offline")));
1330        } else if self.is_screen_sharing() {
1331            return Task::ready(Err(anyhow!("screen was already shared")));
1332        }
1333
1334        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1335            let publish_id = post_inc(&mut live_kit.next_publish_id);
1336            live_kit.screen_track = LocalTrack::Pending {
1337                publish_id,
1338                muted: false,
1339            };
1340            cx.notify();
1341            (live_kit.room.display_sources(), publish_id)
1342        } else {
1343            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1344        };
1345
1346        cx.spawn_on_main(move |this, mut cx| async move {
1347            let publish_track = async {
1348                let displays = displays.await?;
1349                let display = displays
1350                    .first()
1351                    .ok_or_else(|| anyhow!("no display found"))?;
1352                let track = LocalVideoTrack::screen_share_for_display(&display);
1353                this.upgrade()
1354                    .ok_or_else(|| anyhow!("room was dropped"))?
1355                    .update(&mut cx, |this, _| {
1356                        this.live_kit
1357                            .as_ref()
1358                            .map(|live_kit| live_kit.room.publish_video_track(track))
1359                    })?
1360                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1361                    .await
1362            };
1363
1364            let publication = publish_track.await;
1365            this.upgrade()
1366                .ok_or_else(|| anyhow!("room was dropped"))?
1367                .update(&mut cx, |this, cx| {
1368                    let live_kit = this
1369                        .live_kit
1370                        .as_mut()
1371                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1372
1373                    let (canceled, muted) = if let LocalTrack::Pending {
1374                        publish_id: cur_publish_id,
1375                        muted,
1376                    } = &live_kit.screen_track
1377                    {
1378                        (*cur_publish_id != publish_id, *muted)
1379                    } else {
1380                        (true, false)
1381                    };
1382
1383                    match publication {
1384                        Ok(publication) => {
1385                            if canceled {
1386                                live_kit.room.unpublish_track(publication);
1387                            } else {
1388                                if muted {
1389                                    cx.executor().spawn(publication.set_mute(muted)).detach();
1390                                }
1391                                live_kit.screen_track = LocalTrack::Published {
1392                                    track_publication: publication,
1393                                    muted,
1394                                };
1395                                cx.notify();
1396                            }
1397
1398                            Audio::play_sound(Sound::StartScreenshare, cx);
1399
1400                            Ok(())
1401                        }
1402                        Err(error) => {
1403                            if canceled {
1404                                Ok(())
1405                            } else {
1406                                live_kit.screen_track = LocalTrack::None;
1407                                cx.notify();
1408                                Err(error)
1409                            }
1410                        }
1411                    }
1412                })?
1413        })
1414    }
1415
1416    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1417        let should_mute = !self.is_muted(cx);
1418        if let Some(live_kit) = self.live_kit.as_mut() {
1419            if matches!(live_kit.microphone_track, LocalTrack::None) {
1420                return Ok(self.share_microphone(cx));
1421            }
1422
1423            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1424            live_kit.muted_by_user = should_mute;
1425
1426            if old_muted == true && live_kit.deafened == true {
1427                if let Some(task) = self.toggle_deafen(cx).ok() {
1428                    task.detach();
1429                }
1430            }
1431
1432            Ok(ret_task)
1433        } else {
1434            Err(anyhow!("LiveKit not started"))
1435        }
1436    }
1437
1438    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1439        if let Some(live_kit) = self.live_kit.as_mut() {
1440            (*live_kit).deafened = !live_kit.deafened;
1441
1442            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1443            // Context notification is sent within set_mute itself.
1444            let mut mute_task = None;
1445            // When deafening, mute user's mic as well.
1446            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1447            if live_kit.deafened || !live_kit.muted_by_user {
1448                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1449            };
1450            for participant in self.remote_participants.values() {
1451                for track in live_kit
1452                    .room
1453                    .remote_audio_track_publications(&participant.user.id.to_string())
1454                {
1455                    let deafened = live_kit.deafened;
1456                    tasks.push(
1457                        cx.executor()
1458                            .spawn_on_main(move || track.set_enabled(!deafened)),
1459                    );
1460                }
1461            }
1462
1463            Ok(cx.executor().spawn_on_main(|| async {
1464                if let Some(mute_task) = mute_task {
1465                    mute_task.await?;
1466                }
1467                for task in tasks {
1468                    task.await?;
1469                }
1470                Ok(())
1471            }))
1472        } else {
1473            Err(anyhow!("LiveKit not started"))
1474        }
1475    }
1476
1477    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1478        if self.status.is_offline() {
1479            return Err(anyhow!("room is offline"));
1480        }
1481
1482        let live_kit = self
1483            .live_kit
1484            .as_mut()
1485            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1486        match mem::take(&mut live_kit.screen_track) {
1487            LocalTrack::None => Err(anyhow!("screen was not shared")),
1488            LocalTrack::Pending { .. } => {
1489                cx.notify();
1490                Ok(())
1491            }
1492            LocalTrack::Published {
1493                track_publication, ..
1494            } => {
1495                live_kit.room.unpublish_track(track_publication);
1496                cx.notify();
1497
1498                Audio::play_sound(Sound::StopScreenshare, cx);
1499                Ok(())
1500            }
1501        }
1502    }
1503
1504    #[cfg(any(test, feature = "test-support"))]
1505    pub fn set_display_sources(&self, sources: Vec<live_kit_client2::MacOSDisplay>) {
1506        self.live_kit
1507            .as_ref()
1508            .unwrap()
1509            .room
1510            .set_display_sources(sources);
1511    }
1512}
1513
1514struct LiveKitRoom {
1515    room: Arc<live_kit_client2::Room>,
1516    screen_track: LocalTrack,
1517    microphone_track: LocalTrack,
1518    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1519    muted_by_user: bool,
1520    deafened: bool,
1521    speaking: bool,
1522    next_publish_id: usize,
1523    _maintain_room: Task<()>,
1524    _maintain_tracks: [Task<()>; 2],
1525}
1526
1527impl LiveKitRoom {
1528    fn set_mute(
1529        self: &mut LiveKitRoom,
1530        should_mute: bool,
1531        cx: &mut ModelContext<Room>,
1532    ) -> Result<(Task<Result<()>>, bool)> {
1533        if !should_mute {
1534            // clear user muting state.
1535            self.muted_by_user = false;
1536        }
1537
1538        let (result, old_muted) = match &mut self.microphone_track {
1539            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1540            LocalTrack::Pending { muted, .. } => {
1541                let old_muted = *muted;
1542                *muted = should_mute;
1543                cx.notify();
1544                Ok((Task::Ready(Some(Ok(()))), old_muted))
1545            }
1546            LocalTrack::Published {
1547                track_publication,
1548                muted,
1549            } => {
1550                let old_muted = *muted;
1551                *muted = should_mute;
1552                cx.notify();
1553                Ok((
1554                    cx.executor().spawn(track_publication.set_mute(*muted)),
1555                    old_muted,
1556                ))
1557            }
1558        }?;
1559
1560        if old_muted != should_mute {
1561            if should_mute {
1562                Audio::play_sound(Sound::Mute, cx);
1563            } else {
1564                Audio::play_sound(Sound::Unmute, cx);
1565            }
1566        }
1567
1568        Ok((result, old_muted))
1569    }
1570}
1571
1572enum LocalTrack {
1573    None,
1574    Pending {
1575        publish_id: usize,
1576        muted: bool,
1577    },
1578    Published {
1579        track_publication: LocalTrackPublication,
1580        muted: bool,
1581    },
1582}
1583
1584impl Default for LocalTrack {
1585    fn default() -> Self {
1586        Self::None
1587    }
1588}
1589
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` does not count as online.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}