room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28#[derive(Clone, Debug, PartialEq, Eq)]
  29pub enum Event {
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    RemoteProjectJoined {
  48        project_id: u64,
  49    },
  50    RemoteProjectInvitationDiscarded {
  51        project_id: u64,
  52    },
  53    Left,
  54}
  55
  56pub struct Room {
  57    id: u64,
  58    channel_id: Option<u64>,
  59    live_kit: Option<LiveKitRoom>,
  60    status: RoomStatus,
  61    shared_projects: HashSet<WeakModelHandle<Project>>,
  62    joined_projects: HashSet<WeakModelHandle<Project>>,
  63    local_participant: LocalParticipant,
  64    remote_participants: BTreeMap<u64, RemoteParticipant>,
  65    pending_participants: Vec<Arc<User>>,
  66    participant_user_ids: HashSet<u64>,
  67    pending_call_count: usize,
  68    leave_when_empty: bool,
  69    client: Arc<Client>,
  70    user_store: ModelHandle<UserStore>,
  71    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  72    subscriptions: Vec<client::Subscription>,
  73    pending_room_update: Option<Task<()>>,
  74    maintain_connection: Option<Task<Option<()>>>,
  75}
  76
  77impl Entity for Room {
  78    type Event = Event;
  79
  80    fn release(&mut self, cx: &mut AppContext) {
  81        if self.status.is_online() {
  82            self.leave_internal(cx).detach_and_log_err(cx);
  83        }
  84    }
  85
  86    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  87        if self.status.is_online() {
  88            let leave = self.leave_internal(cx);
  89            Some(
  90                cx.background()
  91                    .spawn(async move {
  92                        leave.await.log_err();
  93                    })
  94                    .boxed(),
  95            )
  96        } else {
  97            None
  98        }
  99    }
 100}
 101
 102impl Room {
 103    pub fn channel_id(&self) -> Option<u64> {
 104        self.channel_id
 105    }
 106
 107    #[cfg(any(test, feature = "test-support"))]
 108    pub fn is_connected(&self) -> bool {
 109        if let Some(live_kit) = self.live_kit.as_ref() {
 110            matches!(
 111                *live_kit.room.status().borrow(),
 112                live_kit_client::ConnectionState::Connected { .. }
 113            )
 114        } else {
 115            false
 116        }
 117    }
 118
 119    fn new(
 120        id: u64,
 121        channel_id: Option<u64>,
 122        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 123        client: Arc<Client>,
 124        user_store: ModelHandle<UserStore>,
 125        cx: &mut ModelContext<Self>,
 126    ) -> Self {
 127        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 128            let room = live_kit_client::Room::new();
 129            let mut status = room.status();
 130            // Consume the initial status of the room.
 131            let _ = status.try_recv();
 132            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 133                while let Some(status) = status.next().await {
 134                    let this = if let Some(this) = this.upgrade(&cx) {
 135                        this
 136                    } else {
 137                        break;
 138                    };
 139
 140                    if status == live_kit_client::ConnectionState::Disconnected {
 141                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 142                        break;
 143                    }
 144                }
 145            });
 146
 147            let mut track_video_changes = room.remote_video_track_updates();
 148            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 149                while let Some(track_change) = track_video_changes.next().await {
 150                    let this = if let Some(this) = this.upgrade(&cx) {
 151                        this
 152                    } else {
 153                        break;
 154                    };
 155
 156                    this.update(&mut cx, |this, cx| {
 157                        this.remote_video_track_updated(track_change, cx).log_err()
 158                    });
 159                }
 160            });
 161
 162            let mut track_audio_changes = room.remote_audio_track_updates();
 163            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 164                while let Some(track_change) = track_audio_changes.next().await {
 165                    let this = if let Some(this) = this.upgrade(&cx) {
 166                        this
 167                    } else {
 168                        break;
 169                    };
 170
 171                    this.update(&mut cx, |this, cx| {
 172                        this.remote_audio_track_updated(track_change, cx).log_err()
 173                    });
 174                }
 175            });
 176
 177            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 178            cx.spawn(|this, mut cx| async move {
 179                connect.await?;
 180
 181                if !cx.read(Self::mute_on_join) {
 182                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
 183                        .await?;
 184                }
 185
 186                anyhow::Ok(())
 187            })
 188            .detach_and_log_err(cx);
 189
 190            Some(LiveKitRoom {
 191                room,
 192                screen_track: LocalTrack::None,
 193                microphone_track: LocalTrack::None,
 194                next_publish_id: 0,
 195                muted_by_user: false,
 196                deafened: false,
 197                speaking: false,
 198                _maintain_room,
 199                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 200            })
 201        } else {
 202            None
 203        };
 204
 205        let maintain_connection =
 206            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 207
 208        Audio::play_sound(Sound::Joined, cx);
 209
 210        Self {
 211            id,
 212            channel_id,
 213            live_kit: live_kit_room,
 214            status: RoomStatus::Online,
 215            shared_projects: Default::default(),
 216            joined_projects: Default::default(),
 217            participant_user_ids: Default::default(),
 218            local_participant: Default::default(),
 219            remote_participants: Default::default(),
 220            pending_participants: Default::default(),
 221            pending_call_count: 0,
 222            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 223            leave_when_empty: false,
 224            pending_room_update: None,
 225            client,
 226            user_store,
 227            follows_by_leader_id_project_id: Default::default(),
 228            maintain_connection: Some(maintain_connection),
 229        }
 230    }
 231
 232    pub(crate) fn create(
 233        called_user_id: u64,
 234        initial_project: Option<ModelHandle<Project>>,
 235        client: Arc<Client>,
 236        user_store: ModelHandle<UserStore>,
 237        cx: &mut AppContext,
 238    ) -> Task<Result<ModelHandle<Self>>> {
 239        cx.spawn(|mut cx| async move {
 240            let response = client.request(proto::CreateRoom {}).await?;
 241            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 242            let room = cx.add_model(|cx| {
 243                Self::new(
 244                    room_proto.id,
 245                    None,
 246                    response.live_kit_connection_info,
 247                    client,
 248                    user_store,
 249                    cx,
 250                )
 251            });
 252
 253            let initial_project_id = if let Some(initial_project) = initial_project {
 254                let initial_project_id = room
 255                    .update(&mut cx, |room, cx| {
 256                        room.share_project(initial_project.clone(), cx)
 257                    })
 258                    .await?;
 259                Some(initial_project_id)
 260            } else {
 261                None
 262            };
 263
 264            match room
 265                .update(&mut cx, |room, cx| {
 266                    room.leave_when_empty = true;
 267                    room.call(called_user_id, initial_project_id, cx)
 268                })
 269                .await
 270            {
 271                Ok(()) => Ok(room),
 272                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 273            }
 274        })
 275    }
 276
 277    pub(crate) fn join_channel(
 278        channel_id: u64,
 279        client: Arc<Client>,
 280        user_store: ModelHandle<UserStore>,
 281        cx: &mut AppContext,
 282    ) -> Task<Result<ModelHandle<Self>>> {
 283        cx.spawn(|cx| async move {
 284            Self::from_join_response(
 285                client.request(proto::JoinChannel { channel_id }).await?,
 286                client,
 287                user_store,
 288                cx,
 289            )
 290        })
 291    }
 292
 293    pub(crate) fn join(
 294        call: &IncomingCall,
 295        client: Arc<Client>,
 296        user_store: ModelHandle<UserStore>,
 297        cx: &mut AppContext,
 298    ) -> Task<Result<ModelHandle<Self>>> {
 299        let id = call.room_id;
 300        cx.spawn(|cx| async move {
 301            Self::from_join_response(
 302                client.request(proto::JoinRoom { id }).await?,
 303                client,
 304                user_store,
 305                cx,
 306            )
 307        })
 308    }
 309
 310    pub fn mute_on_join(cx: &AppContext) -> bool {
 311        settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 312    }
 313
 314    fn from_join_response(
 315        response: proto::JoinRoomResponse,
 316        client: Arc<Client>,
 317        user_store: ModelHandle<UserStore>,
 318        mut cx: AsyncAppContext,
 319    ) -> Result<ModelHandle<Self>> {
 320        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 321        let room = cx.add_model(|cx| {
 322            Self::new(
 323                room_proto.id,
 324                response.channel_id,
 325                response.live_kit_connection_info,
 326                client,
 327                user_store,
 328                cx,
 329            )
 330        });
 331        room.update(&mut cx, |room, cx| {
 332            room.leave_when_empty = room.channel_id.is_none();
 333            room.apply_room_update(room_proto, cx)?;
 334            anyhow::Ok(())
 335        })?;
 336        Ok(room)
 337    }
 338
 339    fn should_leave(&self) -> bool {
 340        self.leave_when_empty
 341            && self.pending_room_update.is_none()
 342            && self.pending_participants.is_empty()
 343            && self.remote_participants.is_empty()
 344            && self.pending_call_count == 0
 345    }
 346
 347    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 348        cx.notify();
 349        cx.emit(Event::Left);
 350        self.leave_internal(cx)
 351    }
 352
 353    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 354        if self.status.is_offline() {
 355            return Task::ready(Err(anyhow!("room is offline")));
 356        }
 357
 358        log::info!("leaving room");
 359        Audio::play_sound(Sound::Leave, cx);
 360
 361        self.clear_state(cx);
 362
 363        let leave_room = self.client.request(proto::LeaveRoom {});
 364        cx.background().spawn(async move {
 365            leave_room.await?;
 366            anyhow::Ok(())
 367        })
 368    }
 369
 370    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 371        for project in self.shared_projects.drain() {
 372            if let Some(project) = project.upgrade(cx) {
 373                project.update(cx, |project, cx| {
 374                    project.unshare(cx).log_err();
 375                });
 376            }
 377        }
 378        for project in self.joined_projects.drain() {
 379            if let Some(project) = project.upgrade(cx) {
 380                project.update(cx, |project, cx| {
 381                    project.disconnected_from_host(cx);
 382                    project.close(cx);
 383                });
 384            }
 385        }
 386
 387        self.status = RoomStatus::Offline;
 388        self.remote_participants.clear();
 389        self.pending_participants.clear();
 390        self.participant_user_ids.clear();
 391        self.subscriptions.clear();
 392        self.live_kit.take();
 393        self.pending_room_update.take();
 394        self.maintain_connection.take();
 395    }
 396
 397    async fn maintain_connection(
 398        this: WeakModelHandle<Self>,
 399        client: Arc<Client>,
 400        mut cx: AsyncAppContext,
 401    ) -> Result<()> {
 402        let mut client_status = client.status();
 403        loop {
 404            let _ = client_status.try_recv();
 405            let is_connected = client_status.borrow().is_connected();
 406            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 407            if !is_connected || client_status.next().await.is_some() {
 408                log::info!("detected client disconnection");
 409
 410                this.upgrade(&cx)
 411                    .ok_or_else(|| anyhow!("room was dropped"))?
 412                    .update(&mut cx, |this, cx| {
 413                        this.status = RoomStatus::Rejoining;
 414                        cx.notify();
 415                    });
 416
 417                // Wait for client to re-establish a connection to the server.
 418                {
 419                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 420                    let client_reconnection = async {
 421                        let mut remaining_attempts = 3;
 422                        while remaining_attempts > 0 {
 423                            if client_status.borrow().is_connected() {
 424                                log::info!("client reconnected, attempting to rejoin room");
 425
 426                                let Some(this) = this.upgrade(&cx) else { break };
 427                                if this
 428                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 429                                    .await
 430                                    .log_err()
 431                                    .is_some()
 432                                {
 433                                    return true;
 434                                } else {
 435                                    remaining_attempts -= 1;
 436                                }
 437                            } else if client_status.borrow().is_signed_out() {
 438                                return false;
 439                            }
 440
 441                            log::info!(
 442                                "waiting for client status change, remaining attempts {}",
 443                                remaining_attempts
 444                            );
 445                            client_status.next().await;
 446                        }
 447                        false
 448                    }
 449                    .fuse();
 450                    futures::pin_mut!(client_reconnection);
 451
 452                    futures::select_biased! {
 453                        reconnected = client_reconnection => {
 454                            if reconnected {
 455                                log::info!("successfully reconnected to room");
 456                                // If we successfully joined the room, go back around the loop
 457                                // waiting for future connection status changes.
 458                                continue;
 459                            }
 460                        }
 461                        _ = reconnection_timeout => {
 462                            log::info!("room reconnection timeout expired");
 463                        }
 464                    }
 465                }
 466
 467                break;
 468            }
 469        }
 470
 471        // The client failed to re-establish a connection to the server
 472        // or an error occurred while trying to re-join the room. Either way
 473        // we leave the room and return an error.
 474        if let Some(this) = this.upgrade(&cx) {
 475            log::info!("reconnection failed, leaving room");
 476            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 477        }
 478        Err(anyhow!(
 479            "can't reconnect to room: client failed to re-establish connection"
 480        ))
 481    }
 482
 483    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 484        let mut projects = HashMap::default();
 485        let mut reshared_projects = Vec::new();
 486        let mut rejoined_projects = Vec::new();
 487        self.shared_projects.retain(|project| {
 488            if let Some(handle) = project.upgrade(cx) {
 489                let project = handle.read(cx);
 490                if let Some(project_id) = project.remote_id() {
 491                    projects.insert(project_id, handle.clone());
 492                    reshared_projects.push(proto::UpdateProject {
 493                        project_id,
 494                        worktrees: project.worktree_metadata_protos(cx),
 495                    });
 496                    return true;
 497                }
 498            }
 499            false
 500        });
 501        self.joined_projects.retain(|project| {
 502            if let Some(handle) = project.upgrade(cx) {
 503                let project = handle.read(cx);
 504                if let Some(project_id) = project.remote_id() {
 505                    projects.insert(project_id, handle.clone());
 506                    rejoined_projects.push(proto::RejoinProject {
 507                        id: project_id,
 508                        worktrees: project
 509                            .worktrees(cx)
 510                            .map(|worktree| {
 511                                let worktree = worktree.read(cx);
 512                                proto::RejoinWorktree {
 513                                    id: worktree.id().to_proto(),
 514                                    scan_id: worktree.completed_scan_id() as u64,
 515                                }
 516                            })
 517                            .collect(),
 518                    });
 519                }
 520                return true;
 521            }
 522            false
 523        });
 524
 525        let response = self.client.request_envelope(proto::RejoinRoom {
 526            id: self.id,
 527            reshared_projects,
 528            rejoined_projects,
 529        });
 530
 531        cx.spawn(|this, mut cx| async move {
 532            let response = response.await?;
 533            let message_id = response.message_id;
 534            let response = response.payload;
 535            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 536            this.update(&mut cx, |this, cx| {
 537                this.status = RoomStatus::Online;
 538                this.apply_room_update(room_proto, cx)?;
 539
 540                for reshared_project in response.reshared_projects {
 541                    if let Some(project) = projects.get(&reshared_project.id) {
 542                        project.update(cx, |project, cx| {
 543                            project.reshared(reshared_project, cx).log_err();
 544                        });
 545                    }
 546                }
 547
 548                for rejoined_project in response.rejoined_projects {
 549                    if let Some(project) = projects.get(&rejoined_project.id) {
 550                        project.update(cx, |project, cx| {
 551                            project.rejoined(rejoined_project, message_id, cx).log_err();
 552                        });
 553                    }
 554                }
 555
 556                anyhow::Ok(())
 557            })
 558        })
 559    }
 560
 561    pub fn id(&self) -> u64 {
 562        self.id
 563    }
 564
 565    pub fn status(&self) -> RoomStatus {
 566        self.status
 567    }
 568
 569    pub fn local_participant(&self) -> &LocalParticipant {
 570        &self.local_participant
 571    }
 572
 573    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 574        &self.remote_participants
 575    }
 576
 577    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 578        self.remote_participants
 579            .values()
 580            .find(|p| p.peer_id == peer_id)
 581    }
 582
 583    pub fn pending_participants(&self) -> &[Arc<User>] {
 584        &self.pending_participants
 585    }
 586
 587    pub fn contains_participant(&self, user_id: u64) -> bool {
 588        self.participant_user_ids.contains(&user_id)
 589    }
 590
 591    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 592        self.follows_by_leader_id_project_id
 593            .get(&(leader_id, project_id))
 594            .map_or(&[], |v| v.as_slice())
 595    }
 596
 597    /// projects_to_join returns a list of shared projects sorted such
 598    /// that the most 'active' projects appear last.
 599    pub fn projects_to_join(&self) -> Vec<(u64, u64)> {
 600        let mut projects = HashMap::default();
 601        let mut hosts = HashMap::default();
 602        for participant in self.remote_participants.values() {
 603            match participant.location {
 604                ParticipantLocation::SharedProject { project_id } => {
 605                    *projects.entry(project_id).or_insert(0) += 1;
 606                }
 607                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 608            }
 609            for project in &participant.projects {
 610                *projects.entry(project.id).or_insert(0) += 1;
 611                hosts.insert(project.id, participant.user.id);
 612            }
 613        }
 614
 615        let mut pairs: Vec<(u64, usize)> = projects.into_iter().collect();
 616        pairs.sort_by_key(|(_, count)| 0 - *count as i32);
 617
 618        pairs
 619            .into_iter()
 620            .map(|(project_id, _)| (project_id, hosts[&project_id]))
 621            .collect()
 622    }
 623
 624    async fn handle_room_updated(
 625        this: ModelHandle<Self>,
 626        envelope: TypedEnvelope<proto::RoomUpdated>,
 627        _: Arc<Client>,
 628        mut cx: AsyncAppContext,
 629    ) -> Result<()> {
 630        let room = envelope
 631            .payload
 632            .room
 633            .ok_or_else(|| anyhow!("invalid room"))?;
 634        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 635    }
 636
 637    fn apply_room_update(
 638        &mut self,
 639        mut room: proto::Room,
 640        cx: &mut ModelContext<Self>,
 641    ) -> Result<()> {
 642        // Filter ourselves out from the room's participants.
 643        let local_participant_ix = room
 644            .participants
 645            .iter()
 646            .position(|participant| Some(participant.user_id) == self.client.user_id());
 647        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 648
 649        let pending_participant_user_ids = room
 650            .pending_participants
 651            .iter()
 652            .map(|p| p.user_id)
 653            .collect::<Vec<_>>();
 654
 655        let remote_participant_user_ids = room
 656            .participants
 657            .iter()
 658            .map(|p| p.user_id)
 659            .collect::<Vec<_>>();
 660
 661        let (remote_participants, pending_participants) =
 662            self.user_store.update(cx, move |user_store, cx| {
 663                (
 664                    user_store.get_users(remote_participant_user_ids, cx),
 665                    user_store.get_users(pending_participant_user_ids, cx),
 666                )
 667            });
 668
 669        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 670            let (remote_participants, pending_participants) =
 671                futures::join!(remote_participants, pending_participants);
 672
 673            this.update(&mut cx, |this, cx| {
 674                this.participant_user_ids.clear();
 675
 676                if let Some(participant) = local_participant {
 677                    this.local_participant.projects = participant.projects;
 678                } else {
 679                    this.local_participant.projects.clear();
 680                }
 681
 682                if let Some(participants) = remote_participants.log_err() {
 683                    for (participant, user) in room.participants.into_iter().zip(participants) {
 684                        let Some(peer_id) = participant.peer_id else {
 685                            continue;
 686                        };
 687                        this.participant_user_ids.insert(participant.user_id);
 688
 689                        let old_projects = this
 690                            .remote_participants
 691                            .get(&participant.user_id)
 692                            .into_iter()
 693                            .flat_map(|existing| &existing.projects)
 694                            .map(|project| project.id)
 695                            .collect::<HashSet<_>>();
 696                        let new_projects = participant
 697                            .projects
 698                            .iter()
 699                            .map(|project| project.id)
 700                            .collect::<HashSet<_>>();
 701
 702                        for project in &participant.projects {
 703                            if !old_projects.contains(&project.id) {
 704                                cx.emit(Event::RemoteProjectShared {
 705                                    owner: user.clone(),
 706                                    project_id: project.id,
 707                                    worktree_root_names: project.worktree_root_names.clone(),
 708                                });
 709                            }
 710                        }
 711
 712                        for unshared_project_id in old_projects.difference(&new_projects) {
 713                            this.joined_projects.retain(|project| {
 714                                if let Some(project) = project.upgrade(cx) {
 715                                    project.update(cx, |project, cx| {
 716                                        if project.remote_id() == Some(*unshared_project_id) {
 717                                            project.disconnected_from_host(cx);
 718                                            false
 719                                        } else {
 720                                            true
 721                                        }
 722                                    })
 723                                } else {
 724                                    false
 725                                }
 726                            });
 727                            cx.emit(Event::RemoteProjectUnshared {
 728                                project_id: *unshared_project_id,
 729                            });
 730                        }
 731
 732                        let location = ParticipantLocation::from_proto(participant.location)
 733                            .unwrap_or(ParticipantLocation::External);
 734                        if let Some(remote_participant) =
 735                            this.remote_participants.get_mut(&participant.user_id)
 736                        {
 737                            remote_participant.projects = participant.projects;
 738                            remote_participant.peer_id = peer_id;
 739                            if location != remote_participant.location {
 740                                remote_participant.location = location;
 741                                cx.emit(Event::ParticipantLocationChanged {
 742                                    participant_id: peer_id,
 743                                });
 744                            }
 745                        } else {
 746                            this.remote_participants.insert(
 747                                participant.user_id,
 748                                RemoteParticipant {
 749                                    user: user.clone(),
 750                                    participant_index: ParticipantIndex(
 751                                        participant.participant_index,
 752                                    ),
 753                                    peer_id,
 754                                    projects: participant.projects,
 755                                    location,
 756                                    muted: true,
 757                                    speaking: false,
 758                                    video_tracks: Default::default(),
 759                                    audio_tracks: Default::default(),
 760                                },
 761                            );
 762
 763                            Audio::play_sound(Sound::Joined, cx);
 764
 765                            if let Some(live_kit) = this.live_kit.as_ref() {
 766                                let video_tracks =
 767                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 768                                let audio_tracks =
 769                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 770                                let publications = live_kit
 771                                    .room
 772                                    .remote_audio_track_publications(&user.id.to_string());
 773
 774                                for track in video_tracks {
 775                                    this.remote_video_track_updated(
 776                                        RemoteVideoTrackUpdate::Subscribed(track),
 777                                        cx,
 778                                    )
 779                                    .log_err();
 780                                }
 781
 782                                for (track, publication) in
 783                                    audio_tracks.iter().zip(publications.iter())
 784                                {
 785                                    this.remote_audio_track_updated(
 786                                        RemoteAudioTrackUpdate::Subscribed(
 787                                            track.clone(),
 788                                            publication.clone(),
 789                                        ),
 790                                        cx,
 791                                    )
 792                                    .log_err();
 793                                }
 794                            }
 795                        }
 796                    }
 797
 798                    this.remote_participants.retain(|user_id, participant| {
 799                        if this.participant_user_ids.contains(user_id) {
 800                            true
 801                        } else {
 802                            for project in &participant.projects {
 803                                cx.emit(Event::RemoteProjectUnshared {
 804                                    project_id: project.id,
 805                                });
 806                            }
 807                            false
 808                        }
 809                    });
 810                }
 811
 812                if let Some(pending_participants) = pending_participants.log_err() {
 813                    this.pending_participants = pending_participants;
 814                    for participant in &this.pending_participants {
 815                        this.participant_user_ids.insert(participant.id);
 816                    }
 817                }
 818
 819                this.follows_by_leader_id_project_id.clear();
 820                for follower in room.followers {
 821                    let project_id = follower.project_id;
 822                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 823                        (Some(leader), Some(follower)) => (leader, follower),
 824
 825                        _ => {
 826                            log::error!("Follower message {follower:?} missing some state");
 827                            continue;
 828                        }
 829                    };
 830
 831                    let list = this
 832                        .follows_by_leader_id_project_id
 833                        .entry((leader, project_id))
 834                        .or_insert(Vec::new());
 835                    if !list.contains(&follower) {
 836                        list.push(follower);
 837                    }
 838                }
 839
 840                this.pending_room_update.take();
 841                if this.should_leave() {
 842                    log::info!("room is empty, leaving");
 843                    let _ = this.leave(cx);
 844                }
 845
 846                this.user_store.update(cx, |user_store, cx| {
 847                    let participant_indices_by_user_id = this
 848                        .remote_participants
 849                        .iter()
 850                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 851                        .collect();
 852                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 853                });
 854
 855                this.check_invariants();
 856                cx.notify();
 857            });
 858        }));
 859
 860        cx.notify();
 861        Ok(())
 862    }
 863
 864    fn remote_video_track_updated(
 865        &mut self,
 866        change: RemoteVideoTrackUpdate,
 867        cx: &mut ModelContext<Self>,
 868    ) -> Result<()> {
 869        match change {
 870            RemoteVideoTrackUpdate::Subscribed(track) => {
 871                let user_id = track.publisher_id().parse()?;
 872                let track_id = track.sid().to_string();
 873                let participant = self
 874                    .remote_participants
 875                    .get_mut(&user_id)
 876                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 877                participant.video_tracks.insert(
 878                    track_id.clone(),
 879                    Arc::new(RemoteVideoTrack {
 880                        live_kit_track: track,
 881                    }),
 882                );
 883                cx.emit(Event::RemoteVideoTracksChanged {
 884                    participant_id: participant.peer_id,
 885                });
 886            }
 887            RemoteVideoTrackUpdate::Unsubscribed {
 888                publisher_id,
 889                track_id,
 890            } => {
 891                let user_id = publisher_id.parse()?;
 892                let participant = self
 893                    .remote_participants
 894                    .get_mut(&user_id)
 895                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 896                participant.video_tracks.remove(&track_id);
 897                cx.emit(Event::RemoteVideoTracksChanged {
 898                    participant_id: participant.peer_id,
 899                });
 900            }
 901        }
 902
 903        cx.notify();
 904        Ok(())
 905    }
 906
 907    fn remote_audio_track_updated(
 908        &mut self,
 909        change: RemoteAudioTrackUpdate,
 910        cx: &mut ModelContext<Self>,
 911    ) -> Result<()> {
 912        match change {
 913            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 914                let mut speaker_ids = speakers
 915                    .into_iter()
 916                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 917                    .collect::<Vec<u64>>();
 918                speaker_ids.sort_unstable();
 919                for (sid, participant) in &mut self.remote_participants {
 920                    if let Ok(_) = speaker_ids.binary_search(sid) {
 921                        participant.speaking = true;
 922                    } else {
 923                        participant.speaking = false;
 924                    }
 925                }
 926                if let Some(id) = self.client.user_id() {
 927                    if let Some(room) = &mut self.live_kit {
 928                        if let Ok(_) = speaker_ids.binary_search(&id) {
 929                            room.speaking = true;
 930                        } else {
 931                            room.speaking = false;
 932                        }
 933                    }
 934                }
 935                cx.notify();
 936            }
 937            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 938                let mut found = false;
 939                for participant in &mut self.remote_participants.values_mut() {
 940                    for track in participant.audio_tracks.values() {
 941                        if track.sid() == track_id {
 942                            found = true;
 943                            break;
 944                        }
 945                    }
 946                    if found {
 947                        participant.muted = muted;
 948                        break;
 949                    }
 950                }
 951
 952                cx.notify();
 953            }
 954            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 955                let user_id = track.publisher_id().parse()?;
 956                let track_id = track.sid().to_string();
 957                let participant = self
 958                    .remote_participants
 959                    .get_mut(&user_id)
 960                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 961
 962                participant.audio_tracks.insert(track_id.clone(), track);
 963                participant.muted = publication.is_muted();
 964
 965                cx.emit(Event::RemoteAudioTracksChanged {
 966                    participant_id: participant.peer_id,
 967                });
 968            }
 969            RemoteAudioTrackUpdate::Unsubscribed {
 970                publisher_id,
 971                track_id,
 972            } => {
 973                let user_id = publisher_id.parse()?;
 974                let participant = self
 975                    .remote_participants
 976                    .get_mut(&user_id)
 977                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 978                participant.audio_tracks.remove(&track_id);
 979                cx.emit(Event::RemoteAudioTracksChanged {
 980                    participant_id: participant.peer_id,
 981                });
 982            }
 983        }
 984
 985        cx.notify();
 986        Ok(())
 987    }
 988
 989    fn check_invariants(&self) {
 990        #[cfg(any(test, feature = "test-support"))]
 991        {
 992            for participant in self.remote_participants.values() {
 993                assert!(self.participant_user_ids.contains(&participant.user.id));
 994                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 995            }
 996
 997            for participant in &self.pending_participants {
 998                assert!(self.participant_user_ids.contains(&participant.id));
 999                assert_ne!(participant.id, self.client.user_id().unwrap());
1000            }
1001
1002            assert_eq!(
1003                self.participant_user_ids.len(),
1004                self.remote_participants.len() + self.pending_participants.len()
1005            );
1006        }
1007    }
1008
1009    pub(crate) fn call(
1010        &mut self,
1011        called_user_id: u64,
1012        initial_project_id: Option<u64>,
1013        cx: &mut ModelContext<Self>,
1014    ) -> Task<Result<()>> {
1015        if self.status.is_offline() {
1016            return Task::ready(Err(anyhow!("room is offline")));
1017        }
1018
1019        cx.notify();
1020        let client = self.client.clone();
1021        let room_id = self.id;
1022        self.pending_call_count += 1;
1023        cx.spawn(|this, mut cx| async move {
1024            let result = client
1025                .request(proto::Call {
1026                    room_id,
1027                    called_user_id,
1028                    initial_project_id,
1029                })
1030                .await;
1031            this.update(&mut cx, |this, cx| {
1032                this.pending_call_count -= 1;
1033                if this.should_leave() {
1034                    this.leave(cx).detach_and_log_err(cx);
1035                }
1036            });
1037            result?;
1038            Ok(())
1039        })
1040    }
1041
1042    pub fn join_project(
1043        &mut self,
1044        id: u64,
1045        language_registry: Arc<LanguageRegistry>,
1046        fs: Arc<dyn Fs>,
1047        cx: &mut ModelContext<Self>,
1048    ) -> Task<Result<ModelHandle<Project>>> {
1049        let client = self.client.clone();
1050        let user_store = self.user_store.clone();
1051        cx.emit(Event::RemoteProjectJoined { project_id: id });
1052        cx.spawn(|this, mut cx| async move {
1053            let project =
1054                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1055
1056            this.update(&mut cx, |this, cx| {
1057                this.joined_projects.retain(|project| {
1058                    if let Some(project) = project.upgrade(cx) {
1059                        !project.read(cx).is_read_only()
1060                    } else {
1061                        false
1062                    }
1063                });
1064                this.joined_projects.insert(project.downgrade());
1065            });
1066            Ok(project)
1067        })
1068    }
1069
1070    pub(crate) fn share_project(
1071        &mut self,
1072        project: ModelHandle<Project>,
1073        cx: &mut ModelContext<Self>,
1074    ) -> Task<Result<u64>> {
1075        if let Some(project_id) = project.read(cx).remote_id() {
1076            return Task::ready(Ok(project_id));
1077        }
1078
1079        let request = self.client.request(proto::ShareProject {
1080            room_id: self.id(),
1081            worktrees: project.read(cx).worktree_metadata_protos(cx),
1082        });
1083        cx.spawn(|this, mut cx| async move {
1084            let response = request.await?;
1085
1086            project.update(&mut cx, |project, cx| {
1087                project.shared(response.project_id, cx)
1088            })?;
1089
1090            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1091            this.update(&mut cx, |this, cx| {
1092                this.shared_projects.insert(project.downgrade());
1093                let active_project = this.local_participant.active_project.as_ref();
1094                if active_project.map_or(false, |location| *location == project) {
1095                    this.set_location(Some(&project), cx)
1096                } else {
1097                    Task::ready(Ok(()))
1098                }
1099            })
1100            .await?;
1101
1102            Ok(response.project_id)
1103        })
1104    }
1105
1106    pub(crate) fn unshare_project(
1107        &mut self,
1108        project: ModelHandle<Project>,
1109        cx: &mut ModelContext<Self>,
1110    ) -> Result<()> {
1111        let project_id = match project.read(cx).remote_id() {
1112            Some(project_id) => project_id,
1113            None => return Ok(()),
1114        };
1115
1116        self.client.send(proto::UnshareProject { project_id })?;
1117        project.update(cx, |this, cx| this.unshare(cx))
1118    }
1119
1120    pub(crate) fn set_location(
1121        &mut self,
1122        project: Option<&ModelHandle<Project>>,
1123        cx: &mut ModelContext<Self>,
1124    ) -> Task<Result<()>> {
1125        if self.status.is_offline() {
1126            return Task::ready(Err(anyhow!("room is offline")));
1127        }
1128
1129        let client = self.client.clone();
1130        let room_id = self.id;
1131        let location = if let Some(project) = project {
1132            self.local_participant.active_project = Some(project.downgrade());
1133            if let Some(project_id) = project.read(cx).remote_id() {
1134                proto::participant_location::Variant::SharedProject(
1135                    proto::participant_location::SharedProject { id: project_id },
1136                )
1137            } else {
1138                proto::participant_location::Variant::UnsharedProject(
1139                    proto::participant_location::UnsharedProject {},
1140                )
1141            }
1142        } else {
1143            self.local_participant.active_project = None;
1144            proto::participant_location::Variant::External(proto::participant_location::External {})
1145        };
1146
1147        cx.notify();
1148        cx.foreground().spawn(async move {
1149            client
1150                .request(proto::UpdateParticipantLocation {
1151                    room_id,
1152                    location: Some(proto::ParticipantLocation {
1153                        variant: Some(location),
1154                    }),
1155                })
1156                .await?;
1157            Ok(())
1158        })
1159    }
1160
1161    pub fn is_screen_sharing(&self) -> bool {
1162        self.live_kit.as_ref().map_or(false, |live_kit| {
1163            !matches!(live_kit.screen_track, LocalTrack::None)
1164        })
1165    }
1166
1167    pub fn is_sharing_mic(&self) -> bool {
1168        self.live_kit.as_ref().map_or(false, |live_kit| {
1169            !matches!(live_kit.microphone_track, LocalTrack::None)
1170        })
1171    }
1172
1173    pub fn is_muted(&self, cx: &AppContext) -> bool {
1174        self.live_kit
1175            .as_ref()
1176            .and_then(|live_kit| match &live_kit.microphone_track {
1177                LocalTrack::None => Some(Self::mute_on_join(cx)),
1178                LocalTrack::Pending { muted, .. } => Some(*muted),
1179                LocalTrack::Published { muted, .. } => Some(*muted),
1180            })
1181            .unwrap_or(false)
1182    }
1183
1184    pub fn is_speaking(&self) -> bool {
1185        self.live_kit
1186            .as_ref()
1187            .map_or(false, |live_kit| live_kit.speaking)
1188    }
1189
1190    pub fn is_deafened(&self) -> Option<bool> {
1191        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1192    }
1193
1194    #[track_caller]
1195    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1196        if self.status.is_offline() {
1197            return Task::ready(Err(anyhow!("room is offline")));
1198        } else if self.is_sharing_mic() {
1199            return Task::ready(Err(anyhow!("microphone was already shared")));
1200        }
1201
1202        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1203            let publish_id = post_inc(&mut live_kit.next_publish_id);
1204            live_kit.microphone_track = LocalTrack::Pending {
1205                publish_id,
1206                muted: false,
1207            };
1208            cx.notify();
1209            publish_id
1210        } else {
1211            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1212        };
1213
1214        cx.spawn_weak(|this, mut cx| async move {
1215            let publish_track = async {
1216                let track = LocalAudioTrack::create();
1217                this.upgrade(&cx)
1218                    .ok_or_else(|| anyhow!("room was dropped"))?
1219                    .read_with(&cx, |this, _| {
1220                        this.live_kit
1221                            .as_ref()
1222                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1223                    })
1224                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1225                    .await
1226            };
1227
1228            let publication = publish_track.await;
1229            this.upgrade(&cx)
1230                .ok_or_else(|| anyhow!("room was dropped"))?
1231                .update(&mut cx, |this, cx| {
1232                    let live_kit = this
1233                        .live_kit
1234                        .as_mut()
1235                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1236
1237                    let (canceled, muted) = if let LocalTrack::Pending {
1238                        publish_id: cur_publish_id,
1239                        muted,
1240                    } = &live_kit.microphone_track
1241                    {
1242                        (*cur_publish_id != publish_id, *muted)
1243                    } else {
1244                        (true, false)
1245                    };
1246
1247                    match publication {
1248                        Ok(publication) => {
1249                            if canceled {
1250                                live_kit.room.unpublish_track(publication);
1251                            } else {
1252                                if muted {
1253                                    cx.background().spawn(publication.set_mute(muted)).detach();
1254                                }
1255                                live_kit.microphone_track = LocalTrack::Published {
1256                                    track_publication: publication,
1257                                    muted,
1258                                };
1259                                cx.notify();
1260                            }
1261                            Ok(())
1262                        }
1263                        Err(error) => {
1264                            if canceled {
1265                                Ok(())
1266                            } else {
1267                                live_kit.microphone_track = LocalTrack::None;
1268                                cx.notify();
1269                                Err(error)
1270                            }
1271                        }
1272                    }
1273                })
1274        })
1275    }
1276
1277    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1278        if self.status.is_offline() {
1279            return Task::ready(Err(anyhow!("room is offline")));
1280        } else if self.is_screen_sharing() {
1281            return Task::ready(Err(anyhow!("screen was already shared")));
1282        }
1283
1284        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1285            let publish_id = post_inc(&mut live_kit.next_publish_id);
1286            live_kit.screen_track = LocalTrack::Pending {
1287                publish_id,
1288                muted: false,
1289            };
1290            cx.notify();
1291            (live_kit.room.display_sources(), publish_id)
1292        } else {
1293            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1294        };
1295
1296        cx.spawn_weak(|this, mut cx| async move {
1297            let publish_track = async {
1298                let displays = displays.await?;
1299                let display = displays
1300                    .first()
1301                    .ok_or_else(|| anyhow!("no display found"))?;
1302                let track = LocalVideoTrack::screen_share_for_display(&display);
1303                this.upgrade(&cx)
1304                    .ok_or_else(|| anyhow!("room was dropped"))?
1305                    .read_with(&cx, |this, _| {
1306                        this.live_kit
1307                            .as_ref()
1308                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1309                    })
1310                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1311                    .await
1312            };
1313
1314            let publication = publish_track.await;
1315            this.upgrade(&cx)
1316                .ok_or_else(|| anyhow!("room was dropped"))?
1317                .update(&mut cx, |this, cx| {
1318                    let live_kit = this
1319                        .live_kit
1320                        .as_mut()
1321                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1322
1323                    let (canceled, muted) = if let LocalTrack::Pending {
1324                        publish_id: cur_publish_id,
1325                        muted,
1326                    } = &live_kit.screen_track
1327                    {
1328                        (*cur_publish_id != publish_id, *muted)
1329                    } else {
1330                        (true, false)
1331                    };
1332
1333                    match publication {
1334                        Ok(publication) => {
1335                            if canceled {
1336                                live_kit.room.unpublish_track(publication);
1337                            } else {
1338                                if muted {
1339                                    cx.background().spawn(publication.set_mute(muted)).detach();
1340                                }
1341                                live_kit.screen_track = LocalTrack::Published {
1342                                    track_publication: publication,
1343                                    muted,
1344                                };
1345                                cx.notify();
1346                            }
1347
1348                            Audio::play_sound(Sound::StartScreenshare, cx);
1349
1350                            Ok(())
1351                        }
1352                        Err(error) => {
1353                            if canceled {
1354                                Ok(())
1355                            } else {
1356                                live_kit.screen_track = LocalTrack::None;
1357                                cx.notify();
1358                                Err(error)
1359                            }
1360                        }
1361                    }
1362                })
1363        })
1364    }
1365
1366    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1367        let should_mute = !self.is_muted(cx);
1368        if let Some(live_kit) = self.live_kit.as_mut() {
1369            if matches!(live_kit.microphone_track, LocalTrack::None) {
1370                return Ok(self.share_microphone(cx));
1371            }
1372
1373            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1374            live_kit.muted_by_user = should_mute;
1375
1376            if old_muted == true && live_kit.deafened == true {
1377                if let Some(task) = self.toggle_deafen(cx).ok() {
1378                    task.detach();
1379                }
1380            }
1381
1382            Ok(ret_task)
1383        } else {
1384            Err(anyhow!("LiveKit not started"))
1385        }
1386    }
1387
1388    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1389        if let Some(live_kit) = self.live_kit.as_mut() {
1390            (*live_kit).deafened = !live_kit.deafened;
1391
1392            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1393            // Context notification is sent within set_mute itself.
1394            let mut mute_task = None;
1395            // When deafening, mute user's mic as well.
1396            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1397            if live_kit.deafened || !live_kit.muted_by_user {
1398                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1399            };
1400            for participant in self.remote_participants.values() {
1401                for track in live_kit
1402                    .room
1403                    .remote_audio_track_publications(&participant.user.id.to_string())
1404                {
1405                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1406                }
1407            }
1408
1409            Ok(cx.foreground().spawn(async move {
1410                if let Some(mute_task) = mute_task {
1411                    mute_task.await?;
1412                }
1413                for task in tasks {
1414                    task.await?;
1415                }
1416                Ok(())
1417            }))
1418        } else {
1419            Err(anyhow!("LiveKit not started"))
1420        }
1421    }
1422
1423    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1424        if self.status.is_offline() {
1425            return Err(anyhow!("room is offline"));
1426        }
1427
1428        let live_kit = self
1429            .live_kit
1430            .as_mut()
1431            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1432        match mem::take(&mut live_kit.screen_track) {
1433            LocalTrack::None => Err(anyhow!("screen was not shared")),
1434            LocalTrack::Pending { .. } => {
1435                cx.notify();
1436                Ok(())
1437            }
1438            LocalTrack::Published {
1439                track_publication, ..
1440            } => {
1441                live_kit.room.unpublish_track(track_publication);
1442                cx.notify();
1443
1444                Audio::play_sound(Sound::StopScreenshare, cx);
1445                Ok(())
1446            }
1447        }
1448    }
1449
1450    #[cfg(any(test, feature = "test-support"))]
1451    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1452        self.live_kit
1453            .as_ref()
1454            .unwrap()
1455            .room
1456            .set_display_sources(sources);
1457    }
1458}
1459
1460struct LiveKitRoom {
1461    room: Arc<live_kit_client::Room>,
1462    screen_track: LocalTrack,
1463    microphone_track: LocalTrack,
1464    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1465    muted_by_user: bool,
1466    deafened: bool,
1467    speaking: bool,
1468    next_publish_id: usize,
1469    _maintain_room: Task<()>,
1470    _maintain_tracks: [Task<()>; 2],
1471}
1472
1473impl LiveKitRoom {
1474    fn set_mute(
1475        self: &mut LiveKitRoom,
1476        should_mute: bool,
1477        cx: &mut ModelContext<Room>,
1478    ) -> Result<(Task<Result<()>>, bool)> {
1479        if !should_mute {
1480            // clear user muting state.
1481            self.muted_by_user = false;
1482        }
1483
1484        let (result, old_muted) = match &mut self.microphone_track {
1485            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1486            LocalTrack::Pending { muted, .. } => {
1487                let old_muted = *muted;
1488                *muted = should_mute;
1489                cx.notify();
1490                Ok((Task::Ready(Some(Ok(()))), old_muted))
1491            }
1492            LocalTrack::Published {
1493                track_publication,
1494                muted,
1495            } => {
1496                let old_muted = *muted;
1497                *muted = should_mute;
1498                cx.notify();
1499                Ok((
1500                    cx.background().spawn(track_publication.set_mute(*muted)),
1501                    old_muted,
1502                ))
1503            }
1504        }?;
1505
1506        if old_muted != should_mute {
1507            if should_mute {
1508                Audio::play_sound(Sound::Mute, cx);
1509            } else {
1510                Audio::play_sound(Sound::Unmute, cx);
1511            }
1512        }
1513
1514        Ok((result, old_muted))
1515    }
1516}
1517
1518enum LocalTrack {
1519    None,
1520    Pending {
1521        publish_id: usize,
1522        muted: bool,
1523    },
1524    Published {
1525        track_publication: LocalTrackPublication,
1526        muted: bool,
1527    },
1528}
1529
1530impl Default for LocalTrack {
1531    fn default() -> Self {
1532        Self::None
1533    }
1534}
1535
1536#[derive(Copy, Clone, PartialEq, Eq)]
1537pub enum RoomStatus {
1538    Online,
1539    Rejoining,
1540    Offline,
1541}
1542
1543impl RoomStatus {
1544    pub fn is_offline(&self) -> bool {
1545        matches!(self, RoomStatus::Offline)
1546    }
1547
1548    pub fn is_online(&self) -> bool {
1549        matches!(self, RoomStatus::Online)
1550    }
1551}