room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28#[derive(Clone, Debug, PartialEq, Eq)]
  29pub enum Event {
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    RemoteProjectJoined {
  48        project_id: u64,
  49    },
  50    RemoteProjectInvitationDiscarded {
  51        project_id: u64,
  52    },
  53    Left,
  54}
  55
  56pub struct Room {
  57    id: u64,
  58    channel_id: Option<u64>,
  59    live_kit: Option<LiveKitRoom>,
  60    status: RoomStatus,
  61    shared_projects: HashSet<WeakModelHandle<Project>>,
  62    joined_projects: HashSet<WeakModelHandle<Project>>,
  63    local_participant: LocalParticipant,
  64    remote_participants: BTreeMap<u64, RemoteParticipant>,
  65    pending_participants: Vec<Arc<User>>,
  66    participant_user_ids: HashSet<u64>,
  67    pending_call_count: usize,
  68    leave_when_empty: bool,
  69    client: Arc<Client>,
  70    user_store: ModelHandle<UserStore>,
  71    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  72    subscriptions: Vec<client::Subscription>,
  73    pending_room_update: Option<Task<()>>,
  74    maintain_connection: Option<Task<Option<()>>>,
  75}
  76
  77impl Entity for Room {
  78    type Event = Event;
  79
  80    fn release(&mut self, cx: &mut AppContext) {
  81        if self.status.is_online() {
  82            self.leave_internal(cx).detach_and_log_err(cx);
  83        }
  84    }
  85
  86    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  87        if self.status.is_online() {
  88            let leave = self.leave_internal(cx);
  89            Some(
  90                cx.background()
  91                    .spawn(async move {
  92                        leave.await.log_err();
  93                    })
  94                    .boxed(),
  95            )
  96        } else {
  97            None
  98        }
  99    }
 100}
 101
 102impl Room {
 103    pub fn channel_id(&self) -> Option<u64> {
 104        self.channel_id
 105    }
 106
 107    pub fn is_sharing_project(&self) -> bool {
 108        !self.shared_projects.is_empty()
 109    }
 110
 111    #[cfg(any(test, feature = "test-support"))]
 112    pub fn is_connected(&self) -> bool {
 113        if let Some(live_kit) = self.live_kit.as_ref() {
 114            matches!(
 115                *live_kit.room.status().borrow(),
 116                live_kit_client::ConnectionState::Connected { .. }
 117            )
 118        } else {
 119            false
 120        }
 121    }
 122
 123    fn new(
 124        id: u64,
 125        channel_id: Option<u64>,
 126        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 127        client: Arc<Client>,
 128        user_store: ModelHandle<UserStore>,
 129        cx: &mut ModelContext<Self>,
 130    ) -> Self {
 131        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 132            let room = live_kit_client::Room::new();
 133            let mut status = room.status();
 134            // Consume the initial status of the room.
 135            let _ = status.try_recv();
 136            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 137                while let Some(status) = status.next().await {
 138                    let this = if let Some(this) = this.upgrade(&cx) {
 139                        this
 140                    } else {
 141                        break;
 142                    };
 143
 144                    if status == live_kit_client::ConnectionState::Disconnected {
 145                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 146                        break;
 147                    }
 148                }
 149            });
 150
 151            let mut track_video_changes = room.remote_video_track_updates();
 152            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 153                while let Some(track_change) = track_video_changes.next().await {
 154                    let this = if let Some(this) = this.upgrade(&cx) {
 155                        this
 156                    } else {
 157                        break;
 158                    };
 159
 160                    this.update(&mut cx, |this, cx| {
 161                        this.remote_video_track_updated(track_change, cx).log_err()
 162                    });
 163                }
 164            });
 165
 166            let mut track_audio_changes = room.remote_audio_track_updates();
 167            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 168                while let Some(track_change) = track_audio_changes.next().await {
 169                    let this = if let Some(this) = this.upgrade(&cx) {
 170                        this
 171                    } else {
 172                        break;
 173                    };
 174
 175                    this.update(&mut cx, |this, cx| {
 176                        this.remote_audio_track_updated(track_change, cx).log_err()
 177                    });
 178                }
 179            });
 180
 181            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 182            cx.spawn(|this, mut cx| async move {
 183                connect.await?;
 184
 185                if !cx.read(Self::mute_on_join) {
 186                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
 187                        .await?;
 188                }
 189
 190                anyhow::Ok(())
 191            })
 192            .detach_and_log_err(cx);
 193
 194            Some(LiveKitRoom {
 195                room,
 196                screen_track: LocalTrack::None,
 197                microphone_track: LocalTrack::None,
 198                next_publish_id: 0,
 199                muted_by_user: false,
 200                deafened: false,
 201                speaking: false,
 202                _maintain_room,
 203                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 204            })
 205        } else {
 206            None
 207        };
 208
 209        let maintain_connection =
 210            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 211
 212        Audio::play_sound(Sound::Joined, cx);
 213
 214        Self {
 215            id,
 216            channel_id,
 217            live_kit: live_kit_room,
 218            status: RoomStatus::Online,
 219            shared_projects: Default::default(),
 220            joined_projects: Default::default(),
 221            participant_user_ids: Default::default(),
 222            local_participant: Default::default(),
 223            remote_participants: Default::default(),
 224            pending_participants: Default::default(),
 225            pending_call_count: 0,
 226            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 227            leave_when_empty: false,
 228            pending_room_update: None,
 229            client,
 230            user_store,
 231            follows_by_leader_id_project_id: Default::default(),
 232            maintain_connection: Some(maintain_connection),
 233        }
 234    }
 235
 236    pub(crate) fn create(
 237        called_user_id: u64,
 238        initial_project: Option<ModelHandle<Project>>,
 239        client: Arc<Client>,
 240        user_store: ModelHandle<UserStore>,
 241        cx: &mut AppContext,
 242    ) -> Task<Result<ModelHandle<Self>>> {
 243        cx.spawn(|mut cx| async move {
 244            let response = client.request(proto::CreateRoom {}).await?;
 245            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 246            let room = cx.add_model(|cx| {
 247                Self::new(
 248                    room_proto.id,
 249                    None,
 250                    response.live_kit_connection_info,
 251                    client,
 252                    user_store,
 253                    cx,
 254                )
 255            });
 256
 257            let initial_project_id = if let Some(initial_project) = initial_project {
 258                let initial_project_id = room
 259                    .update(&mut cx, |room, cx| {
 260                        room.share_project(initial_project.clone(), cx)
 261                    })
 262                    .await?;
 263                Some(initial_project_id)
 264            } else {
 265                None
 266            };
 267
 268            match room
 269                .update(&mut cx, |room, cx| {
 270                    room.leave_when_empty = true;
 271                    room.call(called_user_id, initial_project_id, cx)
 272                })
 273                .await
 274            {
 275                Ok(()) => Ok(room),
 276                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 277            }
 278        })
 279    }
 280
 281    pub(crate) fn join_channel(
 282        channel_id: u64,
 283        client: Arc<Client>,
 284        user_store: ModelHandle<UserStore>,
 285        cx: &mut AppContext,
 286    ) -> Task<Result<ModelHandle<Self>>> {
 287        cx.spawn(|cx| async move {
 288            Self::from_join_response(
 289                client.request(proto::JoinChannel { channel_id }).await?,
 290                client,
 291                user_store,
 292                cx,
 293            )
 294        })
 295    }
 296
 297    pub(crate) fn join(
 298        call: &IncomingCall,
 299        client: Arc<Client>,
 300        user_store: ModelHandle<UserStore>,
 301        cx: &mut AppContext,
 302    ) -> Task<Result<ModelHandle<Self>>> {
 303        let id = call.room_id;
 304        cx.spawn(|cx| async move {
 305            Self::from_join_response(
 306                client.request(proto::JoinRoom { id }).await?,
 307                client,
 308                user_store,
 309                cx,
 310            )
 311        })
 312    }
 313
 314    pub fn mute_on_join(cx: &AppContext) -> bool {
 315        settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 316    }
 317
 318    fn from_join_response(
 319        response: proto::JoinRoomResponse,
 320        client: Arc<Client>,
 321        user_store: ModelHandle<UserStore>,
 322        mut cx: AsyncAppContext,
 323    ) -> Result<ModelHandle<Self>> {
 324        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 325        let room = cx.add_model(|cx| {
 326            Self::new(
 327                room_proto.id,
 328                response.channel_id,
 329                response.live_kit_connection_info,
 330                client,
 331                user_store,
 332                cx,
 333            )
 334        });
 335        room.update(&mut cx, |room, cx| {
 336            room.leave_when_empty = room.channel_id.is_none();
 337            room.apply_room_update(room_proto, cx)?;
 338            anyhow::Ok(())
 339        })?;
 340        Ok(room)
 341    }
 342
 343    fn should_leave(&self) -> bool {
 344        self.leave_when_empty
 345            && self.pending_room_update.is_none()
 346            && self.pending_participants.is_empty()
 347            && self.remote_participants.is_empty()
 348            && self.pending_call_count == 0
 349    }
 350
 351    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 352        cx.notify();
 353        cx.emit(Event::Left);
 354        self.leave_internal(cx)
 355    }
 356
 357    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 358        if self.status.is_offline() {
 359            return Task::ready(Err(anyhow!("room is offline")));
 360        }
 361
 362        log::info!("leaving room");
 363        Audio::play_sound(Sound::Leave, cx);
 364
 365        self.clear_state(cx);
 366
 367        let leave_room = self.client.request(proto::LeaveRoom {});
 368        cx.background().spawn(async move {
 369            leave_room.await?;
 370            anyhow::Ok(())
 371        })
 372    }
 373
 374    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 375        for project in self.shared_projects.drain() {
 376            if let Some(project) = project.upgrade(cx) {
 377                project.update(cx, |project, cx| {
 378                    project.unshare(cx).log_err();
 379                });
 380            }
 381        }
 382        for project in self.joined_projects.drain() {
 383            if let Some(project) = project.upgrade(cx) {
 384                project.update(cx, |project, cx| {
 385                    project.disconnected_from_host(cx);
 386                    project.close(cx);
 387                });
 388            }
 389        }
 390
 391        self.status = RoomStatus::Offline;
 392        self.remote_participants.clear();
 393        self.pending_participants.clear();
 394        self.participant_user_ids.clear();
 395        self.subscriptions.clear();
 396        self.live_kit.take();
 397        self.pending_room_update.take();
 398        self.maintain_connection.take();
 399    }
 400
 401    async fn maintain_connection(
 402        this: WeakModelHandle<Self>,
 403        client: Arc<Client>,
 404        mut cx: AsyncAppContext,
 405    ) -> Result<()> {
 406        let mut client_status = client.status();
 407        loop {
 408            let _ = client_status.try_recv();
 409            let is_connected = client_status.borrow().is_connected();
 410            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 411            if !is_connected || client_status.next().await.is_some() {
 412                log::info!("detected client disconnection");
 413
 414                this.upgrade(&cx)
 415                    .ok_or_else(|| anyhow!("room was dropped"))?
 416                    .update(&mut cx, |this, cx| {
 417                        this.status = RoomStatus::Rejoining;
 418                        cx.notify();
 419                    });
 420
 421                // Wait for client to re-establish a connection to the server.
 422                {
 423                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 424                    let client_reconnection = async {
 425                        let mut remaining_attempts = 3;
 426                        while remaining_attempts > 0 {
 427                            if client_status.borrow().is_connected() {
 428                                log::info!("client reconnected, attempting to rejoin room");
 429
 430                                let Some(this) = this.upgrade(&cx) else { break };
 431                                if this
 432                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 433                                    .await
 434                                    .log_err()
 435                                    .is_some()
 436                                {
 437                                    return true;
 438                                } else {
 439                                    remaining_attempts -= 1;
 440                                }
 441                            } else if client_status.borrow().is_signed_out() {
 442                                return false;
 443                            }
 444
 445                            log::info!(
 446                                "waiting for client status change, remaining attempts {}",
 447                                remaining_attempts
 448                            );
 449                            client_status.next().await;
 450                        }
 451                        false
 452                    }
 453                    .fuse();
 454                    futures::pin_mut!(client_reconnection);
 455
 456                    futures::select_biased! {
 457                        reconnected = client_reconnection => {
 458                            if reconnected {
 459                                log::info!("successfully reconnected to room");
 460                                // If we successfully joined the room, go back around the loop
 461                                // waiting for future connection status changes.
 462                                continue;
 463                            }
 464                        }
 465                        _ = reconnection_timeout => {
 466                            log::info!("room reconnection timeout expired");
 467                        }
 468                    }
 469                }
 470
 471                break;
 472            }
 473        }
 474
 475        // The client failed to re-establish a connection to the server
 476        // or an error occurred while trying to re-join the room. Either way
 477        // we leave the room and return an error.
 478        if let Some(this) = this.upgrade(&cx) {
 479            log::info!("reconnection failed, leaving room");
 480            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 481        }
 482        Err(anyhow!(
 483            "can't reconnect to room: client failed to re-establish connection"
 484        ))
 485    }
 486
 487    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 488        let mut projects = HashMap::default();
 489        let mut reshared_projects = Vec::new();
 490        let mut rejoined_projects = Vec::new();
 491        self.shared_projects.retain(|project| {
 492            if let Some(handle) = project.upgrade(cx) {
 493                let project = handle.read(cx);
 494                if let Some(project_id) = project.remote_id() {
 495                    projects.insert(project_id, handle.clone());
 496                    reshared_projects.push(proto::UpdateProject {
 497                        project_id,
 498                        worktrees: project.worktree_metadata_protos(cx),
 499                    });
 500                    return true;
 501                }
 502            }
 503            false
 504        });
 505        self.joined_projects.retain(|project| {
 506            if let Some(handle) = project.upgrade(cx) {
 507                let project = handle.read(cx);
 508                if let Some(project_id) = project.remote_id() {
 509                    projects.insert(project_id, handle.clone());
 510                    rejoined_projects.push(proto::RejoinProject {
 511                        id: project_id,
 512                        worktrees: project
 513                            .worktrees(cx)
 514                            .map(|worktree| {
 515                                let worktree = worktree.read(cx);
 516                                proto::RejoinWorktree {
 517                                    id: worktree.id().to_proto(),
 518                                    scan_id: worktree.completed_scan_id() as u64,
 519                                }
 520                            })
 521                            .collect(),
 522                    });
 523                }
 524                return true;
 525            }
 526            false
 527        });
 528
 529        let response = self.client.request_envelope(proto::RejoinRoom {
 530            id: self.id,
 531            reshared_projects,
 532            rejoined_projects,
 533        });
 534
 535        cx.spawn(|this, mut cx| async move {
 536            let response = response.await?;
 537            let message_id = response.message_id;
 538            let response = response.payload;
 539            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 540            this.update(&mut cx, |this, cx| {
 541                this.status = RoomStatus::Online;
 542                this.apply_room_update(room_proto, cx)?;
 543
 544                for reshared_project in response.reshared_projects {
 545                    if let Some(project) = projects.get(&reshared_project.id) {
 546                        project.update(cx, |project, cx| {
 547                            project.reshared(reshared_project, cx).log_err();
 548                        });
 549                    }
 550                }
 551
 552                for rejoined_project in response.rejoined_projects {
 553                    if let Some(project) = projects.get(&rejoined_project.id) {
 554                        project.update(cx, |project, cx| {
 555                            project.rejoined(rejoined_project, message_id, cx).log_err();
 556                        });
 557                    }
 558                }
 559
 560                anyhow::Ok(())
 561            })
 562        })
 563    }
 564
 565    pub fn id(&self) -> u64 {
 566        self.id
 567    }
 568
 569    pub fn status(&self) -> RoomStatus {
 570        self.status
 571    }
 572
 573    pub fn local_participant(&self) -> &LocalParticipant {
 574        &self.local_participant
 575    }
 576
 577    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 578        &self.remote_participants
 579    }
 580
 581    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 582        self.remote_participants
 583            .values()
 584            .find(|p| p.peer_id == peer_id)
 585    }
 586
 587    pub fn pending_participants(&self) -> &[Arc<User>] {
 588        &self.pending_participants
 589    }
 590
 591    pub fn contains_participant(&self, user_id: u64) -> bool {
 592        self.participant_user_ids.contains(&user_id)
 593    }
 594
 595    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 596        self.follows_by_leader_id_project_id
 597            .get(&(leader_id, project_id))
 598            .map_or(&[], |v| v.as_slice())
 599    }
 600
 601    /// Returns the most 'active' projects, defined as most people in the project
 602    pub fn most_active_project(&self) -> Option<(u64, u64)> {
 603        let mut projects = HashMap::default();
 604        let mut hosts = HashMap::default();
 605        for participant in self.remote_participants.values() {
 606            match participant.location {
 607                ParticipantLocation::SharedProject { project_id } => {
 608                    *projects.entry(project_id).or_insert(0) += 1;
 609                }
 610                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 611            }
 612            for project in &participant.projects {
 613                *projects.entry(project.id).or_insert(0) += 1;
 614                hosts.insert(project.id, participant.user.id);
 615            }
 616        }
 617
 618        let mut pairs: Vec<(u64, usize)> = projects.into_iter().collect();
 619        pairs.sort_by_key(|(_, count)| *count as i32);
 620
 621        pairs
 622            .first()
 623            .map(|(project_id, _)| (*project_id, hosts[&project_id]))
 624    }
 625
 626    async fn handle_room_updated(
 627        this: ModelHandle<Self>,
 628        envelope: TypedEnvelope<proto::RoomUpdated>,
 629        _: Arc<Client>,
 630        mut cx: AsyncAppContext,
 631    ) -> Result<()> {
 632        let room = envelope
 633            .payload
 634            .room
 635            .ok_or_else(|| anyhow!("invalid room"))?;
 636        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 637    }
 638
 639    fn apply_room_update(
 640        &mut self,
 641        mut room: proto::Room,
 642        cx: &mut ModelContext<Self>,
 643    ) -> Result<()> {
 644        // Filter ourselves out from the room's participants.
 645        let local_participant_ix = room
 646            .participants
 647            .iter()
 648            .position(|participant| Some(participant.user_id) == self.client.user_id());
 649        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 650
 651        let pending_participant_user_ids = room
 652            .pending_participants
 653            .iter()
 654            .map(|p| p.user_id)
 655            .collect::<Vec<_>>();
 656
 657        let remote_participant_user_ids = room
 658            .participants
 659            .iter()
 660            .map(|p| p.user_id)
 661            .collect::<Vec<_>>();
 662
 663        let (remote_participants, pending_participants) =
 664            self.user_store.update(cx, move |user_store, cx| {
 665                (
 666                    user_store.get_users(remote_participant_user_ids, cx),
 667                    user_store.get_users(pending_participant_user_ids, cx),
 668                )
 669            });
 670
 671        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 672            let (remote_participants, pending_participants) =
 673                futures::join!(remote_participants, pending_participants);
 674
 675            this.update(&mut cx, |this, cx| {
 676                this.participant_user_ids.clear();
 677
 678                if let Some(participant) = local_participant {
 679                    this.local_participant.projects = participant.projects;
 680                } else {
 681                    this.local_participant.projects.clear();
 682                }
 683
 684                if let Some(participants) = remote_participants.log_err() {
 685                    for (participant, user) in room.participants.into_iter().zip(participants) {
 686                        let Some(peer_id) = participant.peer_id else {
 687                            continue;
 688                        };
 689                        this.participant_user_ids.insert(participant.user_id);
 690
 691                        let old_projects = this
 692                            .remote_participants
 693                            .get(&participant.user_id)
 694                            .into_iter()
 695                            .flat_map(|existing| &existing.projects)
 696                            .map(|project| project.id)
 697                            .collect::<HashSet<_>>();
 698                        let new_projects = participant
 699                            .projects
 700                            .iter()
 701                            .map(|project| project.id)
 702                            .collect::<HashSet<_>>();
 703
 704                        for project in &participant.projects {
 705                            if !old_projects.contains(&project.id) {
 706                                cx.emit(Event::RemoteProjectShared {
 707                                    owner: user.clone(),
 708                                    project_id: project.id,
 709                                    worktree_root_names: project.worktree_root_names.clone(),
 710                                });
 711                            }
 712                        }
 713
 714                        for unshared_project_id in old_projects.difference(&new_projects) {
 715                            this.joined_projects.retain(|project| {
 716                                if let Some(project) = project.upgrade(cx) {
 717                                    project.update(cx, |project, cx| {
 718                                        if project.remote_id() == Some(*unshared_project_id) {
 719                                            project.disconnected_from_host(cx);
 720                                            false
 721                                        } else {
 722                                            true
 723                                        }
 724                                    })
 725                                } else {
 726                                    false
 727                                }
 728                            });
 729                            cx.emit(Event::RemoteProjectUnshared {
 730                                project_id: *unshared_project_id,
 731                            });
 732                        }
 733
 734                        let location = ParticipantLocation::from_proto(participant.location)
 735                            .unwrap_or(ParticipantLocation::External);
 736                        if let Some(remote_participant) =
 737                            this.remote_participants.get_mut(&participant.user_id)
 738                        {
 739                            remote_participant.projects = participant.projects;
 740                            remote_participant.peer_id = peer_id;
 741                            if location != remote_participant.location {
 742                                remote_participant.location = location;
 743                                cx.emit(Event::ParticipantLocationChanged {
 744                                    participant_id: peer_id,
 745                                });
 746                            }
 747                        } else {
 748                            this.remote_participants.insert(
 749                                participant.user_id,
 750                                RemoteParticipant {
 751                                    user: user.clone(),
 752                                    participant_index: ParticipantIndex(
 753                                        participant.participant_index,
 754                                    ),
 755                                    peer_id,
 756                                    projects: participant.projects,
 757                                    location,
 758                                    muted: true,
 759                                    speaking: false,
 760                                    video_tracks: Default::default(),
 761                                    audio_tracks: Default::default(),
 762                                },
 763                            );
 764
 765                            Audio::play_sound(Sound::Joined, cx);
 766
 767                            if let Some(live_kit) = this.live_kit.as_ref() {
 768                                let video_tracks =
 769                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 770                                let audio_tracks =
 771                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 772                                let publications = live_kit
 773                                    .room
 774                                    .remote_audio_track_publications(&user.id.to_string());
 775
 776                                for track in video_tracks {
 777                                    this.remote_video_track_updated(
 778                                        RemoteVideoTrackUpdate::Subscribed(track),
 779                                        cx,
 780                                    )
 781                                    .log_err();
 782                                }
 783
 784                                for (track, publication) in
 785                                    audio_tracks.iter().zip(publications.iter())
 786                                {
 787                                    this.remote_audio_track_updated(
 788                                        RemoteAudioTrackUpdate::Subscribed(
 789                                            track.clone(),
 790                                            publication.clone(),
 791                                        ),
 792                                        cx,
 793                                    )
 794                                    .log_err();
 795                                }
 796                            }
 797                        }
 798                    }
 799
 800                    this.remote_participants.retain(|user_id, participant| {
 801                        if this.participant_user_ids.contains(user_id) {
 802                            true
 803                        } else {
 804                            for project in &participant.projects {
 805                                cx.emit(Event::RemoteProjectUnshared {
 806                                    project_id: project.id,
 807                                });
 808                            }
 809                            false
 810                        }
 811                    });
 812                }
 813
 814                if let Some(pending_participants) = pending_participants.log_err() {
 815                    this.pending_participants = pending_participants;
 816                    for participant in &this.pending_participants {
 817                        this.participant_user_ids.insert(participant.id);
 818                    }
 819                }
 820
 821                this.follows_by_leader_id_project_id.clear();
 822                for follower in room.followers {
 823                    let project_id = follower.project_id;
 824                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 825                        (Some(leader), Some(follower)) => (leader, follower),
 826
 827                        _ => {
 828                            log::error!("Follower message {follower:?} missing some state");
 829                            continue;
 830                        }
 831                    };
 832
 833                    let list = this
 834                        .follows_by_leader_id_project_id
 835                        .entry((leader, project_id))
 836                        .or_insert(Vec::new());
 837                    if !list.contains(&follower) {
 838                        list.push(follower);
 839                    }
 840                }
 841
 842                this.pending_room_update.take();
 843                if this.should_leave() {
 844                    log::info!("room is empty, leaving");
 845                    let _ = this.leave(cx);
 846                }
 847
 848                this.user_store.update(cx, |user_store, cx| {
 849                    let participant_indices_by_user_id = this
 850                        .remote_participants
 851                        .iter()
 852                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 853                        .collect();
 854                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 855                });
 856
 857                this.check_invariants();
 858                cx.notify();
 859            });
 860        }));
 861
 862        cx.notify();
 863        Ok(())
 864    }
 865
 866    fn remote_video_track_updated(
 867        &mut self,
 868        change: RemoteVideoTrackUpdate,
 869        cx: &mut ModelContext<Self>,
 870    ) -> Result<()> {
 871        match change {
 872            RemoteVideoTrackUpdate::Subscribed(track) => {
 873                let user_id = track.publisher_id().parse()?;
 874                let track_id = track.sid().to_string();
 875                let participant = self
 876                    .remote_participants
 877                    .get_mut(&user_id)
 878                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 879                participant.video_tracks.insert(
 880                    track_id.clone(),
 881                    Arc::new(RemoteVideoTrack {
 882                        live_kit_track: track,
 883                    }),
 884                );
 885                cx.emit(Event::RemoteVideoTracksChanged {
 886                    participant_id: participant.peer_id,
 887                });
 888            }
 889            RemoteVideoTrackUpdate::Unsubscribed {
 890                publisher_id,
 891                track_id,
 892            } => {
 893                let user_id = publisher_id.parse()?;
 894                let participant = self
 895                    .remote_participants
 896                    .get_mut(&user_id)
 897                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 898                participant.video_tracks.remove(&track_id);
 899                cx.emit(Event::RemoteVideoTracksChanged {
 900                    participant_id: participant.peer_id,
 901                });
 902            }
 903        }
 904
 905        cx.notify();
 906        Ok(())
 907    }
 908
 909    fn remote_audio_track_updated(
 910        &mut self,
 911        change: RemoteAudioTrackUpdate,
 912        cx: &mut ModelContext<Self>,
 913    ) -> Result<()> {
 914        match change {
 915            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 916                let mut speaker_ids = speakers
 917                    .into_iter()
 918                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 919                    .collect::<Vec<u64>>();
 920                speaker_ids.sort_unstable();
 921                for (sid, participant) in &mut self.remote_participants {
 922                    if let Ok(_) = speaker_ids.binary_search(sid) {
 923                        participant.speaking = true;
 924                    } else {
 925                        participant.speaking = false;
 926                    }
 927                }
 928                if let Some(id) = self.client.user_id() {
 929                    if let Some(room) = &mut self.live_kit {
 930                        if let Ok(_) = speaker_ids.binary_search(&id) {
 931                            room.speaking = true;
 932                        } else {
 933                            room.speaking = false;
 934                        }
 935                    }
 936                }
 937                cx.notify();
 938            }
 939            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 940                let mut found = false;
 941                for participant in &mut self.remote_participants.values_mut() {
 942                    for track in participant.audio_tracks.values() {
 943                        if track.sid() == track_id {
 944                            found = true;
 945                            break;
 946                        }
 947                    }
 948                    if found {
 949                        participant.muted = muted;
 950                        break;
 951                    }
 952                }
 953
 954                cx.notify();
 955            }
 956            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 957                let user_id = track.publisher_id().parse()?;
 958                let track_id = track.sid().to_string();
 959                let participant = self
 960                    .remote_participants
 961                    .get_mut(&user_id)
 962                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 963
 964                participant.audio_tracks.insert(track_id.clone(), track);
 965                participant.muted = publication.is_muted();
 966
 967                cx.emit(Event::RemoteAudioTracksChanged {
 968                    participant_id: participant.peer_id,
 969                });
 970            }
 971            RemoteAudioTrackUpdate::Unsubscribed {
 972                publisher_id,
 973                track_id,
 974            } => {
 975                let user_id = publisher_id.parse()?;
 976                let participant = self
 977                    .remote_participants
 978                    .get_mut(&user_id)
 979                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 980                participant.audio_tracks.remove(&track_id);
 981                cx.emit(Event::RemoteAudioTracksChanged {
 982                    participant_id: participant.peer_id,
 983                });
 984            }
 985        }
 986
 987        cx.notify();
 988        Ok(())
 989    }
 990
 991    fn check_invariants(&self) {
 992        #[cfg(any(test, feature = "test-support"))]
 993        {
 994            for participant in self.remote_participants.values() {
 995                assert!(self.participant_user_ids.contains(&participant.user.id));
 996                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 997            }
 998
 999            for participant in &self.pending_participants {
1000                assert!(self.participant_user_ids.contains(&participant.id));
1001                assert_ne!(participant.id, self.client.user_id().unwrap());
1002            }
1003
1004            assert_eq!(
1005                self.participant_user_ids.len(),
1006                self.remote_participants.len() + self.pending_participants.len()
1007            );
1008        }
1009    }
1010
1011    pub(crate) fn call(
1012        &mut self,
1013        called_user_id: u64,
1014        initial_project_id: Option<u64>,
1015        cx: &mut ModelContext<Self>,
1016    ) -> Task<Result<()>> {
1017        if self.status.is_offline() {
1018            return Task::ready(Err(anyhow!("room is offline")));
1019        }
1020
1021        cx.notify();
1022        let client = self.client.clone();
1023        let room_id = self.id;
1024        self.pending_call_count += 1;
1025        cx.spawn(|this, mut cx| async move {
1026            let result = client
1027                .request(proto::Call {
1028                    room_id,
1029                    called_user_id,
1030                    initial_project_id,
1031                })
1032                .await;
1033            this.update(&mut cx, |this, cx| {
1034                this.pending_call_count -= 1;
1035                if this.should_leave() {
1036                    this.leave(cx).detach_and_log_err(cx);
1037                }
1038            });
1039            result?;
1040            Ok(())
1041        })
1042    }
1043
1044    pub fn join_project(
1045        &mut self,
1046        id: u64,
1047        language_registry: Arc<LanguageRegistry>,
1048        fs: Arc<dyn Fs>,
1049        cx: &mut ModelContext<Self>,
1050    ) -> Task<Result<ModelHandle<Project>>> {
1051        let client = self.client.clone();
1052        let user_store = self.user_store.clone();
1053        cx.emit(Event::RemoteProjectJoined { project_id: id });
1054        cx.spawn(|this, mut cx| async move {
1055            let project =
1056                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1057
1058            this.update(&mut cx, |this, cx| {
1059                this.joined_projects.retain(|project| {
1060                    if let Some(project) = project.upgrade(cx) {
1061                        !project.read(cx).is_read_only()
1062                    } else {
1063                        false
1064                    }
1065                });
1066                this.joined_projects.insert(project.downgrade());
1067            });
1068            Ok(project)
1069        })
1070    }
1071
1072    pub(crate) fn share_project(
1073        &mut self,
1074        project: ModelHandle<Project>,
1075        cx: &mut ModelContext<Self>,
1076    ) -> Task<Result<u64>> {
1077        if let Some(project_id) = project.read(cx).remote_id() {
1078            return Task::ready(Ok(project_id));
1079        }
1080
1081        let request = self.client.request(proto::ShareProject {
1082            room_id: self.id(),
1083            worktrees: project.read(cx).worktree_metadata_protos(cx),
1084        });
1085        cx.spawn(|this, mut cx| async move {
1086            let response = request.await?;
1087
1088            project.update(&mut cx, |project, cx| {
1089                project.shared(response.project_id, cx)
1090            })?;
1091
1092            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1093            this.update(&mut cx, |this, cx| {
1094                this.shared_projects.insert(project.downgrade());
1095                let active_project = this.local_participant.active_project.as_ref();
1096                if active_project.map_or(false, |location| *location == project) {
1097                    this.set_location(Some(&project), cx)
1098                } else {
1099                    Task::ready(Ok(()))
1100                }
1101            })
1102            .await?;
1103
1104            Ok(response.project_id)
1105        })
1106    }
1107
1108    pub(crate) fn unshare_project(
1109        &mut self,
1110        project: ModelHandle<Project>,
1111        cx: &mut ModelContext<Self>,
1112    ) -> Result<()> {
1113        let project_id = match project.read(cx).remote_id() {
1114            Some(project_id) => project_id,
1115            None => return Ok(()),
1116        };
1117
1118        self.client.send(proto::UnshareProject { project_id })?;
1119        project.update(cx, |this, cx| this.unshare(cx))
1120    }
1121
1122    pub(crate) fn set_location(
1123        &mut self,
1124        project: Option<&ModelHandle<Project>>,
1125        cx: &mut ModelContext<Self>,
1126    ) -> Task<Result<()>> {
1127        if self.status.is_offline() {
1128            return Task::ready(Err(anyhow!("room is offline")));
1129        }
1130
1131        let client = self.client.clone();
1132        let room_id = self.id;
1133        let location = if let Some(project) = project {
1134            self.local_participant.active_project = Some(project.downgrade());
1135            if let Some(project_id) = project.read(cx).remote_id() {
1136                proto::participant_location::Variant::SharedProject(
1137                    proto::participant_location::SharedProject { id: project_id },
1138                )
1139            } else {
1140                proto::participant_location::Variant::UnsharedProject(
1141                    proto::participant_location::UnsharedProject {},
1142                )
1143            }
1144        } else {
1145            self.local_participant.active_project = None;
1146            proto::participant_location::Variant::External(proto::participant_location::External {})
1147        };
1148
1149        cx.notify();
1150        cx.foreground().spawn(async move {
1151            client
1152                .request(proto::UpdateParticipantLocation {
1153                    room_id,
1154                    location: Some(proto::ParticipantLocation {
1155                        variant: Some(location),
1156                    }),
1157                })
1158                .await?;
1159            Ok(())
1160        })
1161    }
1162
1163    pub fn is_screen_sharing(&self) -> bool {
1164        self.live_kit.as_ref().map_or(false, |live_kit| {
1165            !matches!(live_kit.screen_track, LocalTrack::None)
1166        })
1167    }
1168
1169    pub fn is_sharing_mic(&self) -> bool {
1170        self.live_kit.as_ref().map_or(false, |live_kit| {
1171            !matches!(live_kit.microphone_track, LocalTrack::None)
1172        })
1173    }
1174
1175    pub fn is_muted(&self, cx: &AppContext) -> bool {
1176        self.live_kit
1177            .as_ref()
1178            .and_then(|live_kit| match &live_kit.microphone_track {
1179                LocalTrack::None => Some(Self::mute_on_join(cx)),
1180                LocalTrack::Pending { muted, .. } => Some(*muted),
1181                LocalTrack::Published { muted, .. } => Some(*muted),
1182            })
1183            .unwrap_or(false)
1184    }
1185
1186    pub fn is_speaking(&self) -> bool {
1187        self.live_kit
1188            .as_ref()
1189            .map_or(false, |live_kit| live_kit.speaking)
1190    }
1191
1192    pub fn is_deafened(&self) -> Option<bool> {
1193        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1194    }
1195
1196    #[track_caller]
1197    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1198        if self.status.is_offline() {
1199            return Task::ready(Err(anyhow!("room is offline")));
1200        } else if self.is_sharing_mic() {
1201            return Task::ready(Err(anyhow!("microphone was already shared")));
1202        }
1203
1204        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1205            let publish_id = post_inc(&mut live_kit.next_publish_id);
1206            live_kit.microphone_track = LocalTrack::Pending {
1207                publish_id,
1208                muted: false,
1209            };
1210            cx.notify();
1211            publish_id
1212        } else {
1213            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1214        };
1215
1216        cx.spawn_weak(|this, mut cx| async move {
1217            let publish_track = async {
1218                let track = LocalAudioTrack::create();
1219                this.upgrade(&cx)
1220                    .ok_or_else(|| anyhow!("room was dropped"))?
1221                    .read_with(&cx, |this, _| {
1222                        this.live_kit
1223                            .as_ref()
1224                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1225                    })
1226                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1227                    .await
1228            };
1229
1230            let publication = publish_track.await;
1231            this.upgrade(&cx)
1232                .ok_or_else(|| anyhow!("room was dropped"))?
1233                .update(&mut cx, |this, cx| {
1234                    let live_kit = this
1235                        .live_kit
1236                        .as_mut()
1237                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1238
1239                    let (canceled, muted) = if let LocalTrack::Pending {
1240                        publish_id: cur_publish_id,
1241                        muted,
1242                    } = &live_kit.microphone_track
1243                    {
1244                        (*cur_publish_id != publish_id, *muted)
1245                    } else {
1246                        (true, false)
1247                    };
1248
1249                    match publication {
1250                        Ok(publication) => {
1251                            if canceled {
1252                                live_kit.room.unpublish_track(publication);
1253                            } else {
1254                                if muted {
1255                                    cx.background().spawn(publication.set_mute(muted)).detach();
1256                                }
1257                                live_kit.microphone_track = LocalTrack::Published {
1258                                    track_publication: publication,
1259                                    muted,
1260                                };
1261                                cx.notify();
1262                            }
1263                            Ok(())
1264                        }
1265                        Err(error) => {
1266                            if canceled {
1267                                Ok(())
1268                            } else {
1269                                live_kit.microphone_track = LocalTrack::None;
1270                                cx.notify();
1271                                Err(error)
1272                            }
1273                        }
1274                    }
1275                })
1276        })
1277    }
1278
1279    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1280        if self.status.is_offline() {
1281            return Task::ready(Err(anyhow!("room is offline")));
1282        } else if self.is_screen_sharing() {
1283            return Task::ready(Err(anyhow!("screen was already shared")));
1284        }
1285
1286        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1287            let publish_id = post_inc(&mut live_kit.next_publish_id);
1288            live_kit.screen_track = LocalTrack::Pending {
1289                publish_id,
1290                muted: false,
1291            };
1292            cx.notify();
1293            (live_kit.room.display_sources(), publish_id)
1294        } else {
1295            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1296        };
1297
1298        cx.spawn_weak(|this, mut cx| async move {
1299            let publish_track = async {
1300                let displays = displays.await?;
1301                let display = displays
1302                    .first()
1303                    .ok_or_else(|| anyhow!("no display found"))?;
1304                let track = LocalVideoTrack::screen_share_for_display(&display);
1305                this.upgrade(&cx)
1306                    .ok_or_else(|| anyhow!("room was dropped"))?
1307                    .read_with(&cx, |this, _| {
1308                        this.live_kit
1309                            .as_ref()
1310                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1311                    })
1312                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1313                    .await
1314            };
1315
1316            let publication = publish_track.await;
1317            this.upgrade(&cx)
1318                .ok_or_else(|| anyhow!("room was dropped"))?
1319                .update(&mut cx, |this, cx| {
1320                    let live_kit = this
1321                        .live_kit
1322                        .as_mut()
1323                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1324
1325                    let (canceled, muted) = if let LocalTrack::Pending {
1326                        publish_id: cur_publish_id,
1327                        muted,
1328                    } = &live_kit.screen_track
1329                    {
1330                        (*cur_publish_id != publish_id, *muted)
1331                    } else {
1332                        (true, false)
1333                    };
1334
1335                    match publication {
1336                        Ok(publication) => {
1337                            if canceled {
1338                                live_kit.room.unpublish_track(publication);
1339                            } else {
1340                                if muted {
1341                                    cx.background().spawn(publication.set_mute(muted)).detach();
1342                                }
1343                                live_kit.screen_track = LocalTrack::Published {
1344                                    track_publication: publication,
1345                                    muted,
1346                                };
1347                                cx.notify();
1348                            }
1349
1350                            Audio::play_sound(Sound::StartScreenshare, cx);
1351
1352                            Ok(())
1353                        }
1354                        Err(error) => {
1355                            if canceled {
1356                                Ok(())
1357                            } else {
1358                                live_kit.screen_track = LocalTrack::None;
1359                                cx.notify();
1360                                Err(error)
1361                            }
1362                        }
1363                    }
1364                })
1365        })
1366    }
1367
1368    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1369        let should_mute = !self.is_muted(cx);
1370        if let Some(live_kit) = self.live_kit.as_mut() {
1371            if matches!(live_kit.microphone_track, LocalTrack::None) {
1372                return Ok(self.share_microphone(cx));
1373            }
1374
1375            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1376            live_kit.muted_by_user = should_mute;
1377
1378            if old_muted == true && live_kit.deafened == true {
1379                if let Some(task) = self.toggle_deafen(cx).ok() {
1380                    task.detach();
1381                }
1382            }
1383
1384            Ok(ret_task)
1385        } else {
1386            Err(anyhow!("LiveKit not started"))
1387        }
1388    }
1389
1390    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1391        if let Some(live_kit) = self.live_kit.as_mut() {
1392            (*live_kit).deafened = !live_kit.deafened;
1393
1394            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1395            // Context notification is sent within set_mute itself.
1396            let mut mute_task = None;
1397            // When deafening, mute user's mic as well.
1398            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1399            if live_kit.deafened || !live_kit.muted_by_user {
1400                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1401            };
1402            for participant in self.remote_participants.values() {
1403                for track in live_kit
1404                    .room
1405                    .remote_audio_track_publications(&participant.user.id.to_string())
1406                {
1407                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1408                }
1409            }
1410
1411            Ok(cx.foreground().spawn(async move {
1412                if let Some(mute_task) = mute_task {
1413                    mute_task.await?;
1414                }
1415                for task in tasks {
1416                    task.await?;
1417                }
1418                Ok(())
1419            }))
1420        } else {
1421            Err(anyhow!("LiveKit not started"))
1422        }
1423    }
1424
1425    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1426        if self.status.is_offline() {
1427            return Err(anyhow!("room is offline"));
1428        }
1429
1430        let live_kit = self
1431            .live_kit
1432            .as_mut()
1433            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1434        match mem::take(&mut live_kit.screen_track) {
1435            LocalTrack::None => Err(anyhow!("screen was not shared")),
1436            LocalTrack::Pending { .. } => {
1437                cx.notify();
1438                Ok(())
1439            }
1440            LocalTrack::Published {
1441                track_publication, ..
1442            } => {
1443                live_kit.room.unpublish_track(track_publication);
1444                cx.notify();
1445
1446                Audio::play_sound(Sound::StopScreenshare, cx);
1447                Ok(())
1448            }
1449        }
1450    }
1451
1452    #[cfg(any(test, feature = "test-support"))]
1453    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1454        self.live_kit
1455            .as_ref()
1456            .unwrap()
1457            .room
1458            .set_display_sources(sources);
1459    }
1460}
1461
1462struct LiveKitRoom {
1463    room: Arc<live_kit_client::Room>,
1464    screen_track: LocalTrack,
1465    microphone_track: LocalTrack,
1466    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1467    muted_by_user: bool,
1468    deafened: bool,
1469    speaking: bool,
1470    next_publish_id: usize,
1471    _maintain_room: Task<()>,
1472    _maintain_tracks: [Task<()>; 2],
1473}
1474
1475impl LiveKitRoom {
1476    fn set_mute(
1477        self: &mut LiveKitRoom,
1478        should_mute: bool,
1479        cx: &mut ModelContext<Room>,
1480    ) -> Result<(Task<Result<()>>, bool)> {
1481        if !should_mute {
1482            // clear user muting state.
1483            self.muted_by_user = false;
1484        }
1485
1486        let (result, old_muted) = match &mut self.microphone_track {
1487            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1488            LocalTrack::Pending { muted, .. } => {
1489                let old_muted = *muted;
1490                *muted = should_mute;
1491                cx.notify();
1492                Ok((Task::Ready(Some(Ok(()))), old_muted))
1493            }
1494            LocalTrack::Published {
1495                track_publication,
1496                muted,
1497            } => {
1498                let old_muted = *muted;
1499                *muted = should_mute;
1500                cx.notify();
1501                Ok((
1502                    cx.background().spawn(track_publication.set_mute(*muted)),
1503                    old_muted,
1504                ))
1505            }
1506        }?;
1507
1508        if old_muted != should_mute {
1509            if should_mute {
1510                Audio::play_sound(Sound::Mute, cx);
1511            } else {
1512                Audio::play_sound(Sound::Unmute, cx);
1513            }
1514        }
1515
1516        Ok((result, old_muted))
1517    }
1518}
1519
1520enum LocalTrack {
1521    None,
1522    Pending {
1523        publish_id: usize,
1524        muted: bool,
1525    },
1526    Published {
1527        track_publication: LocalTrackPublication,
1528        muted: bool,
1529    },
1530}
1531
1532impl Default for LocalTrack {
1533    fn default() -> Self {
1534        Self::None
1535    }
1536}
1537
1538#[derive(Copy, Clone, PartialEq, Eq)]
1539pub enum RoomStatus {
1540    Online,
1541    Rejoining,
1542    Offline,
1543}
1544
1545impl RoomStatus {
1546    pub fn is_offline(&self) -> bool {
1547        matches!(self, RoomStatus::Offline)
1548    }
1549
1550    pub fn is_online(&self) -> bool {
1551        matches!(self, RoomStatus::Online)
1552    }
1553}