room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
  15use language::LanguageRegistry;
  16use livekit_client_macos::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
  17use postage::{sink::Sink, stream::Stream, watch};
  18use project::Project;
  19use settings::Settings as _;
  20use std::{future::Future, mem, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25#[derive(Clone, Debug, PartialEq, Eq)]
  26pub enum Event {
  27    RoomJoined {
  28        channel_id: Option<ChannelId>,
  29    },
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    RemoteProjectJoined {
  48        project_id: u64,
  49    },
  50    RemoteProjectInvitationDiscarded {
  51        project_id: u64,
  52    },
  53    RoomLeft {
  54        channel_id: Option<ChannelId>,
  55    },
  56}
  57
  58pub struct Room {
  59    id: u64,
  60    channel_id: Option<ChannelId>,
  61    live_kit: Option<LiveKitRoom>,
  62    status: RoomStatus,
  63    shared_projects: HashSet<WeakEntity<Project>>,
  64    joined_projects: HashSet<WeakEntity<Project>>,
  65    local_participant: LocalParticipant,
  66    remote_participants: BTreeMap<u64, RemoteParticipant>,
  67    pending_participants: Vec<Arc<User>>,
  68    participant_user_ids: HashSet<u64>,
  69    pending_call_count: usize,
  70    leave_when_empty: bool,
  71    client: Arc<Client>,
  72    user_store: Entity<UserStore>,
  73    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  74    client_subscriptions: Vec<client::Subscription>,
  75    _subscriptions: Vec<gpui::Subscription>,
  76    room_update_completed_tx: watch::Sender<Option<()>>,
  77    room_update_completed_rx: watch::Receiver<Option<()>>,
  78    pending_room_update: Option<Task<()>>,
  79    maintain_connection: Option<Task<Option<()>>>,
  80}
  81
  82impl EventEmitter<Event> for Room {}
  83
  84impl Room {
  85    pub fn channel_id(&self) -> Option<ChannelId> {
  86        self.channel_id
  87    }
  88
  89    pub fn is_sharing_project(&self) -> bool {
  90        !self.shared_projects.is_empty()
  91    }
  92
  93    #[cfg(any(test, feature = "test-support"))]
  94    pub fn is_connected(&self) -> bool {
  95        if let Some(live_kit) = self.live_kit.as_ref() {
  96            matches!(
  97                *live_kit.room.status().borrow(),
  98                livekit_client_macos::ConnectionState::Connected { .. }
  99            )
 100        } else {
 101            false
 102        }
 103    }
 104
 105    fn new(
 106        id: u64,
 107        channel_id: Option<ChannelId>,
 108        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 109        client: Arc<Client>,
 110        user_store: Entity<UserStore>,
 111        cx: &mut Context<Self>,
 112    ) -> Self {
 113        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 114            let room = livekit_client_macos::Room::new();
 115            let mut status = room.status();
 116            // Consume the initial status of the room.
 117            let _ = status.try_recv();
 118            let _maintain_room = cx.spawn(async move |this, cx| {
 119                while let Some(status) = status.next().await {
 120                    let this = if let Some(this) = this.upgrade() {
 121                        this
 122                    } else {
 123                        break;
 124                    };
 125
 126                    if status == livekit_client_macos::ConnectionState::Disconnected {
 127                        this.update(cx, |this, cx| this.leave(cx).log_err()).ok();
 128                        break;
 129                    }
 130                }
 131            });
 132
 133            let _handle_updates = cx.spawn({
 134                let room = room.clone();
 135                async move |this, cx| {
 136                    let mut updates = room.updates();
 137                    while let Some(update) = updates.next().await {
 138                        let this = if let Some(this) = this.upgrade() {
 139                            this
 140                        } else {
 141                            break;
 142                        };
 143
 144                        this.update(cx, |this, cx| {
 145                            this.live_kit_room_updated(update, cx).log_err()
 146                        })
 147                        .ok();
 148                    }
 149                }
 150            });
 151
 152            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 153            cx.spawn(async move |this, cx| {
 154                connect.await?;
 155                this.update(cx, |this, cx| {
 156                    if this.can_use_microphone() {
 157                        if let Some(live_kit) = &this.live_kit {
 158                            if !live_kit.muted_by_user && !live_kit.deafened {
 159                                return this.share_microphone(cx);
 160                            }
 161                        }
 162                    }
 163                    Task::ready(Ok(()))
 164                })?
 165                .await
 166            })
 167            .detach_and_log_err(cx);
 168
 169            Some(LiveKitRoom {
 170                room,
 171                screen_track: LocalTrack::None,
 172                microphone_track: LocalTrack::None,
 173                next_publish_id: 0,
 174                muted_by_user: Self::mute_on_join(cx),
 175                deafened: false,
 176                speaking: false,
 177                _maintain_room,
 178                _handle_updates,
 179            })
 180        } else {
 181            None
 182        };
 183
 184        let maintain_connection = cx.spawn({
 185            let client = client.clone();
 186            async move |this, cx| {
 187                Self::maintain_connection(this, client.clone(), cx)
 188                    .log_err()
 189                    .await
 190            }
 191        });
 192
 193        Audio::play_sound(Sound::Joined, cx);
 194
 195        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 196
 197        Self {
 198            id,
 199            channel_id,
 200            live_kit: live_kit_room,
 201            status: RoomStatus::Online,
 202            shared_projects: Default::default(),
 203            joined_projects: Default::default(),
 204            participant_user_ids: Default::default(),
 205            local_participant: Default::default(),
 206            remote_participants: Default::default(),
 207            pending_participants: Default::default(),
 208            pending_call_count: 0,
 209            client_subscriptions: vec![
 210                client.add_message_handler(cx.weak_entity(), Self::handle_room_updated)
 211            ],
 212            _subscriptions: vec![
 213                cx.on_release(Self::released),
 214                cx.on_app_quit(Self::app_will_quit),
 215            ],
 216            leave_when_empty: false,
 217            pending_room_update: None,
 218            client,
 219            user_store,
 220            follows_by_leader_id_project_id: Default::default(),
 221            maintain_connection: Some(maintain_connection),
 222            room_update_completed_tx,
 223            room_update_completed_rx,
 224        }
 225    }
 226
 227    pub(crate) fn create(
 228        called_user_id: u64,
 229        initial_project: Option<Entity<Project>>,
 230        client: Arc<Client>,
 231        user_store: Entity<UserStore>,
 232        cx: &mut App,
 233    ) -> Task<Result<Entity<Self>>> {
 234        cx.spawn(async move |cx| {
 235            let response = client.request(proto::CreateRoom {}).await?;
 236            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 237            let room = cx.new(|cx| {
 238                let mut room = Self::new(
 239                    room_proto.id,
 240                    None,
 241                    response.live_kit_connection_info,
 242                    client,
 243                    user_store,
 244                    cx,
 245                );
 246                if let Some(participant) = room_proto.participants.first() {
 247                    room.local_participant.role = participant.role()
 248                }
 249                room
 250            })?;
 251
 252            let initial_project_id = if let Some(initial_project) = initial_project {
 253                let initial_project_id = room
 254                    .update(cx, |room, cx| {
 255                        room.share_project(initial_project.clone(), cx)
 256                    })?
 257                    .await?;
 258                Some(initial_project_id)
 259            } else {
 260                None
 261            };
 262
 263            let did_join = room
 264                .update(cx, |room, cx| {
 265                    room.leave_when_empty = true;
 266                    room.call(called_user_id, initial_project_id, cx)
 267                })?
 268                .await;
 269            match did_join {
 270                Ok(()) => Ok(room),
 271                Err(error) => Err(error.context("room creation failed")),
 272            }
 273        })
 274    }
 275
 276    pub(crate) async fn join_channel(
 277        channel_id: ChannelId,
 278        client: Arc<Client>,
 279        user_store: Entity<UserStore>,
 280        cx: &mut AsyncApp,
 281    ) -> Result<Entity<Self>> {
 282        Self::from_join_response(
 283            client
 284                .request(proto::JoinChannel {
 285                    channel_id: channel_id.0,
 286                })
 287                .await?,
 288            client,
 289            user_store,
 290            cx,
 291        )
 292    }
 293
 294    pub(crate) async fn join(
 295        room_id: u64,
 296        client: Arc<Client>,
 297        user_store: Entity<UserStore>,
 298        cx: &mut AsyncApp,
 299    ) -> Result<Entity<Self>> {
 300        Self::from_join_response(
 301            client.request(proto::JoinRoom { id: room_id }).await?,
 302            client,
 303            user_store,
 304            cx,
 305        )
 306    }
 307
 308    fn released(&mut self, cx: &mut App) {
 309        if self.status.is_online() {
 310            self.leave_internal(cx).detach_and_log_err(cx);
 311        }
 312    }
 313
 314    fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> {
 315        let task = if self.status.is_online() {
 316            let leave = self.leave_internal(cx);
 317            Some(cx.background_spawn(async move {
 318                leave.await.log_err();
 319            }))
 320        } else {
 321            None
 322        };
 323
 324        async move {
 325            if let Some(task) = task {
 326                task.await;
 327            }
 328        }
 329    }
 330
 331    pub fn mute_on_join(cx: &App) -> bool {
 332        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 333    }
 334
 335    fn from_join_response(
 336        response: proto::JoinRoomResponse,
 337        client: Arc<Client>,
 338        user_store: Entity<UserStore>,
 339        cx: &mut AsyncApp,
 340    ) -> Result<Entity<Self>> {
 341        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 342        let room = cx.new(|cx| {
 343            Self::new(
 344                room_proto.id,
 345                response.channel_id.map(ChannelId),
 346                response.live_kit_connection_info,
 347                client,
 348                user_store,
 349                cx,
 350            )
 351        })?;
 352        room.update(cx, |room, cx| {
 353            room.leave_when_empty = room.channel_id.is_none();
 354            room.apply_room_update(room_proto, cx)?;
 355            anyhow::Ok(())
 356        })??;
 357        Ok(room)
 358    }
 359
 360    fn should_leave(&self) -> bool {
 361        self.leave_when_empty
 362            && self.pending_room_update.is_none()
 363            && self.pending_participants.is_empty()
 364            && self.remote_participants.is_empty()
 365            && self.pending_call_count == 0
 366    }
 367
 368    pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 369        cx.notify();
 370        self.leave_internal(cx)
 371    }
 372
 373    fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
 374        if self.status.is_offline() {
 375            return Task::ready(Err(anyhow!("room is offline")));
 376        }
 377
 378        log::info!("leaving room");
 379        Audio::play_sound(Sound::Leave, cx);
 380
 381        self.clear_state(cx);
 382
 383        let leave_room = self.client.request(proto::LeaveRoom {});
 384        cx.background_spawn(async move {
 385            leave_room.await?;
 386            anyhow::Ok(())
 387        })
 388    }
 389
 390    pub(crate) fn clear_state(&mut self, cx: &mut App) {
 391        for project in self.shared_projects.drain() {
 392            if let Some(project) = project.upgrade() {
 393                project.update(cx, |project, cx| {
 394                    project.unshare(cx).log_err();
 395                });
 396            }
 397        }
 398        for project in self.joined_projects.drain() {
 399            if let Some(project) = project.upgrade() {
 400                project.update(cx, |project, cx| {
 401                    project.disconnected_from_host(cx);
 402                    project.close(cx);
 403                });
 404            }
 405        }
 406
 407        self.status = RoomStatus::Offline;
 408        self.remote_participants.clear();
 409        self.pending_participants.clear();
 410        self.participant_user_ids.clear();
 411        self.client_subscriptions.clear();
 412        self.live_kit.take();
 413        self.pending_room_update.take();
 414        self.maintain_connection.take();
 415    }
 416
 417    async fn maintain_connection(
 418        this: WeakEntity<Self>,
 419        client: Arc<Client>,
 420        cx: &mut AsyncApp,
 421    ) -> Result<()> {
 422        let mut client_status = client.status();
 423        loop {
 424            let _ = client_status.try_recv();
 425            let is_connected = client_status.borrow().is_connected();
 426            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 427            if !is_connected || client_status.next().await.is_some() {
 428                log::info!("detected client disconnection");
 429
 430                this.upgrade()
 431                    .ok_or_else(|| anyhow!("room was dropped"))?
 432                    .update(cx, |this, cx| {
 433                        this.status = RoomStatus::Rejoining;
 434                        cx.notify();
 435                    })?;
 436
 437                // Wait for client to re-establish a connection to the server.
 438                {
 439                    let mut reconnection_timeout =
 440                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 441                    let client_reconnection = async {
 442                        let mut remaining_attempts = 3;
 443                        while remaining_attempts > 0 {
 444                            if client_status.borrow().is_connected() {
 445                                log::info!("client reconnected, attempting to rejoin room");
 446
 447                                let Some(this) = this.upgrade() else { break };
 448                                match this.update(cx, |this, cx| this.rejoin(cx)) {
 449                                    Ok(task) => {
 450                                        if task.await.log_err().is_some() {
 451                                            return true;
 452                                        } else {
 453                                            remaining_attempts -= 1;
 454                                        }
 455                                    }
 456                                    Err(_app_dropped) => return false,
 457                                }
 458                            } else if client_status.borrow().is_signed_out() {
 459                                return false;
 460                            }
 461
 462                            log::info!(
 463                                "waiting for client status change, remaining attempts {}",
 464                                remaining_attempts
 465                            );
 466                            client_status.next().await;
 467                        }
 468                        false
 469                    }
 470                    .fuse();
 471                    futures::pin_mut!(client_reconnection);
 472
 473                    futures::select_biased! {
 474                        reconnected = client_reconnection => {
 475                            if reconnected {
 476                                log::info!("successfully reconnected to room");
 477                                // If we successfully joined the room, go back around the loop
 478                                // waiting for future connection status changes.
 479                                continue;
 480                            }
 481                        }
 482                        _ = reconnection_timeout => {
 483                            log::info!("room reconnection timeout expired");
 484                        }
 485                    }
 486                }
 487
 488                break;
 489            }
 490        }
 491
 492        // The client failed to re-establish a connection to the server
 493        // or an error occurred while trying to re-join the room. Either way
 494        // we leave the room and return an error.
 495        if let Some(this) = this.upgrade() {
 496            log::info!("reconnection failed, leaving room");
 497            this.update(cx, |this, cx| this.leave(cx))?.await?;
 498        }
 499        Err(anyhow!(
 500            "can't reconnect to room: client failed to re-establish connection"
 501        ))
 502    }
 503
 504    fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 505        let mut projects = HashMap::default();
 506        let mut reshared_projects = Vec::new();
 507        let mut rejoined_projects = Vec::new();
 508        self.shared_projects.retain(|project| {
 509            if let Some(handle) = project.upgrade() {
 510                let project = handle.read(cx);
 511                if let Some(project_id) = project.remote_id() {
 512                    projects.insert(project_id, handle.clone());
 513                    reshared_projects.push(proto::UpdateProject {
 514                        project_id,
 515                        worktrees: project.worktree_metadata_protos(cx),
 516                    });
 517                    return true;
 518                }
 519            }
 520            false
 521        });
 522        self.joined_projects.retain(|project| {
 523            if let Some(handle) = project.upgrade() {
 524                let project = handle.read(cx);
 525                if let Some(project_id) = project.remote_id() {
 526                    projects.insert(project_id, handle.clone());
 527                    let mut worktrees = Vec::new();
 528                    let mut repositories = Vec::new();
 529                    for worktree in project.worktrees(cx) {
 530                        let worktree = worktree.read(cx);
 531                        worktrees.push(proto::RejoinWorktree {
 532                            id: worktree.id().to_proto(),
 533                            scan_id: worktree.completed_scan_id() as u64,
 534                        });
 535                    }
 536                    for (entry_id, repository) in project.repositories(cx) {
 537                        let repository = repository.read(cx);
 538                        repositories.push(proto::RejoinRepository {
 539                            id: entry_id.to_proto(),
 540                            scan_id: repository.completed_scan_id as u64,
 541                        });
 542                    }
 543
 544                    rejoined_projects.push(proto::RejoinProject {
 545                        id: project_id,
 546                        worktrees,
 547                        repositories,
 548                    });
 549                }
 550                return true;
 551            }
 552            false
 553        });
 554
 555        let response = self.client.request_envelope(proto::RejoinRoom {
 556            id: self.id,
 557            reshared_projects,
 558            rejoined_projects,
 559        });
 560
 561        cx.spawn(async move |this, cx| {
 562            let response = response.await?;
 563            let message_id = response.message_id;
 564            let response = response.payload;
 565            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 566            this.update(cx, |this, cx| {
 567                this.status = RoomStatus::Online;
 568                this.apply_room_update(room_proto, cx)?;
 569
 570                for reshared_project in response.reshared_projects {
 571                    if let Some(project) = projects.get(&reshared_project.id) {
 572                        project.update(cx, |project, cx| {
 573                            project.reshared(reshared_project, cx).log_err();
 574                        });
 575                    }
 576                }
 577
 578                for rejoined_project in response.rejoined_projects {
 579                    if let Some(project) = projects.get(&rejoined_project.id) {
 580                        project.update(cx, |project, cx| {
 581                            project.rejoined(rejoined_project, message_id, cx).log_err();
 582                        });
 583                    }
 584                }
 585
 586                anyhow::Ok(())
 587            })?
 588        })
 589    }
 590
 591    pub fn id(&self) -> u64 {
 592        self.id
 593    }
 594
 595    pub fn status(&self) -> RoomStatus {
 596        self.status
 597    }
 598
 599    pub fn local_participant(&self) -> &LocalParticipant {
 600        &self.local_participant
 601    }
 602
 603    pub fn local_participant_user(&self, cx: &App) -> Option<Arc<User>> {
 604        self.user_store.read(cx).current_user()
 605    }
 606
 607    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 608        &self.remote_participants
 609    }
 610
 611    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 612        self.remote_participants
 613            .values()
 614            .find(|p| p.peer_id == peer_id)
 615    }
 616
 617    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 618        self.remote_participants
 619            .get(&user_id)
 620            .map(|participant| participant.role)
 621    }
 622
 623    pub fn contains_guests(&self) -> bool {
 624        self.local_participant.role == proto::ChannelRole::Guest
 625            || self
 626                .remote_participants
 627                .values()
 628                .any(|p| p.role == proto::ChannelRole::Guest)
 629    }
 630
 631    pub fn local_participant_is_admin(&self) -> bool {
 632        self.local_participant.role == proto::ChannelRole::Admin
 633    }
 634
 635    pub fn local_participant_is_guest(&self) -> bool {
 636        self.local_participant.role == proto::ChannelRole::Guest
 637    }
 638
 639    pub fn set_participant_role(
 640        &mut self,
 641        user_id: u64,
 642        role: proto::ChannelRole,
 643        cx: &Context<Self>,
 644    ) -> Task<Result<()>> {
 645        let client = self.client.clone();
 646        let room_id = self.id;
 647        let role = role.into();
 648        cx.spawn(async move |_, _| {
 649            client
 650                .request(proto::SetRoomParticipantRole {
 651                    room_id,
 652                    user_id,
 653                    role,
 654                })
 655                .await
 656                .map(|_| ())
 657        })
 658    }
 659
 660    pub fn pending_participants(&self) -> &[Arc<User>] {
 661        &self.pending_participants
 662    }
 663
 664    pub fn contains_participant(&self, user_id: u64) -> bool {
 665        self.participant_user_ids.contains(&user_id)
 666    }
 667
 668    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 669        self.follows_by_leader_id_project_id
 670            .get(&(leader_id, project_id))
 671            .map_or(&[], |v| v.as_slice())
 672    }
 673
 674    /// Returns the most 'active' projects, defined as most people in the project
 675    pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
 676        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 677        for participant in self.remote_participants.values() {
 678            match participant.location {
 679                ParticipantLocation::SharedProject { project_id } => {
 680                    project_hosts_and_guest_counts
 681                        .entry(project_id)
 682                        .or_default()
 683                        .1 += 1;
 684                }
 685                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 686            }
 687            for project in &participant.projects {
 688                project_hosts_and_guest_counts
 689                    .entry(project.id)
 690                    .or_default()
 691                    .0 = Some(participant.user.id);
 692            }
 693        }
 694
 695        if let Some(user) = self.user_store.read(cx).current_user() {
 696            for project in &self.local_participant.projects {
 697                project_hosts_and_guest_counts
 698                    .entry(project.id)
 699                    .or_default()
 700                    .0 = Some(user.id);
 701            }
 702        }
 703
 704        project_hosts_and_guest_counts
 705            .into_iter()
 706            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 707            .max_by_key(|(_, _, guest_count)| *guest_count)
 708            .map(|(id, host, _)| (id, host))
 709    }
 710
 711    async fn handle_room_updated(
 712        this: Entity<Self>,
 713        envelope: TypedEnvelope<proto::RoomUpdated>,
 714        mut cx: AsyncApp,
 715    ) -> Result<()> {
 716        let room = envelope
 717            .payload
 718            .room
 719            .ok_or_else(|| anyhow!("invalid room"))?;
 720        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 721    }
 722
 723    fn apply_room_update(&mut self, mut room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
 724        // Filter ourselves out from the room's participants.
 725        let local_participant_ix = room
 726            .participants
 727            .iter()
 728            .position(|participant| Some(participant.user_id) == self.client.user_id());
 729        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 730
 731        let pending_participant_user_ids = room
 732            .pending_participants
 733            .iter()
 734            .map(|p| p.user_id)
 735            .collect::<Vec<_>>();
 736
 737        let remote_participant_user_ids = room
 738            .participants
 739            .iter()
 740            .map(|p| p.user_id)
 741            .collect::<Vec<_>>();
 742
 743        let (remote_participants, pending_participants) =
 744            self.user_store.update(cx, move |user_store, cx| {
 745                (
 746                    user_store.get_users(remote_participant_user_ids, cx),
 747                    user_store.get_users(pending_participant_user_ids, cx),
 748                )
 749            });
 750
 751        self.pending_room_update = Some(cx.spawn(async move |this, cx| {
 752            let (remote_participants, pending_participants) =
 753                futures::join!(remote_participants, pending_participants);
 754
 755            this.update(cx, |this, cx| {
 756                this.participant_user_ids.clear();
 757
 758                if let Some(participant) = local_participant {
 759                    let role = participant.role();
 760                    this.local_participant.projects = participant.projects;
 761                    if this.local_participant.role != role {
 762                        this.local_participant.role = role;
 763
 764                        if role == proto::ChannelRole::Guest {
 765                            for project in mem::take(&mut this.shared_projects) {
 766                                if let Some(project) = project.upgrade() {
 767                                    this.unshare_project(project, cx).log_err();
 768                                }
 769                            }
 770                            this.local_participant.projects.clear();
 771                            if let Some(live_kit_room) = &mut this.live_kit {
 772                                live_kit_room.stop_publishing(cx);
 773                            }
 774                        }
 775
 776                        this.joined_projects.retain(|project| {
 777                            if let Some(project) = project.upgrade() {
 778                                project.update(cx, |project, cx| project.set_role(role, cx));
 779                                true
 780                            } else {
 781                                false
 782                            }
 783                        });
 784                    }
 785                } else {
 786                    this.local_participant.projects.clear();
 787                }
 788
 789                if let Some(participants) = remote_participants.log_err() {
 790                    for (participant, user) in room.participants.into_iter().zip(participants) {
 791                        let Some(peer_id) = participant.peer_id else {
 792                            continue;
 793                        };
 794                        let participant_index = ParticipantIndex(participant.participant_index);
 795                        this.participant_user_ids.insert(participant.user_id);
 796
 797                        let old_projects = this
 798                            .remote_participants
 799                            .get(&participant.user_id)
 800                            .into_iter()
 801                            .flat_map(|existing| &existing.projects)
 802                            .map(|project| project.id)
 803                            .collect::<HashSet<_>>();
 804                        let new_projects = participant
 805                            .projects
 806                            .iter()
 807                            .map(|project| project.id)
 808                            .collect::<HashSet<_>>();
 809
 810                        for project in &participant.projects {
 811                            if !old_projects.contains(&project.id) {
 812                                cx.emit(Event::RemoteProjectShared {
 813                                    owner: user.clone(),
 814                                    project_id: project.id,
 815                                    worktree_root_names: project.worktree_root_names.clone(),
 816                                });
 817                            }
 818                        }
 819
 820                        for unshared_project_id in old_projects.difference(&new_projects) {
 821                            this.joined_projects.retain(|project| {
 822                                if let Some(project) = project.upgrade() {
 823                                    project.update(cx, |project, cx| {
 824                                        if project.remote_id() == Some(*unshared_project_id) {
 825                                            project.disconnected_from_host(cx);
 826                                            false
 827                                        } else {
 828                                            true
 829                                        }
 830                                    })
 831                                } else {
 832                                    false
 833                                }
 834                            });
 835                            cx.emit(Event::RemoteProjectUnshared {
 836                                project_id: *unshared_project_id,
 837                            });
 838                        }
 839
 840                        let role = participant.role();
 841                        let location = ParticipantLocation::from_proto(participant.location)
 842                            .unwrap_or(ParticipantLocation::External);
 843                        if let Some(remote_participant) =
 844                            this.remote_participants.get_mut(&participant.user_id)
 845                        {
 846                            remote_participant.peer_id = peer_id;
 847                            remote_participant.projects = participant.projects;
 848                            remote_participant.participant_index = participant_index;
 849                            if location != remote_participant.location
 850                                || role != remote_participant.role
 851                            {
 852                                remote_participant.location = location;
 853                                remote_participant.role = role;
 854                                cx.emit(Event::ParticipantLocationChanged {
 855                                    participant_id: peer_id,
 856                                });
 857                            }
 858                        } else {
 859                            this.remote_participants.insert(
 860                                participant.user_id,
 861                                RemoteParticipant {
 862                                    user: user.clone(),
 863                                    participant_index,
 864                                    peer_id,
 865                                    projects: participant.projects,
 866                                    location,
 867                                    role,
 868                                    muted: true,
 869                                    speaking: false,
 870                                    video_tracks: Default::default(),
 871                                    audio_tracks: Default::default(),
 872                                },
 873                            );
 874
 875                            Audio::play_sound(Sound::Joined, cx);
 876
 877                            if let Some(live_kit) = this.live_kit.as_ref() {
 878                                let video_tracks =
 879                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 880                                let audio_tracks =
 881                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 882                                let publications = live_kit
 883                                    .room
 884                                    .remote_audio_track_publications(&user.id.to_string());
 885
 886                                for track in video_tracks {
 887                                    this.live_kit_room_updated(
 888                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
 889                                        cx,
 890                                    )
 891                                    .log_err();
 892                                }
 893
 894                                for (track, publication) in
 895                                    audio_tracks.iter().zip(publications.iter())
 896                                {
 897                                    this.live_kit_room_updated(
 898                                        RoomUpdate::SubscribedToRemoteAudioTrack(
 899                                            track.clone(),
 900                                            publication.clone(),
 901                                        ),
 902                                        cx,
 903                                    )
 904                                    .log_err();
 905                                }
 906                            }
 907                        }
 908                    }
 909
 910                    this.remote_participants.retain(|user_id, participant| {
 911                        if this.participant_user_ids.contains(user_id) {
 912                            true
 913                        } else {
 914                            for project in &participant.projects {
 915                                cx.emit(Event::RemoteProjectUnshared {
 916                                    project_id: project.id,
 917                                });
 918                            }
 919                            false
 920                        }
 921                    });
 922                }
 923
 924                if let Some(pending_participants) = pending_participants.log_err() {
 925                    this.pending_participants = pending_participants;
 926                    for participant in &this.pending_participants {
 927                        this.participant_user_ids.insert(participant.id);
 928                    }
 929                }
 930
 931                this.follows_by_leader_id_project_id.clear();
 932                for follower in room.followers {
 933                    let project_id = follower.project_id;
 934                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 935                        (Some(leader), Some(follower)) => (leader, follower),
 936
 937                        _ => {
 938                            log::error!("Follower message {follower:?} missing some state");
 939                            continue;
 940                        }
 941                    };
 942
 943                    let list = this
 944                        .follows_by_leader_id_project_id
 945                        .entry((leader, project_id))
 946                        .or_default();
 947                    if !list.contains(&follower) {
 948                        list.push(follower);
 949                    }
 950                }
 951
 952                this.pending_room_update.take();
 953                if this.should_leave() {
 954                    log::info!("room is empty, leaving");
 955                    this.leave(cx).detach();
 956                }
 957
 958                this.user_store.update(cx, |user_store, cx| {
 959                    let participant_indices_by_user_id = this
 960                        .remote_participants
 961                        .iter()
 962                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 963                        .collect();
 964                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 965                });
 966
 967                this.check_invariants();
 968                this.room_update_completed_tx.try_send(Some(())).ok();
 969                cx.notify();
 970            })
 971            .ok();
 972        }));
 973
 974        cx.notify();
 975        Ok(())
 976    }
 977
 978    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 979        let mut done_rx = self.room_update_completed_rx.clone();
 980        async move {
 981            while let Some(result) = done_rx.next().await {
 982                if result.is_some() {
 983                    break;
 984                }
 985            }
 986        }
 987    }
 988
 989    fn live_kit_room_updated(&mut self, update: RoomUpdate, cx: &mut Context<Self>) -> Result<()> {
 990        match update {
 991            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
 992                let user_id = track.publisher_id().parse()?;
 993                let track_id = track.sid().to_string();
 994                let participant = self
 995                    .remote_participants
 996                    .get_mut(&user_id)
 997                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 998                participant.video_tracks.insert(track_id.clone(), track);
 999                cx.emit(Event::RemoteVideoTracksChanged {
1000                    participant_id: participant.peer_id,
1001                });
1002            }
1003
1004            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
1005                publisher_id,
1006                track_id,
1007            } => {
1008                let user_id = publisher_id.parse()?;
1009                let participant = self
1010                    .remote_participants
1011                    .get_mut(&user_id)
1012                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1013                participant.video_tracks.remove(&track_id);
1014                cx.emit(Event::RemoteVideoTracksChanged {
1015                    participant_id: participant.peer_id,
1016                });
1017            }
1018
1019            RoomUpdate::ActiveSpeakersChanged { speakers } => {
1020                let mut speaker_ids = speakers
1021                    .into_iter()
1022                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
1023                    .collect::<Vec<u64>>();
1024                speaker_ids.sort_unstable();
1025                for (sid, participant) in &mut self.remote_participants {
1026                    participant.speaking = speaker_ids.binary_search(sid).is_ok();
1027                }
1028                if let Some(id) = self.client.user_id() {
1029                    if let Some(room) = &mut self.live_kit {
1030                        room.speaking = speaker_ids.binary_search(&id).is_ok();
1031                    }
1032                }
1033            }
1034
1035            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1036                let mut found = false;
1037                for participant in &mut self.remote_participants.values_mut() {
1038                    for track in participant.audio_tracks.values() {
1039                        if track.sid() == track_id {
1040                            found = true;
1041                            break;
1042                        }
1043                    }
1044                    if found {
1045                        participant.muted = muted;
1046                        break;
1047                    }
1048                }
1049            }
1050
1051            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1052                if let Some(live_kit) = &self.live_kit {
1053                    if live_kit.deafened {
1054                        track.stop();
1055                        cx.foreground_executor()
1056                            .spawn(publication.set_enabled(false))
1057                            .detach();
1058                    }
1059                }
1060
1061                let user_id = track.publisher_id().parse()?;
1062                let track_id = track.sid().to_string();
1063                let participant = self
1064                    .remote_participants
1065                    .get_mut(&user_id)
1066                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1067                participant.audio_tracks.insert(track_id.clone(), track);
1068                participant.muted = publication.is_muted();
1069
1070                cx.emit(Event::RemoteAudioTracksChanged {
1071                    participant_id: participant.peer_id,
1072                });
1073            }
1074
1075            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1076                publisher_id,
1077                track_id,
1078            } => {
1079                let user_id = publisher_id.parse()?;
1080                let participant = self
1081                    .remote_participants
1082                    .get_mut(&user_id)
1083                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1084                participant.audio_tracks.remove(&track_id);
1085                cx.emit(Event::RemoteAudioTracksChanged {
1086                    participant_id: participant.peer_id,
1087                });
1088            }
1089
1090            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1091                log::info!("unpublished audio track {}", publication.sid());
1092                if let Some(room) = &mut self.live_kit {
1093                    room.microphone_track = LocalTrack::None;
1094                }
1095            }
1096
1097            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1098                log::info!("unpublished video track {}", publication.sid());
1099                if let Some(room) = &mut self.live_kit {
1100                    room.screen_track = LocalTrack::None;
1101                }
1102            }
1103
1104            RoomUpdate::LocalAudioTrackPublished { publication } => {
1105                log::info!("published audio track {}", publication.sid());
1106            }
1107
1108            RoomUpdate::LocalVideoTrackPublished { publication } => {
1109                log::info!("published video track {}", publication.sid());
1110            }
1111        }
1112
1113        cx.notify();
1114        Ok(())
1115    }
1116
1117    fn check_invariants(&self) {
1118        #[cfg(any(test, feature = "test-support"))]
1119        {
1120            for participant in self.remote_participants.values() {
1121                assert!(self.participant_user_ids.contains(&participant.user.id));
1122                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1123            }
1124
1125            for participant in &self.pending_participants {
1126                assert!(self.participant_user_ids.contains(&participant.id));
1127                assert_ne!(participant.id, self.client.user_id().unwrap());
1128            }
1129
1130            assert_eq!(
1131                self.participant_user_ids.len(),
1132                self.remote_participants.len() + self.pending_participants.len()
1133            );
1134        }
1135    }
1136
1137    pub(crate) fn call(
1138        &mut self,
1139        called_user_id: u64,
1140        initial_project_id: Option<u64>,
1141        cx: &mut Context<Self>,
1142    ) -> Task<Result<()>> {
1143        if self.status.is_offline() {
1144            return Task::ready(Err(anyhow!("room is offline")));
1145        }
1146
1147        cx.notify();
1148        let client = self.client.clone();
1149        let room_id = self.id;
1150        self.pending_call_count += 1;
1151        cx.spawn(async move |this, cx| {
1152            let result = client
1153                .request(proto::Call {
1154                    room_id,
1155                    called_user_id,
1156                    initial_project_id,
1157                })
1158                .await;
1159            this.update(cx, |this, cx| {
1160                this.pending_call_count -= 1;
1161                if this.should_leave() {
1162                    this.leave(cx).detach_and_log_err(cx);
1163                }
1164            })?;
1165            result?;
1166            Ok(())
1167        })
1168    }
1169
1170    pub fn join_project(
1171        &mut self,
1172        id: u64,
1173        language_registry: Arc<LanguageRegistry>,
1174        fs: Arc<dyn Fs>,
1175        cx: &mut Context<Self>,
1176    ) -> Task<Result<Entity<Project>>> {
1177        let client = self.client.clone();
1178        let user_store = self.user_store.clone();
1179        cx.emit(Event::RemoteProjectJoined { project_id: id });
1180        cx.spawn(async move |this, cx| {
1181            let project =
1182                Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1183
1184            this.update(cx, |this, cx| {
1185                this.joined_projects.retain(|project| {
1186                    if let Some(project) = project.upgrade() {
1187                        !project.read(cx).is_disconnected(cx)
1188                    } else {
1189                        false
1190                    }
1191                });
1192                this.joined_projects.insert(project.downgrade());
1193            })?;
1194            Ok(project)
1195        })
1196    }
1197
1198    pub fn share_project(
1199        &mut self,
1200        project: Entity<Project>,
1201        cx: &mut Context<Self>,
1202    ) -> Task<Result<u64>> {
1203        if let Some(project_id) = project.read(cx).remote_id() {
1204            return Task::ready(Ok(project_id));
1205        }
1206
1207        let request = self.client.request(proto::ShareProject {
1208            room_id: self.id(),
1209            worktrees: project.read(cx).worktree_metadata_protos(cx),
1210            is_ssh_project: project.read(cx).is_via_ssh(),
1211        });
1212
1213        cx.spawn(async move |this, cx| {
1214            let response = request.await?;
1215
1216            project.update(cx, |project, cx| project.shared(response.project_id, cx))??;
1217
1218            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1219            this.update(cx, |this, cx| {
1220                this.shared_projects.insert(project.downgrade());
1221                let active_project = this.local_participant.active_project.as_ref();
1222                if active_project.map_or(false, |location| *location == project) {
1223                    this.set_location(Some(&project), cx)
1224                } else {
1225                    Task::ready(Ok(()))
1226                }
1227            })?
1228            .await?;
1229
1230            Ok(response.project_id)
1231        })
1232    }
1233
1234    pub(crate) fn unshare_project(
1235        &mut self,
1236        project: Entity<Project>,
1237        cx: &mut Context<Self>,
1238    ) -> Result<()> {
1239        let project_id = match project.read(cx).remote_id() {
1240            Some(project_id) => project_id,
1241            None => return Ok(()),
1242        };
1243
1244        self.client.send(proto::UnshareProject { project_id })?;
1245        project.update(cx, |this, cx| this.unshare(cx))?;
1246
1247        if self.local_participant.active_project == Some(project.downgrade()) {
1248            self.set_location(Some(&project), cx).detach_and_log_err(cx);
1249        }
1250        Ok(())
1251    }
1252
1253    pub(crate) fn set_location(
1254        &mut self,
1255        project: Option<&Entity<Project>>,
1256        cx: &mut Context<Self>,
1257    ) -> Task<Result<()>> {
1258        if self.status.is_offline() {
1259            return Task::ready(Err(anyhow!("room is offline")));
1260        }
1261
1262        let client = self.client.clone();
1263        let room_id = self.id;
1264        let location = if let Some(project) = project {
1265            self.local_participant.active_project = Some(project.downgrade());
1266            if let Some(project_id) = project.read(cx).remote_id() {
1267                proto::participant_location::Variant::SharedProject(
1268                    proto::participant_location::SharedProject { id: project_id },
1269                )
1270            } else {
1271                proto::participant_location::Variant::UnsharedProject(
1272                    proto::participant_location::UnsharedProject {},
1273                )
1274            }
1275        } else {
1276            self.local_participant.active_project = None;
1277            proto::participant_location::Variant::External(proto::participant_location::External {})
1278        };
1279
1280        cx.notify();
1281        cx.background_spawn(async move {
1282            client
1283                .request(proto::UpdateParticipantLocation {
1284                    room_id,
1285                    location: Some(proto::ParticipantLocation {
1286                        variant: Some(location),
1287                    }),
1288                })
1289                .await?;
1290            Ok(())
1291        })
1292    }
1293
1294    pub fn is_screen_sharing(&self) -> bool {
1295        self.live_kit.as_ref().map_or(false, |live_kit| {
1296            !matches!(live_kit.screen_track, LocalTrack::None)
1297        })
1298    }
1299
1300    pub fn is_sharing_mic(&self) -> bool {
1301        self.live_kit.as_ref().map_or(false, |live_kit| {
1302            !matches!(live_kit.microphone_track, LocalTrack::None)
1303        })
1304    }
1305
1306    pub fn is_muted(&self) -> bool {
1307        self.live_kit.as_ref().map_or(false, |live_kit| {
1308            matches!(live_kit.microphone_track, LocalTrack::None)
1309                || live_kit.muted_by_user
1310                || live_kit.deafened
1311        })
1312    }
1313
1314    pub fn muted_by_user(&self) -> bool {
1315        self.live_kit
1316            .as_ref()
1317            .map_or(false, |live_kit| live_kit.muted_by_user)
1318    }
1319
1320    pub fn is_speaking(&self) -> bool {
1321        self.live_kit
1322            .as_ref()
1323            .map_or(false, |live_kit| live_kit.speaking)
1324    }
1325
1326    pub fn is_deafened(&self) -> Option<bool> {
1327        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1328    }
1329
1330    pub fn can_use_microphone(&self) -> bool {
1331        use proto::ChannelRole::*;
1332        match self.local_participant.role {
1333            Admin | Member | Talker => true,
1334            Guest | Banned => false,
1335        }
1336    }
1337
1338    pub fn can_share_projects(&self) -> bool {
1339        use proto::ChannelRole::*;
1340        match self.local_participant.role {
1341            Admin | Member => true,
1342            Guest | Banned | Talker => false,
1343        }
1344    }
1345
1346    #[track_caller]
1347    pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1348        if self.status.is_offline() {
1349            return Task::ready(Err(anyhow!("room is offline")));
1350        }
1351
1352        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1353            let publish_id = post_inc(&mut live_kit.next_publish_id);
1354            live_kit.microphone_track = LocalTrack::Pending { publish_id };
1355            cx.notify();
1356            publish_id
1357        } else {
1358            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1359        };
1360
1361        cx.spawn(async move |this, cx| {
1362            let publish_track = async {
1363                let track = LocalAudioTrack::create();
1364                this.upgrade()
1365                    .ok_or_else(|| anyhow!("room was dropped"))?
1366                    .update(cx, |this, _| {
1367                        this.live_kit
1368                            .as_ref()
1369                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1370                    })?
1371                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1372                    .await
1373            };
1374            let publication = publish_track.await;
1375            this.upgrade()
1376                .ok_or_else(|| anyhow!("room was dropped"))?
1377                .update(cx, |this, cx| {
1378                    let live_kit = this
1379                        .live_kit
1380                        .as_mut()
1381                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1382
1383                    let canceled = if let LocalTrack::Pending {
1384                        publish_id: cur_publish_id,
1385                    } = &live_kit.microphone_track
1386                    {
1387                        *cur_publish_id != publish_id
1388                    } else {
1389                        true
1390                    };
1391
1392                    match publication {
1393                        Ok(publication) => {
1394                            if canceled {
1395                                live_kit.room.unpublish_track(publication);
1396                            } else {
1397                                if live_kit.muted_by_user || live_kit.deafened {
1398                                    cx.background_spawn(publication.set_mute(true)).detach();
1399                                }
1400                                live_kit.microphone_track = LocalTrack::Published {
1401                                    track_publication: publication,
1402                                };
1403                                cx.notify();
1404                            }
1405                            Ok(())
1406                        }
1407                        Err(error) => {
1408                            if canceled {
1409                                Ok(())
1410                            } else {
1411                                live_kit.microphone_track = LocalTrack::None;
1412                                cx.notify();
1413                                Err(error)
1414                            }
1415                        }
1416                    }
1417                })?
1418        })
1419    }
1420
1421    pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1422        if self.status.is_offline() {
1423            return Task::ready(Err(anyhow!("room is offline")));
1424        } else if self.is_screen_sharing() {
1425            return Task::ready(Err(anyhow!("screen was already shared")));
1426        }
1427
1428        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1429            let publish_id = post_inc(&mut live_kit.next_publish_id);
1430            live_kit.screen_track = LocalTrack::Pending { publish_id };
1431            cx.notify();
1432            (live_kit.room.display_sources(), publish_id)
1433        } else {
1434            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1435        };
1436
1437        cx.spawn(async move |this, cx| {
1438            let publish_track = async {
1439                let displays = displays.await?;
1440                let display = displays
1441                    .first()
1442                    .ok_or_else(|| anyhow!("no display found"))?;
1443                let track = LocalVideoTrack::screen_share_for_display(display);
1444                this.upgrade()
1445                    .ok_or_else(|| anyhow!("room was dropped"))?
1446                    .update(cx, |this, _| {
1447                        this.live_kit
1448                            .as_ref()
1449                            .map(|live_kit| live_kit.room.publish_video_track(track))
1450                    })?
1451                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1452                    .await
1453            };
1454
1455            let publication = publish_track.await;
1456            this.upgrade()
1457                .ok_or_else(|| anyhow!("room was dropped"))?
1458                .update(cx, |this, cx| {
1459                    let live_kit = this
1460                        .live_kit
1461                        .as_mut()
1462                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1463
1464                    let canceled = if let LocalTrack::Pending {
1465                        publish_id: cur_publish_id,
1466                    } = &live_kit.screen_track
1467                    {
1468                        *cur_publish_id != publish_id
1469                    } else {
1470                        true
1471                    };
1472
1473                    match publication {
1474                        Ok(publication) => {
1475                            if canceled {
1476                                live_kit.room.unpublish_track(publication);
1477                            } else {
1478                                live_kit.screen_track = LocalTrack::Published {
1479                                    track_publication: publication,
1480                                };
1481                                cx.notify();
1482                            }
1483
1484                            Audio::play_sound(Sound::StartScreenshare, cx);
1485
1486                            Ok(())
1487                        }
1488                        Err(error) => {
1489                            if canceled {
1490                                Ok(())
1491                            } else {
1492                                live_kit.screen_track = LocalTrack::None;
1493                                cx.notify();
1494                                Err(error)
1495                            }
1496                        }
1497                    }
1498                })?
1499        })
1500    }
1501
1502    pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1503        if let Some(live_kit) = self.live_kit.as_mut() {
1504            // When unmuting, undeafen if the user was deafened before.
1505            let was_deafened = live_kit.deafened;
1506            if live_kit.muted_by_user
1507                || live_kit.deafened
1508                || matches!(live_kit.microphone_track, LocalTrack::None)
1509            {
1510                live_kit.muted_by_user = false;
1511                live_kit.deafened = false;
1512            } else {
1513                live_kit.muted_by_user = true;
1514            }
1515            let muted = live_kit.muted_by_user;
1516            let should_undeafen = was_deafened && !live_kit.deafened;
1517
1518            if let Some(task) = self.set_mute(muted, cx) {
1519                task.detach_and_log_err(cx);
1520            }
1521
1522            if should_undeafen {
1523                if let Some(task) = self.set_deafened(false, cx) {
1524                    task.detach_and_log_err(cx);
1525                }
1526            }
1527        }
1528    }
1529
1530    pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1531        if let Some(live_kit) = self.live_kit.as_mut() {
1532            // When deafening, mute the microphone if it was not already muted.
1533            // When un-deafening, unmute the microphone, unless it was explicitly muted.
1534            let deafened = !live_kit.deafened;
1535            live_kit.deafened = deafened;
1536            let should_change_mute = !live_kit.muted_by_user;
1537
1538            if let Some(task) = self.set_deafened(deafened, cx) {
1539                task.detach_and_log_err(cx);
1540            }
1541
1542            if should_change_mute {
1543                if let Some(task) = self.set_mute(deafened, cx) {
1544                    task.detach_and_log_err(cx);
1545                }
1546            }
1547        }
1548    }
1549
1550    pub fn unshare_screen(&mut self, cx: &mut Context<Self>) -> Result<()> {
1551        if self.status.is_offline() {
1552            return Err(anyhow!("room is offline"));
1553        }
1554
1555        let live_kit = self
1556            .live_kit
1557            .as_mut()
1558            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1559        match mem::take(&mut live_kit.screen_track) {
1560            LocalTrack::None => Err(anyhow!("screen was not shared")),
1561            LocalTrack::Pending { .. } => {
1562                cx.notify();
1563                Ok(())
1564            }
1565            LocalTrack::Published {
1566                track_publication, ..
1567            } => {
1568                live_kit.room.unpublish_track(track_publication);
1569                cx.notify();
1570
1571                Audio::play_sound(Sound::StopScreenshare, cx);
1572                Ok(())
1573            }
1574        }
1575    }
1576
1577    fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
1578        let live_kit = self.live_kit.as_mut()?;
1579        cx.notify();
1580
1581        let mut track_updates = Vec::new();
1582        for participant in self.remote_participants.values() {
1583            for publication in live_kit
1584                .room
1585                .remote_audio_track_publications(&participant.user.id.to_string())
1586            {
1587                track_updates.push(publication.set_enabled(!deafened));
1588            }
1589
1590            for track in participant.audio_tracks.values() {
1591                if deafened {
1592                    track.stop();
1593                } else {
1594                    track.start();
1595                }
1596            }
1597        }
1598
1599        Some(cx.foreground_executor().spawn(async move {
1600            for result in futures::future::join_all(track_updates).await {
1601                result?;
1602            }
1603            Ok(())
1604        }))
1605    }
1606
1607    fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1608        let live_kit = self.live_kit.as_mut()?;
1609        cx.notify();
1610
1611        if should_mute {
1612            Audio::play_sound(Sound::Mute, cx);
1613        } else {
1614            Audio::play_sound(Sound::Unmute, cx);
1615        }
1616
1617        match &mut live_kit.microphone_track {
1618            LocalTrack::None => {
1619                if should_mute {
1620                    None
1621                } else {
1622                    Some(self.share_microphone(cx))
1623                }
1624            }
1625            LocalTrack::Pending { .. } => None,
1626            LocalTrack::Published { track_publication } => Some(
1627                cx.foreground_executor()
1628                    .spawn(track_publication.set_mute(should_mute)),
1629            ),
1630        }
1631    }
1632
1633    #[cfg(any(test, feature = "test-support"))]
1634    pub fn set_display_sources(&self, sources: Vec<livekit_client_macos::MacOSDisplay>) {
1635        self.live_kit
1636            .as_ref()
1637            .unwrap()
1638            .room
1639            .set_display_sources(sources);
1640    }
1641}
1642
1643struct LiveKitRoom {
1644    room: Arc<livekit_client_macos::Room>,
1645    screen_track: LocalTrack,
1646    microphone_track: LocalTrack,
1647    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1648    muted_by_user: bool,
1649    deafened: bool,
1650    speaking: bool,
1651    next_publish_id: usize,
1652    _maintain_room: Task<()>,
1653    _handle_updates: Task<()>,
1654}
1655
1656impl LiveKitRoom {
1657    fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1658        if let LocalTrack::Published {
1659            track_publication, ..
1660        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1661        {
1662            self.room.unpublish_track(track_publication);
1663            cx.notify();
1664        }
1665
1666        if let LocalTrack::Published {
1667            track_publication, ..
1668        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1669        {
1670            self.room.unpublish_track(track_publication);
1671            cx.notify();
1672        }
1673    }
1674}
1675
1676enum LocalTrack {
1677    None,
1678    Pending {
1679        publish_id: usize,
1680    },
1681    Published {
1682        track_publication: LocalTrackPublication,
1683    },
1684}
1685
1686impl Default for LocalTrack {
1687    fn default() -> Self {
1688        Self::None
1689    }
1690}
1691
1692#[derive(Copy, Clone, PartialEq, Eq)]
1693pub enum RoomStatus {
1694    Online,
1695    Rejoining,
1696    Offline,
1697}
1698
1699impl RoomStatus {
1700    pub fn is_offline(&self) -> bool {
1701        matches!(self, RoomStatus::Offline)
1702    }
1703
1704    pub fn is_online(&self) -> bool {
1705        matches!(self, RoomStatus::Online)
1706    }
1707}