room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
  15use language::LanguageRegistry;
  16use livekit_client_macos::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
  17use postage::{sink::Sink, stream::Stream, watch};
  18use project::Project;
  19use settings::Settings as _;
  20use std::{future::Future, mem, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25#[derive(Clone, Debug, PartialEq, Eq)]
  26pub enum Event {
  27    RoomJoined {
  28        channel_id: Option<ChannelId>,
  29    },
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    RemoteProjectJoined {
  48        project_id: u64,
  49    },
  50    RemoteProjectInvitationDiscarded {
  51        project_id: u64,
  52    },
  53    RoomLeft {
  54        channel_id: Option<ChannelId>,
  55    },
  56}
  57
  58pub struct Room {
  59    id: u64,
  60    channel_id: Option<ChannelId>,
  61    live_kit: Option<LiveKitRoom>,
  62    status: RoomStatus,
  63    shared_projects: HashSet<WeakEntity<Project>>,
  64    joined_projects: HashSet<WeakEntity<Project>>,
  65    local_participant: LocalParticipant,
  66    remote_participants: BTreeMap<u64, RemoteParticipant>,
  67    pending_participants: Vec<Arc<User>>,
  68    participant_user_ids: HashSet<u64>,
  69    pending_call_count: usize,
  70    leave_when_empty: bool,
  71    client: Arc<Client>,
  72    user_store: Entity<UserStore>,
  73    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  74    client_subscriptions: Vec<client::Subscription>,
  75    _subscriptions: Vec<gpui::Subscription>,
  76    room_update_completed_tx: watch::Sender<Option<()>>,
  77    room_update_completed_rx: watch::Receiver<Option<()>>,
  78    pending_room_update: Option<Task<()>>,
  79    maintain_connection: Option<Task<Option<()>>>,
  80}
  81
  82impl EventEmitter<Event> for Room {}
  83
  84impl Room {
  85    pub fn channel_id(&self) -> Option<ChannelId> {
  86        self.channel_id
  87    }
  88
  89    pub fn is_sharing_project(&self) -> bool {
  90        !self.shared_projects.is_empty()
  91    }
  92
  93    #[cfg(any(test, feature = "test-support"))]
  94    pub fn is_connected(&self) -> bool {
  95        if let Some(live_kit) = self.live_kit.as_ref() {
  96            matches!(
  97                *live_kit.room.status().borrow(),
  98                livekit_client_macos::ConnectionState::Connected { .. }
  99            )
 100        } else {
 101            false
 102        }
 103    }
 104
 105    fn new(
 106        id: u64,
 107        channel_id: Option<ChannelId>,
 108        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 109        client: Arc<Client>,
 110        user_store: Entity<UserStore>,
 111        cx: &mut Context<Self>,
 112    ) -> Self {
 113        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 114            let room = livekit_client_macos::Room::new();
 115            let mut status = room.status();
 116            // Consume the initial status of the room.
 117            let _ = status.try_recv();
 118            let _maintain_room = cx.spawn(async move |this, cx| {
 119                while let Some(status) = status.next().await {
 120                    let this = if let Some(this) = this.upgrade() {
 121                        this
 122                    } else {
 123                        break;
 124                    };
 125
 126                    if status == livekit_client_macos::ConnectionState::Disconnected {
 127                        this.update(cx, |this, cx| this.leave(cx).log_err()).ok();
 128                        break;
 129                    }
 130                }
 131            });
 132
 133            let _handle_updates = cx.spawn({
 134                let room = room.clone();
 135                async move |this, cx| {
 136                    let mut updates = room.updates();
 137                    while let Some(update) = updates.next().await {
 138                        let this = if let Some(this) = this.upgrade() {
 139                            this
 140                        } else {
 141                            break;
 142                        };
 143
 144                        this.update(cx, |this, cx| {
 145                            this.live_kit_room_updated(update, cx).log_err()
 146                        })
 147                        .ok();
 148                    }
 149                }
 150            });
 151
 152            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 153            cx.spawn(async move |this, cx| {
 154                connect.await?;
 155                this.update(cx, |this, cx| {
 156                    if this.can_use_microphone() {
 157                        if let Some(live_kit) = &this.live_kit {
 158                            if !live_kit.muted_by_user && !live_kit.deafened {
 159                                return this.share_microphone(cx);
 160                            }
 161                        }
 162                    }
 163                    Task::ready(Ok(()))
 164                })?
 165                .await
 166            })
 167            .detach_and_log_err(cx);
 168
 169            Some(LiveKitRoom {
 170                room,
 171                screen_track: LocalTrack::None,
 172                microphone_track: LocalTrack::None,
 173                next_publish_id: 0,
 174                muted_by_user: Self::mute_on_join(cx),
 175                deafened: false,
 176                speaking: false,
 177                _maintain_room,
 178                _handle_updates,
 179            })
 180        } else {
 181            None
 182        };
 183
 184        let maintain_connection = cx.spawn({
 185            let client = client.clone();
 186            async move |this, cx| {
 187                Self::maintain_connection(this, client.clone(), cx)
 188                    .log_err()
 189                    .await
 190            }
 191        });
 192
 193        Audio::play_sound(Sound::Joined, cx);
 194
 195        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 196
 197        Self {
 198            id,
 199            channel_id,
 200            live_kit: live_kit_room,
 201            status: RoomStatus::Online,
 202            shared_projects: Default::default(),
 203            joined_projects: Default::default(),
 204            participant_user_ids: Default::default(),
 205            local_participant: Default::default(),
 206            remote_participants: Default::default(),
 207            pending_participants: Default::default(),
 208            pending_call_count: 0,
 209            client_subscriptions: vec![
 210                client.add_message_handler(cx.weak_entity(), Self::handle_room_updated)
 211            ],
 212            _subscriptions: vec![
 213                cx.on_release(Self::released),
 214                cx.on_app_quit(Self::app_will_quit),
 215            ],
 216            leave_when_empty: false,
 217            pending_room_update: None,
 218            client,
 219            user_store,
 220            follows_by_leader_id_project_id: Default::default(),
 221            maintain_connection: Some(maintain_connection),
 222            room_update_completed_tx,
 223            room_update_completed_rx,
 224        }
 225    }
 226
 227    pub(crate) fn create(
 228        called_user_id: u64,
 229        initial_project: Option<Entity<Project>>,
 230        client: Arc<Client>,
 231        user_store: Entity<UserStore>,
 232        cx: &mut App,
 233    ) -> Task<Result<Entity<Self>>> {
 234        cx.spawn(async move |cx| {
 235            let response = client.request(proto::CreateRoom {}).await?;
 236            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 237            let room = cx.new(|cx| {
 238                let mut room = Self::new(
 239                    room_proto.id,
 240                    None,
 241                    response.live_kit_connection_info,
 242                    client,
 243                    user_store,
 244                    cx,
 245                );
 246                if let Some(participant) = room_proto.participants.first() {
 247                    room.local_participant.role = participant.role()
 248                }
 249                room
 250            })?;
 251
 252            let initial_project_id = if let Some(initial_project) = initial_project {
 253                let initial_project_id = room
 254                    .update(cx, |room, cx| {
 255                        room.share_project(initial_project.clone(), cx)
 256                    })?
 257                    .await?;
 258                Some(initial_project_id)
 259            } else {
 260                None
 261            };
 262
 263            let did_join = room
 264                .update(cx, |room, cx| {
 265                    room.leave_when_empty = true;
 266                    room.call(called_user_id, initial_project_id, cx)
 267                })?
 268                .await;
 269            match did_join {
 270                Ok(()) => Ok(room),
 271                Err(error) => Err(error.context("room creation failed")),
 272            }
 273        })
 274    }
 275
 276    pub(crate) async fn join_channel(
 277        channel_id: ChannelId,
 278        client: Arc<Client>,
 279        user_store: Entity<UserStore>,
 280        cx: &mut AsyncApp,
 281    ) -> Result<Entity<Self>> {
 282        Self::from_join_response(
 283            client
 284                .request(proto::JoinChannel {
 285                    channel_id: channel_id.0,
 286                })
 287                .await?,
 288            client,
 289            user_store,
 290            cx,
 291        )
 292    }
 293
 294    pub(crate) async fn join(
 295        room_id: u64,
 296        client: Arc<Client>,
 297        user_store: Entity<UserStore>,
 298        cx: &mut AsyncApp,
 299    ) -> Result<Entity<Self>> {
 300        Self::from_join_response(
 301            client.request(proto::JoinRoom { id: room_id }).await?,
 302            client,
 303            user_store,
 304            cx,
 305        )
 306    }
 307
 308    fn released(&mut self, cx: &mut App) {
 309        if self.status.is_online() {
 310            self.leave_internal(cx).detach_and_log_err(cx);
 311        }
 312    }
 313
 314    fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> {
 315        let task = if self.status.is_online() {
 316            let leave = self.leave_internal(cx);
 317            Some(cx.background_spawn(async move {
 318                leave.await.log_err();
 319            }))
 320        } else {
 321            None
 322        };
 323
 324        async move {
 325            if let Some(task) = task {
 326                task.await;
 327            }
 328        }
 329    }
 330
 331    pub fn mute_on_join(cx: &App) -> bool {
 332        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 333    }
 334
 335    fn from_join_response(
 336        response: proto::JoinRoomResponse,
 337        client: Arc<Client>,
 338        user_store: Entity<UserStore>,
 339        cx: &mut AsyncApp,
 340    ) -> Result<Entity<Self>> {
 341        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 342        let room = cx.new(|cx| {
 343            Self::new(
 344                room_proto.id,
 345                response.channel_id.map(ChannelId),
 346                response.live_kit_connection_info,
 347                client,
 348                user_store,
 349                cx,
 350            )
 351        })?;
 352        room.update(cx, |room, cx| {
 353            room.leave_when_empty = room.channel_id.is_none();
 354            room.apply_room_update(room_proto, cx)?;
 355            anyhow::Ok(())
 356        })??;
 357        Ok(room)
 358    }
 359
 360    fn should_leave(&self) -> bool {
 361        self.leave_when_empty
 362            && self.pending_room_update.is_none()
 363            && self.pending_participants.is_empty()
 364            && self.remote_participants.is_empty()
 365            && self.pending_call_count == 0
 366    }
 367
 368    pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 369        cx.notify();
 370        self.leave_internal(cx)
 371    }
 372
 373    fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
 374        if self.status.is_offline() {
 375            return Task::ready(Err(anyhow!("room is offline")));
 376        }
 377
 378        log::info!("leaving room");
 379        Audio::play_sound(Sound::Leave, cx);
 380
 381        self.clear_state(cx);
 382
 383        let leave_room = self.client.request(proto::LeaveRoom {});
 384        cx.background_spawn(async move {
 385            leave_room.await?;
 386            anyhow::Ok(())
 387        })
 388    }
 389
 390    pub(crate) fn clear_state(&mut self, cx: &mut App) {
 391        for project in self.shared_projects.drain() {
 392            if let Some(project) = project.upgrade() {
 393                project.update(cx, |project, cx| {
 394                    project.unshare(cx).log_err();
 395                });
 396            }
 397        }
 398        for project in self.joined_projects.drain() {
 399            if let Some(project) = project.upgrade() {
 400                project.update(cx, |project, cx| {
 401                    project.disconnected_from_host(cx);
 402                    project.close(cx);
 403                });
 404            }
 405        }
 406
 407        self.status = RoomStatus::Offline;
 408        self.remote_participants.clear();
 409        self.pending_participants.clear();
 410        self.participant_user_ids.clear();
 411        self.client_subscriptions.clear();
 412        self.live_kit.take();
 413        self.pending_room_update.take();
 414        self.maintain_connection.take();
 415    }
 416
 417    async fn maintain_connection(
 418        this: WeakEntity<Self>,
 419        client: Arc<Client>,
 420        cx: &mut AsyncApp,
 421    ) -> Result<()> {
 422        let mut client_status = client.status();
 423        loop {
 424            let _ = client_status.try_recv();
 425            let is_connected = client_status.borrow().is_connected();
 426            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 427            if !is_connected || client_status.next().await.is_some() {
 428                log::info!("detected client disconnection");
 429
 430                this.upgrade()
 431                    .ok_or_else(|| anyhow!("room was dropped"))?
 432                    .update(cx, |this, cx| {
 433                        this.status = RoomStatus::Rejoining;
 434                        cx.notify();
 435                    })?;
 436
 437                // Wait for client to re-establish a connection to the server.
 438                {
 439                    let mut reconnection_timeout =
 440                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 441                    let client_reconnection = async {
 442                        let mut remaining_attempts = 3;
 443                        while remaining_attempts > 0 {
 444                            if client_status.borrow().is_connected() {
 445                                log::info!("client reconnected, attempting to rejoin room");
 446
 447                                let Some(this) = this.upgrade() else { break };
 448                                match this.update(cx, |this, cx| this.rejoin(cx)) {
 449                                    Ok(task) => {
 450                                        if task.await.log_err().is_some() {
 451                                            return true;
 452                                        } else {
 453                                            remaining_attempts -= 1;
 454                                        }
 455                                    }
 456                                    Err(_app_dropped) => return false,
 457                                }
 458                            } else if client_status.borrow().is_signed_out() {
 459                                return false;
 460                            }
 461
 462                            log::info!(
 463                                "waiting for client status change, remaining attempts {}",
 464                                remaining_attempts
 465                            );
 466                            client_status.next().await;
 467                        }
 468                        false
 469                    }
 470                    .fuse();
 471                    futures::pin_mut!(client_reconnection);
 472
 473                    futures::select_biased! {
 474                        reconnected = client_reconnection => {
 475                            if reconnected {
 476                                log::info!("successfully reconnected to room");
 477                                // If we successfully joined the room, go back around the loop
 478                                // waiting for future connection status changes.
 479                                continue;
 480                            }
 481                        }
 482                        _ = reconnection_timeout => {
 483                            log::info!("room reconnection timeout expired");
 484                        }
 485                    }
 486                }
 487
 488                break;
 489            }
 490        }
 491
 492        // The client failed to re-establish a connection to the server
 493        // or an error occurred while trying to re-join the room. Either way
 494        // we leave the room and return an error.
 495        if let Some(this) = this.upgrade() {
 496            log::info!("reconnection failed, leaving room");
 497            this.update(cx, |this, cx| this.leave(cx))?.await?;
 498        }
 499        Err(anyhow!(
 500            "can't reconnect to room: client failed to re-establish connection"
 501        ))
 502    }
 503
 504    fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 505        let mut projects = HashMap::default();
 506        let mut reshared_projects = Vec::new();
 507        let mut rejoined_projects = Vec::new();
 508        self.shared_projects.retain(|project| {
 509            if let Some(handle) = project.upgrade() {
 510                let project = handle.read(cx);
 511                if let Some(project_id) = project.remote_id() {
 512                    projects.insert(project_id, handle.clone());
 513                    reshared_projects.push(proto::UpdateProject {
 514                        project_id,
 515                        worktrees: project.worktree_metadata_protos(cx),
 516                    });
 517                    return true;
 518                }
 519            }
 520            false
 521        });
 522        self.joined_projects.retain(|project| {
 523            if let Some(handle) = project.upgrade() {
 524                let project = handle.read(cx);
 525                if let Some(project_id) = project.remote_id() {
 526                    projects.insert(project_id, handle.clone());
 527                    rejoined_projects.push(proto::RejoinProject {
 528                        id: project_id,
 529                        worktrees: project
 530                            .worktrees(cx)
 531                            .map(|worktree| {
 532                                let worktree = worktree.read(cx);
 533                                proto::RejoinWorktree {
 534                                    id: worktree.id().to_proto(),
 535                                    scan_id: worktree.completed_scan_id() as u64,
 536                                }
 537                            })
 538                            .collect(),
 539                    });
 540                }
 541                return true;
 542            }
 543            false
 544        });
 545
 546        let response = self.client.request_envelope(proto::RejoinRoom {
 547            id: self.id,
 548            reshared_projects,
 549            rejoined_projects,
 550        });
 551
 552        cx.spawn(async move |this, cx| {
 553            let response = response.await?;
 554            let message_id = response.message_id;
 555            let response = response.payload;
 556            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 557            this.update(cx, |this, cx| {
 558                this.status = RoomStatus::Online;
 559                this.apply_room_update(room_proto, cx)?;
 560
 561                for reshared_project in response.reshared_projects {
 562                    if let Some(project) = projects.get(&reshared_project.id) {
 563                        project.update(cx, |project, cx| {
 564                            project.reshared(reshared_project, cx).log_err();
 565                        });
 566                    }
 567                }
 568
 569                for rejoined_project in response.rejoined_projects {
 570                    if let Some(project) = projects.get(&rejoined_project.id) {
 571                        project.update(cx, |project, cx| {
 572                            project.rejoined(rejoined_project, message_id, cx).log_err();
 573                        });
 574                    }
 575                }
 576
 577                anyhow::Ok(())
 578            })?
 579        })
 580    }
 581
 582    pub fn id(&self) -> u64 {
 583        self.id
 584    }
 585
 586    pub fn status(&self) -> RoomStatus {
 587        self.status
 588    }
 589
 590    pub fn local_participant(&self) -> &LocalParticipant {
 591        &self.local_participant
 592    }
 593
 594    pub fn local_participant_user(&self, cx: &App) -> Option<Arc<User>> {
 595        self.user_store.read(cx).current_user()
 596    }
 597
 598    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 599        &self.remote_participants
 600    }
 601
 602    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 603        self.remote_participants
 604            .values()
 605            .find(|p| p.peer_id == peer_id)
 606    }
 607
 608    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 609        self.remote_participants
 610            .get(&user_id)
 611            .map(|participant| participant.role)
 612    }
 613
 614    pub fn contains_guests(&self) -> bool {
 615        self.local_participant.role == proto::ChannelRole::Guest
 616            || self
 617                .remote_participants
 618                .values()
 619                .any(|p| p.role == proto::ChannelRole::Guest)
 620    }
 621
 622    pub fn local_participant_is_admin(&self) -> bool {
 623        self.local_participant.role == proto::ChannelRole::Admin
 624    }
 625
 626    pub fn local_participant_is_guest(&self) -> bool {
 627        self.local_participant.role == proto::ChannelRole::Guest
 628    }
 629
 630    pub fn set_participant_role(
 631        &mut self,
 632        user_id: u64,
 633        role: proto::ChannelRole,
 634        cx: &Context<Self>,
 635    ) -> Task<Result<()>> {
 636        let client = self.client.clone();
 637        let room_id = self.id;
 638        let role = role.into();
 639        cx.spawn(async move |_, _| {
 640            client
 641                .request(proto::SetRoomParticipantRole {
 642                    room_id,
 643                    user_id,
 644                    role,
 645                })
 646                .await
 647                .map(|_| ())
 648        })
 649    }
 650
 651    pub fn pending_participants(&self) -> &[Arc<User>] {
 652        &self.pending_participants
 653    }
 654
 655    pub fn contains_participant(&self, user_id: u64) -> bool {
 656        self.participant_user_ids.contains(&user_id)
 657    }
 658
 659    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 660        self.follows_by_leader_id_project_id
 661            .get(&(leader_id, project_id))
 662            .map_or(&[], |v| v.as_slice())
 663    }
 664
 665    /// Returns the most 'active' projects, defined as most people in the project
 666    pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
 667        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 668        for participant in self.remote_participants.values() {
 669            match participant.location {
 670                ParticipantLocation::SharedProject { project_id } => {
 671                    project_hosts_and_guest_counts
 672                        .entry(project_id)
 673                        .or_default()
 674                        .1 += 1;
 675                }
 676                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 677            }
 678            for project in &participant.projects {
 679                project_hosts_and_guest_counts
 680                    .entry(project.id)
 681                    .or_default()
 682                    .0 = Some(participant.user.id);
 683            }
 684        }
 685
 686        if let Some(user) = self.user_store.read(cx).current_user() {
 687            for project in &self.local_participant.projects {
 688                project_hosts_and_guest_counts
 689                    .entry(project.id)
 690                    .or_default()
 691                    .0 = Some(user.id);
 692            }
 693        }
 694
 695        project_hosts_and_guest_counts
 696            .into_iter()
 697            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 698            .max_by_key(|(_, _, guest_count)| *guest_count)
 699            .map(|(id, host, _)| (id, host))
 700    }
 701
 702    async fn handle_room_updated(
 703        this: Entity<Self>,
 704        envelope: TypedEnvelope<proto::RoomUpdated>,
 705        mut cx: AsyncApp,
 706    ) -> Result<()> {
 707        let room = envelope
 708            .payload
 709            .room
 710            .ok_or_else(|| anyhow!("invalid room"))?;
 711        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 712    }
 713
 714    fn apply_room_update(&mut self, mut room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
 715        // Filter ourselves out from the room's participants.
 716        let local_participant_ix = room
 717            .participants
 718            .iter()
 719            .position(|participant| Some(participant.user_id) == self.client.user_id());
 720        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 721
 722        let pending_participant_user_ids = room
 723            .pending_participants
 724            .iter()
 725            .map(|p| p.user_id)
 726            .collect::<Vec<_>>();
 727
 728        let remote_participant_user_ids = room
 729            .participants
 730            .iter()
 731            .map(|p| p.user_id)
 732            .collect::<Vec<_>>();
 733
 734        let (remote_participants, pending_participants) =
 735            self.user_store.update(cx, move |user_store, cx| {
 736                (
 737                    user_store.get_users(remote_participant_user_ids, cx),
 738                    user_store.get_users(pending_participant_user_ids, cx),
 739                )
 740            });
 741
 742        self.pending_room_update = Some(cx.spawn(async move |this, cx| {
 743            let (remote_participants, pending_participants) =
 744                futures::join!(remote_participants, pending_participants);
 745
 746            this.update(cx, |this, cx| {
 747                this.participant_user_ids.clear();
 748
 749                if let Some(participant) = local_participant {
 750                    let role = participant.role();
 751                    this.local_participant.projects = participant.projects;
 752                    if this.local_participant.role != role {
 753                        this.local_participant.role = role;
 754
 755                        if role == proto::ChannelRole::Guest {
 756                            for project in mem::take(&mut this.shared_projects) {
 757                                if let Some(project) = project.upgrade() {
 758                                    this.unshare_project(project, cx).log_err();
 759                                }
 760                            }
 761                            this.local_participant.projects.clear();
 762                            if let Some(live_kit_room) = &mut this.live_kit {
 763                                live_kit_room.stop_publishing(cx);
 764                            }
 765                        }
 766
 767                        this.joined_projects.retain(|project| {
 768                            if let Some(project) = project.upgrade() {
 769                                project.update(cx, |project, cx| project.set_role(role, cx));
 770                                true
 771                            } else {
 772                                false
 773                            }
 774                        });
 775                    }
 776                } else {
 777                    this.local_participant.projects.clear();
 778                }
 779
 780                if let Some(participants) = remote_participants.log_err() {
 781                    for (participant, user) in room.participants.into_iter().zip(participants) {
 782                        let Some(peer_id) = participant.peer_id else {
 783                            continue;
 784                        };
 785                        let participant_index = ParticipantIndex(participant.participant_index);
 786                        this.participant_user_ids.insert(participant.user_id);
 787
 788                        let old_projects = this
 789                            .remote_participants
 790                            .get(&participant.user_id)
 791                            .into_iter()
 792                            .flat_map(|existing| &existing.projects)
 793                            .map(|project| project.id)
 794                            .collect::<HashSet<_>>();
 795                        let new_projects = participant
 796                            .projects
 797                            .iter()
 798                            .map(|project| project.id)
 799                            .collect::<HashSet<_>>();
 800
 801                        for project in &participant.projects {
 802                            if !old_projects.contains(&project.id) {
 803                                cx.emit(Event::RemoteProjectShared {
 804                                    owner: user.clone(),
 805                                    project_id: project.id,
 806                                    worktree_root_names: project.worktree_root_names.clone(),
 807                                });
 808                            }
 809                        }
 810
 811                        for unshared_project_id in old_projects.difference(&new_projects) {
 812                            this.joined_projects.retain(|project| {
 813                                if let Some(project) = project.upgrade() {
 814                                    project.update(cx, |project, cx| {
 815                                        if project.remote_id() == Some(*unshared_project_id) {
 816                                            project.disconnected_from_host(cx);
 817                                            false
 818                                        } else {
 819                                            true
 820                                        }
 821                                    })
 822                                } else {
 823                                    false
 824                                }
 825                            });
 826                            cx.emit(Event::RemoteProjectUnshared {
 827                                project_id: *unshared_project_id,
 828                            });
 829                        }
 830
 831                        let role = participant.role();
 832                        let location = ParticipantLocation::from_proto(participant.location)
 833                            .unwrap_or(ParticipantLocation::External);
 834                        if let Some(remote_participant) =
 835                            this.remote_participants.get_mut(&participant.user_id)
 836                        {
 837                            remote_participant.peer_id = peer_id;
 838                            remote_participant.projects = participant.projects;
 839                            remote_participant.participant_index = participant_index;
 840                            if location != remote_participant.location
 841                                || role != remote_participant.role
 842                            {
 843                                remote_participant.location = location;
 844                                remote_participant.role = role;
 845                                cx.emit(Event::ParticipantLocationChanged {
 846                                    participant_id: peer_id,
 847                                });
 848                            }
 849                        } else {
 850                            this.remote_participants.insert(
 851                                participant.user_id,
 852                                RemoteParticipant {
 853                                    user: user.clone(),
 854                                    participant_index,
 855                                    peer_id,
 856                                    projects: participant.projects,
 857                                    location,
 858                                    role,
 859                                    muted: true,
 860                                    speaking: false,
 861                                    video_tracks: Default::default(),
 862                                    audio_tracks: Default::default(),
 863                                },
 864                            );
 865
 866                            Audio::play_sound(Sound::Joined, cx);
 867
 868                            if let Some(live_kit) = this.live_kit.as_ref() {
 869                                let video_tracks =
 870                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 871                                let audio_tracks =
 872                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 873                                let publications = live_kit
 874                                    .room
 875                                    .remote_audio_track_publications(&user.id.to_string());
 876
 877                                for track in video_tracks {
 878                                    this.live_kit_room_updated(
 879                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
 880                                        cx,
 881                                    )
 882                                    .log_err();
 883                                }
 884
 885                                for (track, publication) in
 886                                    audio_tracks.iter().zip(publications.iter())
 887                                {
 888                                    this.live_kit_room_updated(
 889                                        RoomUpdate::SubscribedToRemoteAudioTrack(
 890                                            track.clone(),
 891                                            publication.clone(),
 892                                        ),
 893                                        cx,
 894                                    )
 895                                    .log_err();
 896                                }
 897                            }
 898                        }
 899                    }
 900
 901                    this.remote_participants.retain(|user_id, participant| {
 902                        if this.participant_user_ids.contains(user_id) {
 903                            true
 904                        } else {
 905                            for project in &participant.projects {
 906                                cx.emit(Event::RemoteProjectUnshared {
 907                                    project_id: project.id,
 908                                });
 909                            }
 910                            false
 911                        }
 912                    });
 913                }
 914
 915                if let Some(pending_participants) = pending_participants.log_err() {
 916                    this.pending_participants = pending_participants;
 917                    for participant in &this.pending_participants {
 918                        this.participant_user_ids.insert(participant.id);
 919                    }
 920                }
 921
 922                this.follows_by_leader_id_project_id.clear();
 923                for follower in room.followers {
 924                    let project_id = follower.project_id;
 925                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 926                        (Some(leader), Some(follower)) => (leader, follower),
 927
 928                        _ => {
 929                            log::error!("Follower message {follower:?} missing some state");
 930                            continue;
 931                        }
 932                    };
 933
 934                    let list = this
 935                        .follows_by_leader_id_project_id
 936                        .entry((leader, project_id))
 937                        .or_default();
 938                    if !list.contains(&follower) {
 939                        list.push(follower);
 940                    }
 941                }
 942
 943                this.pending_room_update.take();
 944                if this.should_leave() {
 945                    log::info!("room is empty, leaving");
 946                    this.leave(cx).detach();
 947                }
 948
 949                this.user_store.update(cx, |user_store, cx| {
 950                    let participant_indices_by_user_id = this
 951                        .remote_participants
 952                        .iter()
 953                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 954                        .collect();
 955                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 956                });
 957
 958                this.check_invariants();
 959                this.room_update_completed_tx.try_send(Some(())).ok();
 960                cx.notify();
 961            })
 962            .ok();
 963        }));
 964
 965        cx.notify();
 966        Ok(())
 967    }
 968
 969    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 970        let mut done_rx = self.room_update_completed_rx.clone();
 971        async move {
 972            while let Some(result) = done_rx.next().await {
 973                if result.is_some() {
 974                    break;
 975                }
 976            }
 977        }
 978    }
 979
 980    fn live_kit_room_updated(&mut self, update: RoomUpdate, cx: &mut Context<Self>) -> Result<()> {
 981        match update {
 982            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
 983                let user_id = track.publisher_id().parse()?;
 984                let track_id = track.sid().to_string();
 985                let participant = self
 986                    .remote_participants
 987                    .get_mut(&user_id)
 988                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 989                participant.video_tracks.insert(track_id.clone(), track);
 990                cx.emit(Event::RemoteVideoTracksChanged {
 991                    participant_id: participant.peer_id,
 992                });
 993            }
 994
 995            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
 996                publisher_id,
 997                track_id,
 998            } => {
 999                let user_id = publisher_id.parse()?;
1000                let participant = self
1001                    .remote_participants
1002                    .get_mut(&user_id)
1003                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1004                participant.video_tracks.remove(&track_id);
1005                cx.emit(Event::RemoteVideoTracksChanged {
1006                    participant_id: participant.peer_id,
1007                });
1008            }
1009
1010            RoomUpdate::ActiveSpeakersChanged { speakers } => {
1011                let mut speaker_ids = speakers
1012                    .into_iter()
1013                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
1014                    .collect::<Vec<u64>>();
1015                speaker_ids.sort_unstable();
1016                for (sid, participant) in &mut self.remote_participants {
1017                    participant.speaking = speaker_ids.binary_search(sid).is_ok();
1018                }
1019                if let Some(id) = self.client.user_id() {
1020                    if let Some(room) = &mut self.live_kit {
1021                        room.speaking = speaker_ids.binary_search(&id).is_ok();
1022                    }
1023                }
1024            }
1025
1026            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1027                let mut found = false;
1028                for participant in &mut self.remote_participants.values_mut() {
1029                    for track in participant.audio_tracks.values() {
1030                        if track.sid() == track_id {
1031                            found = true;
1032                            break;
1033                        }
1034                    }
1035                    if found {
1036                        participant.muted = muted;
1037                        break;
1038                    }
1039                }
1040            }
1041
1042            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1043                if let Some(live_kit) = &self.live_kit {
1044                    if live_kit.deafened {
1045                        track.stop();
1046                        cx.foreground_executor()
1047                            .spawn(publication.set_enabled(false))
1048                            .detach();
1049                    }
1050                }
1051
1052                let user_id = track.publisher_id().parse()?;
1053                let track_id = track.sid().to_string();
1054                let participant = self
1055                    .remote_participants
1056                    .get_mut(&user_id)
1057                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1058                participant.audio_tracks.insert(track_id.clone(), track);
1059                participant.muted = publication.is_muted();
1060
1061                cx.emit(Event::RemoteAudioTracksChanged {
1062                    participant_id: participant.peer_id,
1063                });
1064            }
1065
1066            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1067                publisher_id,
1068                track_id,
1069            } => {
1070                let user_id = publisher_id.parse()?;
1071                let participant = self
1072                    .remote_participants
1073                    .get_mut(&user_id)
1074                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1075                participant.audio_tracks.remove(&track_id);
1076                cx.emit(Event::RemoteAudioTracksChanged {
1077                    participant_id: participant.peer_id,
1078                });
1079            }
1080
1081            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1082                log::info!("unpublished audio track {}", publication.sid());
1083                if let Some(room) = &mut self.live_kit {
1084                    room.microphone_track = LocalTrack::None;
1085                }
1086            }
1087
1088            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1089                log::info!("unpublished video track {}", publication.sid());
1090                if let Some(room) = &mut self.live_kit {
1091                    room.screen_track = LocalTrack::None;
1092                }
1093            }
1094
1095            RoomUpdate::LocalAudioTrackPublished { publication } => {
1096                log::info!("published audio track {}", publication.sid());
1097            }
1098
1099            RoomUpdate::LocalVideoTrackPublished { publication } => {
1100                log::info!("published video track {}", publication.sid());
1101            }
1102        }
1103
1104        cx.notify();
1105        Ok(())
1106    }
1107
1108    fn check_invariants(&self) {
1109        #[cfg(any(test, feature = "test-support"))]
1110        {
1111            for participant in self.remote_participants.values() {
1112                assert!(self.participant_user_ids.contains(&participant.user.id));
1113                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1114            }
1115
1116            for participant in &self.pending_participants {
1117                assert!(self.participant_user_ids.contains(&participant.id));
1118                assert_ne!(participant.id, self.client.user_id().unwrap());
1119            }
1120
1121            assert_eq!(
1122                self.participant_user_ids.len(),
1123                self.remote_participants.len() + self.pending_participants.len()
1124            );
1125        }
1126    }
1127
1128    pub(crate) fn call(
1129        &mut self,
1130        called_user_id: u64,
1131        initial_project_id: Option<u64>,
1132        cx: &mut Context<Self>,
1133    ) -> Task<Result<()>> {
1134        if self.status.is_offline() {
1135            return Task::ready(Err(anyhow!("room is offline")));
1136        }
1137
1138        cx.notify();
1139        let client = self.client.clone();
1140        let room_id = self.id;
1141        self.pending_call_count += 1;
1142        cx.spawn(async move |this, cx| {
1143            let result = client
1144                .request(proto::Call {
1145                    room_id,
1146                    called_user_id,
1147                    initial_project_id,
1148                })
1149                .await;
1150            this.update(cx, |this, cx| {
1151                this.pending_call_count -= 1;
1152                if this.should_leave() {
1153                    this.leave(cx).detach_and_log_err(cx);
1154                }
1155            })?;
1156            result?;
1157            Ok(())
1158        })
1159    }
1160
1161    pub fn join_project(
1162        &mut self,
1163        id: u64,
1164        language_registry: Arc<LanguageRegistry>,
1165        fs: Arc<dyn Fs>,
1166        cx: &mut Context<Self>,
1167    ) -> Task<Result<Entity<Project>>> {
1168        let client = self.client.clone();
1169        let user_store = self.user_store.clone();
1170        cx.emit(Event::RemoteProjectJoined { project_id: id });
1171        cx.spawn(async move |this, cx| {
1172            let project =
1173                Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1174
1175            this.update(cx, |this, cx| {
1176                this.joined_projects.retain(|project| {
1177                    if let Some(project) = project.upgrade() {
1178                        !project.read(cx).is_disconnected(cx)
1179                    } else {
1180                        false
1181                    }
1182                });
1183                this.joined_projects.insert(project.downgrade());
1184            })?;
1185            Ok(project)
1186        })
1187    }
1188
1189    pub fn share_project(
1190        &mut self,
1191        project: Entity<Project>,
1192        cx: &mut Context<Self>,
1193    ) -> Task<Result<u64>> {
1194        if let Some(project_id) = project.read(cx).remote_id() {
1195            return Task::ready(Ok(project_id));
1196        }
1197
1198        let request = self.client.request(proto::ShareProject {
1199            room_id: self.id(),
1200            worktrees: project.read(cx).worktree_metadata_protos(cx),
1201            is_ssh_project: project.read(cx).is_via_ssh(),
1202        });
1203
1204        cx.spawn(async move |this, cx| {
1205            let response = request.await?;
1206
1207            project.update(cx, |project, cx| project.shared(response.project_id, cx))??;
1208
1209            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1210            this.update(cx, |this, cx| {
1211                this.shared_projects.insert(project.downgrade());
1212                let active_project = this.local_participant.active_project.as_ref();
1213                if active_project.map_or(false, |location| *location == project) {
1214                    this.set_location(Some(&project), cx)
1215                } else {
1216                    Task::ready(Ok(()))
1217                }
1218            })?
1219            .await?;
1220
1221            Ok(response.project_id)
1222        })
1223    }
1224
1225    pub(crate) fn unshare_project(
1226        &mut self,
1227        project: Entity<Project>,
1228        cx: &mut Context<Self>,
1229    ) -> Result<()> {
1230        let project_id = match project.read(cx).remote_id() {
1231            Some(project_id) => project_id,
1232            None => return Ok(()),
1233        };
1234
1235        self.client.send(proto::UnshareProject { project_id })?;
1236        project.update(cx, |this, cx| this.unshare(cx))?;
1237
1238        if self.local_participant.active_project == Some(project.downgrade()) {
1239            self.set_location(Some(&project), cx).detach_and_log_err(cx);
1240        }
1241        Ok(())
1242    }
1243
1244    pub(crate) fn set_location(
1245        &mut self,
1246        project: Option<&Entity<Project>>,
1247        cx: &mut Context<Self>,
1248    ) -> Task<Result<()>> {
1249        if self.status.is_offline() {
1250            return Task::ready(Err(anyhow!("room is offline")));
1251        }
1252
1253        let client = self.client.clone();
1254        let room_id = self.id;
1255        let location = if let Some(project) = project {
1256            self.local_participant.active_project = Some(project.downgrade());
1257            if let Some(project_id) = project.read(cx).remote_id() {
1258                proto::participant_location::Variant::SharedProject(
1259                    proto::participant_location::SharedProject { id: project_id },
1260                )
1261            } else {
1262                proto::participant_location::Variant::UnsharedProject(
1263                    proto::participant_location::UnsharedProject {},
1264                )
1265            }
1266        } else {
1267            self.local_participant.active_project = None;
1268            proto::participant_location::Variant::External(proto::participant_location::External {})
1269        };
1270
1271        cx.notify();
1272        cx.background_spawn(async move {
1273            client
1274                .request(proto::UpdateParticipantLocation {
1275                    room_id,
1276                    location: Some(proto::ParticipantLocation {
1277                        variant: Some(location),
1278                    }),
1279                })
1280                .await?;
1281            Ok(())
1282        })
1283    }
1284
1285    pub fn is_screen_sharing(&self) -> bool {
1286        self.live_kit.as_ref().map_or(false, |live_kit| {
1287            !matches!(live_kit.screen_track, LocalTrack::None)
1288        })
1289    }
1290
1291    pub fn is_sharing_mic(&self) -> bool {
1292        self.live_kit.as_ref().map_or(false, |live_kit| {
1293            !matches!(live_kit.microphone_track, LocalTrack::None)
1294        })
1295    }
1296
1297    pub fn is_muted(&self) -> bool {
1298        self.live_kit.as_ref().map_or(false, |live_kit| {
1299            matches!(live_kit.microphone_track, LocalTrack::None)
1300                || live_kit.muted_by_user
1301                || live_kit.deafened
1302        })
1303    }
1304
1305    pub fn muted_by_user(&self) -> bool {
1306        self.live_kit
1307            .as_ref()
1308            .map_or(false, |live_kit| live_kit.muted_by_user)
1309    }
1310
1311    pub fn is_speaking(&self) -> bool {
1312        self.live_kit
1313            .as_ref()
1314            .map_or(false, |live_kit| live_kit.speaking)
1315    }
1316
1317    pub fn is_deafened(&self) -> Option<bool> {
1318        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1319    }
1320
1321    pub fn can_use_microphone(&self) -> bool {
1322        use proto::ChannelRole::*;
1323        match self.local_participant.role {
1324            Admin | Member | Talker => true,
1325            Guest | Banned => false,
1326        }
1327    }
1328
1329    pub fn can_share_projects(&self) -> bool {
1330        use proto::ChannelRole::*;
1331        match self.local_participant.role {
1332            Admin | Member => true,
1333            Guest | Banned | Talker => false,
1334        }
1335    }
1336
1337    #[track_caller]
1338    pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1339        if self.status.is_offline() {
1340            return Task::ready(Err(anyhow!("room is offline")));
1341        }
1342
1343        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1344            let publish_id = post_inc(&mut live_kit.next_publish_id);
1345            live_kit.microphone_track = LocalTrack::Pending { publish_id };
1346            cx.notify();
1347            publish_id
1348        } else {
1349            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1350        };
1351
1352        cx.spawn(async move |this, cx| {
1353            let publish_track = async {
1354                let track = LocalAudioTrack::create();
1355                this.upgrade()
1356                    .ok_or_else(|| anyhow!("room was dropped"))?
1357                    .update(cx, |this, _| {
1358                        this.live_kit
1359                            .as_ref()
1360                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1361                    })?
1362                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1363                    .await
1364            };
1365            let publication = publish_track.await;
1366            this.upgrade()
1367                .ok_or_else(|| anyhow!("room was dropped"))?
1368                .update(cx, |this, cx| {
1369                    let live_kit = this
1370                        .live_kit
1371                        .as_mut()
1372                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1373
1374                    let canceled = if let LocalTrack::Pending {
1375                        publish_id: cur_publish_id,
1376                    } = &live_kit.microphone_track
1377                    {
1378                        *cur_publish_id != publish_id
1379                    } else {
1380                        true
1381                    };
1382
1383                    match publication {
1384                        Ok(publication) => {
1385                            if canceled {
1386                                live_kit.room.unpublish_track(publication);
1387                            } else {
1388                                if live_kit.muted_by_user || live_kit.deafened {
1389                                    cx.background_spawn(publication.set_mute(true)).detach();
1390                                }
1391                                live_kit.microphone_track = LocalTrack::Published {
1392                                    track_publication: publication,
1393                                };
1394                                cx.notify();
1395                            }
1396                            Ok(())
1397                        }
1398                        Err(error) => {
1399                            if canceled {
1400                                Ok(())
1401                            } else {
1402                                live_kit.microphone_track = LocalTrack::None;
1403                                cx.notify();
1404                                Err(error)
1405                            }
1406                        }
1407                    }
1408                })?
1409        })
1410    }
1411
1412    pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1413        if self.status.is_offline() {
1414            return Task::ready(Err(anyhow!("room is offline")));
1415        } else if self.is_screen_sharing() {
1416            return Task::ready(Err(anyhow!("screen was already shared")));
1417        }
1418
1419        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1420            let publish_id = post_inc(&mut live_kit.next_publish_id);
1421            live_kit.screen_track = LocalTrack::Pending { publish_id };
1422            cx.notify();
1423            (live_kit.room.display_sources(), publish_id)
1424        } else {
1425            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1426        };
1427
1428        cx.spawn(async move |this, cx| {
1429            let publish_track = async {
1430                let displays = displays.await?;
1431                let display = displays
1432                    .first()
1433                    .ok_or_else(|| anyhow!("no display found"))?;
1434                let track = LocalVideoTrack::screen_share_for_display(display);
1435                this.upgrade()
1436                    .ok_or_else(|| anyhow!("room was dropped"))?
1437                    .update(cx, |this, _| {
1438                        this.live_kit
1439                            .as_ref()
1440                            .map(|live_kit| live_kit.room.publish_video_track(track))
1441                    })?
1442                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1443                    .await
1444            };
1445
1446            let publication = publish_track.await;
1447            this.upgrade()
1448                .ok_or_else(|| anyhow!("room was dropped"))?
1449                .update(cx, |this, cx| {
1450                    let live_kit = this
1451                        .live_kit
1452                        .as_mut()
1453                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1454
1455                    let canceled = if let LocalTrack::Pending {
1456                        publish_id: cur_publish_id,
1457                    } = &live_kit.screen_track
1458                    {
1459                        *cur_publish_id != publish_id
1460                    } else {
1461                        true
1462                    };
1463
1464                    match publication {
1465                        Ok(publication) => {
1466                            if canceled {
1467                                live_kit.room.unpublish_track(publication);
1468                            } else {
1469                                live_kit.screen_track = LocalTrack::Published {
1470                                    track_publication: publication,
1471                                };
1472                                cx.notify();
1473                            }
1474
1475                            Audio::play_sound(Sound::StartScreenshare, cx);
1476
1477                            Ok(())
1478                        }
1479                        Err(error) => {
1480                            if canceled {
1481                                Ok(())
1482                            } else {
1483                                live_kit.screen_track = LocalTrack::None;
1484                                cx.notify();
1485                                Err(error)
1486                            }
1487                        }
1488                    }
1489                })?
1490        })
1491    }
1492
1493    pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1494        if let Some(live_kit) = self.live_kit.as_mut() {
1495            // When unmuting, undeafen if the user was deafened before.
1496            let was_deafened = live_kit.deafened;
1497            if live_kit.muted_by_user
1498                || live_kit.deafened
1499                || matches!(live_kit.microphone_track, LocalTrack::None)
1500            {
1501                live_kit.muted_by_user = false;
1502                live_kit.deafened = false;
1503            } else {
1504                live_kit.muted_by_user = true;
1505            }
1506            let muted = live_kit.muted_by_user;
1507            let should_undeafen = was_deafened && !live_kit.deafened;
1508
1509            if let Some(task) = self.set_mute(muted, cx) {
1510                task.detach_and_log_err(cx);
1511            }
1512
1513            if should_undeafen {
1514                if let Some(task) = self.set_deafened(false, cx) {
1515                    task.detach_and_log_err(cx);
1516                }
1517            }
1518        }
1519    }
1520
1521    pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1522        if let Some(live_kit) = self.live_kit.as_mut() {
1523            // When deafening, mute the microphone if it was not already muted.
1524            // When un-deafening, unmute the microphone, unless it was explicitly muted.
1525            let deafened = !live_kit.deafened;
1526            live_kit.deafened = deafened;
1527            let should_change_mute = !live_kit.muted_by_user;
1528
1529            if let Some(task) = self.set_deafened(deafened, cx) {
1530                task.detach_and_log_err(cx);
1531            }
1532
1533            if should_change_mute {
1534                if let Some(task) = self.set_mute(deafened, cx) {
1535                    task.detach_and_log_err(cx);
1536                }
1537            }
1538        }
1539    }
1540
1541    pub fn unshare_screen(&mut self, cx: &mut Context<Self>) -> Result<()> {
1542        if self.status.is_offline() {
1543            return Err(anyhow!("room is offline"));
1544        }
1545
1546        let live_kit = self
1547            .live_kit
1548            .as_mut()
1549            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1550        match mem::take(&mut live_kit.screen_track) {
1551            LocalTrack::None => Err(anyhow!("screen was not shared")),
1552            LocalTrack::Pending { .. } => {
1553                cx.notify();
1554                Ok(())
1555            }
1556            LocalTrack::Published {
1557                track_publication, ..
1558            } => {
1559                live_kit.room.unpublish_track(track_publication);
1560                cx.notify();
1561
1562                Audio::play_sound(Sound::StopScreenshare, cx);
1563                Ok(())
1564            }
1565        }
1566    }
1567
1568    fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
1569        let live_kit = self.live_kit.as_mut()?;
1570        cx.notify();
1571
1572        let mut track_updates = Vec::new();
1573        for participant in self.remote_participants.values() {
1574            for publication in live_kit
1575                .room
1576                .remote_audio_track_publications(&participant.user.id.to_string())
1577            {
1578                track_updates.push(publication.set_enabled(!deafened));
1579            }
1580
1581            for track in participant.audio_tracks.values() {
1582                if deafened {
1583                    track.stop();
1584                } else {
1585                    track.start();
1586                }
1587            }
1588        }
1589
1590        Some(cx.foreground_executor().spawn(async move {
1591            for result in futures::future::join_all(track_updates).await {
1592                result?;
1593            }
1594            Ok(())
1595        }))
1596    }
1597
1598    fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1599        let live_kit = self.live_kit.as_mut()?;
1600        cx.notify();
1601
1602        if should_mute {
1603            Audio::play_sound(Sound::Mute, cx);
1604        } else {
1605            Audio::play_sound(Sound::Unmute, cx);
1606        }
1607
1608        match &mut live_kit.microphone_track {
1609            LocalTrack::None => {
1610                if should_mute {
1611                    None
1612                } else {
1613                    Some(self.share_microphone(cx))
1614                }
1615            }
1616            LocalTrack::Pending { .. } => None,
1617            LocalTrack::Published { track_publication } => Some(
1618                cx.foreground_executor()
1619                    .spawn(track_publication.set_mute(should_mute)),
1620            ),
1621        }
1622    }
1623
1624    #[cfg(any(test, feature = "test-support"))]
1625    pub fn set_display_sources(&self, sources: Vec<livekit_client_macos::MacOSDisplay>) {
1626        self.live_kit
1627            .as_ref()
1628            .unwrap()
1629            .room
1630            .set_display_sources(sources);
1631    }
1632}
1633
1634struct LiveKitRoom {
1635    room: Arc<livekit_client_macos::Room>,
1636    screen_track: LocalTrack,
1637    microphone_track: LocalTrack,
1638    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1639    muted_by_user: bool,
1640    deafened: bool,
1641    speaking: bool,
1642    next_publish_id: usize,
1643    _maintain_room: Task<()>,
1644    _handle_updates: Task<()>,
1645}
1646
1647impl LiveKitRoom {
1648    fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1649        if let LocalTrack::Published {
1650            track_publication, ..
1651        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1652        {
1653            self.room.unpublish_track(track_publication);
1654            cx.notify();
1655        }
1656
1657        if let LocalTrack::Published {
1658            track_publication, ..
1659        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1660        {
1661            self.room.unpublish_track(track_publication);
1662            cx.notify();
1663        }
1664    }
1665}
1666
1667enum LocalTrack {
1668    None,
1669    Pending {
1670        publish_id: usize,
1671    },
1672    Published {
1673        track_publication: LocalTrackPublication,
1674    },
1675}
1676
1677impl Default for LocalTrack {
1678    fn default() -> Self {
1679        Self::None
1680    }
1681}
1682
1683#[derive(Copy, Clone, PartialEq, Eq)]
1684pub enum RoomStatus {
1685    Online,
1686    Rejoining,
1687    Offline,
1688}
1689
1690impl RoomStatus {
1691    pub fn is_offline(&self) -> bool {
1692        matches!(self, RoomStatus::Offline)
1693    }
1694
1695    pub fn is_online(&self) -> bool {
1696        matches!(self, RoomStatus::Online)
1697    }
1698}