room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
  15use language::LanguageRegistry;
  16use livekit_client_macos::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
  17use postage::{sink::Sink, stream::Stream, watch};
  18use project::Project;
  19use settings::Settings as _;
  20use std::{future::Future, mem, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
/// Events emitted by a [`Room`] (see the `EventEmitter<Event>` impl for `Room`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    RoomJoined {
        channel_id: Option<ChannelId>,
    },
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted while applying a room update when a remote participant lists a
    /// project that was not among that participant's previously known projects.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// Emitted while applying a room update when a project previously listed by
    /// a remote participant is no longer listed.
    RemoteProjectUnshared {
        project_id: u64,
    },
    RemoteProjectJoined {
        project_id: u64,
    },
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    RoomLeft {
        channel_id: Option<ChannelId>,
    },
}
  57
/// Client-side state of a call room: its participants, the projects shared
/// into or joined from it, and the background tasks that keep it alive.
pub struct Room {
    /// Room id as reported by the server (`proto::Room::id`).
    id: u64,
    /// Channel this room belongs to, if it was joined through a channel.
    channel_id: Option<ChannelId>,
    /// LiveKit state; `None` when no connection info was supplied or after
    /// `clear_state` has run.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Local projects shared into this room; unshared by `clear_state`.
    shared_projects: HashSet<WeakEntity<Project>>,
    /// Remote projects joined through this room; closed by `clear_state`.
    joined_projects: HashSet<WeakEntity<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    pending_participants: Vec<Arc<User>>,
    /// User ids backing `contains_participant`.
    participant_user_ids: HashSet<u64>,
    /// Must be zero for `should_leave` to return `true`.
    pending_call_count: usize,
    /// Enables `should_leave`; set for rooms created by `create` and for
    /// joined rooms that have no channel.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Entity<UserStore>,
    /// Followers keyed by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions on `client`; dropped by `clear_state`.
    client_subscriptions: Vec<client::Subscription>,
    /// Release and app-quit hooks registered in `new`.
    _subscriptions: Vec<gpui::Subscription>,
    // NOTE(review): the watch pair below is only constructed in the code visible
    // here; presumably it signals completion of room updates — confirm at use sites.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// Task applying the most recent room update; must be `None` for
    /// `should_leave` to return `true`.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; dropped by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
  81
// Lets `Room` entities emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for Room {}
  83
  84impl Room {
    /// Returns the id of the channel this room belongs to, if any.
    pub fn channel_id(&self) -> Option<ChannelId> {
        self.channel_id
    }
  88
    /// Returns `true` while at least one project is tracked in `shared_projects`.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
  92
  93    #[cfg(any(test, feature = "test-support"))]
  94    pub fn is_connected(&self) -> bool {
  95        if let Some(live_kit) = self.live_kit.as_ref() {
  96            matches!(
  97                *live_kit.room.status().borrow(),
  98                livekit_client_macos::ConnectionState::Connected { .. }
  99            )
 100        } else {
 101            false
 102        }
 103    }
 104
 105    fn new(
 106        id: u64,
 107        channel_id: Option<ChannelId>,
 108        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 109        client: Arc<Client>,
 110        user_store: Entity<UserStore>,
 111        cx: &mut Context<Self>,
 112    ) -> Self {
 113        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 114            let room = livekit_client_macos::Room::new();
 115            let mut status = room.status();
 116            // Consume the initial status of the room.
 117            let _ = status.try_recv();
 118            let _maintain_room = cx.spawn(|this, mut cx| async move {
 119                while let Some(status) = status.next().await {
 120                    let this = if let Some(this) = this.upgrade() {
 121                        this
 122                    } else {
 123                        break;
 124                    };
 125
 126                    if status == livekit_client_macos::ConnectionState::Disconnected {
 127                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 128                            .ok();
 129                        break;
 130                    }
 131                }
 132            });
 133
 134            let _handle_updates = cx.spawn({
 135                let room = room.clone();
 136                move |this, mut cx| async move {
 137                    let mut updates = room.updates();
 138                    while let Some(update) = updates.next().await {
 139                        let this = if let Some(this) = this.upgrade() {
 140                            this
 141                        } else {
 142                            break;
 143                        };
 144
 145                        this.update(&mut cx, |this, cx| {
 146                            this.live_kit_room_updated(update, cx).log_err()
 147                        })
 148                        .ok();
 149                    }
 150                }
 151            });
 152
 153            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 154            cx.spawn(|this, mut cx| async move {
 155                connect.await?;
 156                this.update(&mut cx, |this, cx| {
 157                    if this.can_use_microphone() {
 158                        if let Some(live_kit) = &this.live_kit {
 159                            if !live_kit.muted_by_user && !live_kit.deafened {
 160                                return this.share_microphone(cx);
 161                            }
 162                        }
 163                    }
 164                    Task::ready(Ok(()))
 165                })?
 166                .await
 167            })
 168            .detach_and_log_err(cx);
 169
 170            Some(LiveKitRoom {
 171                room,
 172                screen_track: LocalTrack::None,
 173                microphone_track: LocalTrack::None,
 174                next_publish_id: 0,
 175                muted_by_user: Self::mute_on_join(cx),
 176                deafened: false,
 177                speaking: false,
 178                _maintain_room,
 179                _handle_updates,
 180            })
 181        } else {
 182            None
 183        };
 184
 185        let maintain_connection = cx.spawn({
 186            let client = client.clone();
 187            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 188        });
 189
 190        Audio::play_sound(Sound::Joined, cx);
 191
 192        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 193
 194        Self {
 195            id,
 196            channel_id,
 197            live_kit: live_kit_room,
 198            status: RoomStatus::Online,
 199            shared_projects: Default::default(),
 200            joined_projects: Default::default(),
 201            participant_user_ids: Default::default(),
 202            local_participant: Default::default(),
 203            remote_participants: Default::default(),
 204            pending_participants: Default::default(),
 205            pending_call_count: 0,
 206            client_subscriptions: vec![
 207                client.add_message_handler(cx.weak_entity(), Self::handle_room_updated)
 208            ],
 209            _subscriptions: vec![
 210                cx.on_release(Self::released),
 211                cx.on_app_quit(Self::app_will_quit),
 212            ],
 213            leave_when_empty: false,
 214            pending_room_update: None,
 215            client,
 216            user_store,
 217            follows_by_leader_id_project_id: Default::default(),
 218            maintain_connection: Some(maintain_connection),
 219            room_update_completed_tx,
 220            room_update_completed_rx,
 221        }
 222    }
 223
 224    pub(crate) fn create(
 225        called_user_id: u64,
 226        initial_project: Option<Entity<Project>>,
 227        client: Arc<Client>,
 228        user_store: Entity<UserStore>,
 229        cx: &mut App,
 230    ) -> Task<Result<Entity<Self>>> {
 231        cx.spawn(move |mut cx| async move {
 232            let response = client.request(proto::CreateRoom {}).await?;
 233            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 234            let room = cx.new(|cx| {
 235                let mut room = Self::new(
 236                    room_proto.id,
 237                    None,
 238                    response.live_kit_connection_info,
 239                    client,
 240                    user_store,
 241                    cx,
 242                );
 243                if let Some(participant) = room_proto.participants.first() {
 244                    room.local_participant.role = participant.role()
 245                }
 246                room
 247            })?;
 248
 249            let initial_project_id = if let Some(initial_project) = initial_project {
 250                let initial_project_id = room
 251                    .update(&mut cx, |room, cx| {
 252                        room.share_project(initial_project.clone(), cx)
 253                    })?
 254                    .await?;
 255                Some(initial_project_id)
 256            } else {
 257                None
 258            };
 259
 260            let did_join = room
 261                .update(&mut cx, |room, cx| {
 262                    room.leave_when_empty = true;
 263                    room.call(called_user_id, initial_project_id, cx)
 264                })?
 265                .await;
 266            match did_join {
 267                Ok(()) => Ok(room),
 268                Err(error) => Err(error.context("room creation failed")),
 269            }
 270        })
 271    }
 272
 273    pub(crate) async fn join_channel(
 274        channel_id: ChannelId,
 275        client: Arc<Client>,
 276        user_store: Entity<UserStore>,
 277        cx: AsyncApp,
 278    ) -> Result<Entity<Self>> {
 279        Self::from_join_response(
 280            client
 281                .request(proto::JoinChannel {
 282                    channel_id: channel_id.0,
 283                })
 284                .await?,
 285            client,
 286            user_store,
 287            cx,
 288        )
 289    }
 290
 291    pub(crate) async fn join(
 292        room_id: u64,
 293        client: Arc<Client>,
 294        user_store: Entity<UserStore>,
 295        cx: AsyncApp,
 296    ) -> Result<Entity<Self>> {
 297        Self::from_join_response(
 298            client.request(proto::JoinRoom { id: room_id }).await?,
 299            client,
 300            user_store,
 301            cx,
 302        )
 303    }
 304
 305    fn released(&mut self, cx: &mut App) {
 306        if self.status.is_online() {
 307            self.leave_internal(cx).detach_and_log_err(cx);
 308        }
 309    }
 310
 311    fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> {
 312        let task = if self.status.is_online() {
 313            let leave = self.leave_internal(cx);
 314            Some(cx.background_executor().spawn(async move {
 315                leave.await.log_err();
 316            }))
 317        } else {
 318            None
 319        };
 320
 321        async move {
 322            if let Some(task) = task {
 323                task.await;
 324            }
 325        }
 326    }
 327
 328    pub fn mute_on_join(cx: &App) -> bool {
 329        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 330    }
 331
 332    fn from_join_response(
 333        response: proto::JoinRoomResponse,
 334        client: Arc<Client>,
 335        user_store: Entity<UserStore>,
 336        mut cx: AsyncApp,
 337    ) -> Result<Entity<Self>> {
 338        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 339        let room = cx.new(|cx| {
 340            Self::new(
 341                room_proto.id,
 342                response.channel_id.map(ChannelId),
 343                response.live_kit_connection_info,
 344                client,
 345                user_store,
 346                cx,
 347            )
 348        })?;
 349        room.update(&mut cx, |room, cx| {
 350            room.leave_when_empty = room.channel_id.is_none();
 351            room.apply_room_update(room_proto, cx)?;
 352            anyhow::Ok(())
 353        })??;
 354        Ok(room)
 355    }
 356
 357    fn should_leave(&self) -> bool {
 358        self.leave_when_empty
 359            && self.pending_room_update.is_none()
 360            && self.pending_participants.is_empty()
 361            && self.remote_participants.is_empty()
 362            && self.pending_call_count == 0
 363    }
 364
    /// Notifies observers of this entity and then leaves the room via
    /// `leave_internal`, returning its task.
    pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        cx.notify();
        self.leave_internal(cx)
    }
 369
 370    fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
 371        if self.status.is_offline() {
 372            return Task::ready(Err(anyhow!("room is offline")));
 373        }
 374
 375        log::info!("leaving room");
 376        Audio::play_sound(Sound::Leave, cx);
 377
 378        self.clear_state(cx);
 379
 380        let leave_room = self.client.request(proto::LeaveRoom {});
 381        cx.background_executor().spawn(async move {
 382            leave_room.await?;
 383            anyhow::Ok(())
 384        })
 385    }
 386
 387    pub(crate) fn clear_state(&mut self, cx: &mut App) {
 388        for project in self.shared_projects.drain() {
 389            if let Some(project) = project.upgrade() {
 390                project.update(cx, |project, cx| {
 391                    project.unshare(cx).log_err();
 392                });
 393            }
 394        }
 395        for project in self.joined_projects.drain() {
 396            if let Some(project) = project.upgrade() {
 397                project.update(cx, |project, cx| {
 398                    project.disconnected_from_host(cx);
 399                    project.close(cx);
 400                });
 401            }
 402        }
 403
 404        self.status = RoomStatus::Offline;
 405        self.remote_participants.clear();
 406        self.pending_participants.clear();
 407        self.participant_user_ids.clear();
 408        self.client_subscriptions.clear();
 409        self.live_kit.take();
 410        self.pending_room_update.take();
 411        self.maintain_connection.take();
 412    }
 413
 414    async fn maintain_connection(
 415        this: WeakEntity<Self>,
 416        client: Arc<Client>,
 417        mut cx: AsyncApp,
 418    ) -> Result<()> {
 419        let mut client_status = client.status();
 420        loop {
 421            let _ = client_status.try_recv();
 422            let is_connected = client_status.borrow().is_connected();
 423            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 424            if !is_connected || client_status.next().await.is_some() {
 425                log::info!("detected client disconnection");
 426
 427                this.upgrade()
 428                    .ok_or_else(|| anyhow!("room was dropped"))?
 429                    .update(&mut cx, |this, cx| {
 430                        this.status = RoomStatus::Rejoining;
 431                        cx.notify();
 432                    })?;
 433
 434                // Wait for client to re-establish a connection to the server.
 435                {
 436                    let mut reconnection_timeout =
 437                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 438                    let client_reconnection = async {
 439                        let mut remaining_attempts = 3;
 440                        while remaining_attempts > 0 {
 441                            if client_status.borrow().is_connected() {
 442                                log::info!("client reconnected, attempting to rejoin room");
 443
 444                                let Some(this) = this.upgrade() else { break };
 445                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 446                                    Ok(task) => {
 447                                        if task.await.log_err().is_some() {
 448                                            return true;
 449                                        } else {
 450                                            remaining_attempts -= 1;
 451                                        }
 452                                    }
 453                                    Err(_app_dropped) => return false,
 454                                }
 455                            } else if client_status.borrow().is_signed_out() {
 456                                return false;
 457                            }
 458
 459                            log::info!(
 460                                "waiting for client status change, remaining attempts {}",
 461                                remaining_attempts
 462                            );
 463                            client_status.next().await;
 464                        }
 465                        false
 466                    }
 467                    .fuse();
 468                    futures::pin_mut!(client_reconnection);
 469
 470                    futures::select_biased! {
 471                        reconnected = client_reconnection => {
 472                            if reconnected {
 473                                log::info!("successfully reconnected to room");
 474                                // If we successfully joined the room, go back around the loop
 475                                // waiting for future connection status changes.
 476                                continue;
 477                            }
 478                        }
 479                        _ = reconnection_timeout => {
 480                            log::info!("room reconnection timeout expired");
 481                        }
 482                    }
 483                }
 484
 485                break;
 486            }
 487        }
 488
 489        // The client failed to re-establish a connection to the server
 490        // or an error occurred while trying to re-join the room. Either way
 491        // we leave the room and return an error.
 492        if let Some(this) = this.upgrade() {
 493            log::info!("reconnection failed, leaving room");
 494            this.update(&mut cx, |this, cx| this.leave(cx))?.await?;
 495        }
 496        Err(anyhow!(
 497            "can't reconnect to room: client failed to re-establish connection"
 498        ))
 499    }
 500
 501    fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 502        let mut projects = HashMap::default();
 503        let mut reshared_projects = Vec::new();
 504        let mut rejoined_projects = Vec::new();
 505        self.shared_projects.retain(|project| {
 506            if let Some(handle) = project.upgrade() {
 507                let project = handle.read(cx);
 508                if let Some(project_id) = project.remote_id() {
 509                    projects.insert(project_id, handle.clone());
 510                    reshared_projects.push(proto::UpdateProject {
 511                        project_id,
 512                        worktrees: project.worktree_metadata_protos(cx),
 513                    });
 514                    return true;
 515                }
 516            }
 517            false
 518        });
 519        self.joined_projects.retain(|project| {
 520            if let Some(handle) = project.upgrade() {
 521                let project = handle.read(cx);
 522                if let Some(project_id) = project.remote_id() {
 523                    projects.insert(project_id, handle.clone());
 524                    rejoined_projects.push(proto::RejoinProject {
 525                        id: project_id,
 526                        worktrees: project
 527                            .worktrees(cx)
 528                            .map(|worktree| {
 529                                let worktree = worktree.read(cx);
 530                                proto::RejoinWorktree {
 531                                    id: worktree.id().to_proto(),
 532                                    scan_id: worktree.completed_scan_id() as u64,
 533                                }
 534                            })
 535                            .collect(),
 536                    });
 537                }
 538                return true;
 539            }
 540            false
 541        });
 542
 543        let response = self.client.request_envelope(proto::RejoinRoom {
 544            id: self.id,
 545            reshared_projects,
 546            rejoined_projects,
 547        });
 548
 549        cx.spawn(|this, mut cx| async move {
 550            let response = response.await?;
 551            let message_id = response.message_id;
 552            let response = response.payload;
 553            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 554            this.update(&mut cx, |this, cx| {
 555                this.status = RoomStatus::Online;
 556                this.apply_room_update(room_proto, cx)?;
 557
 558                for reshared_project in response.reshared_projects {
 559                    if let Some(project) = projects.get(&reshared_project.id) {
 560                        project.update(cx, |project, cx| {
 561                            project.reshared(reshared_project, cx).log_err();
 562                        });
 563                    }
 564                }
 565
 566                for rejoined_project in response.rejoined_projects {
 567                    if let Some(project) = projects.get(&rejoined_project.id) {
 568                        project.update(cx, |project, cx| {
 569                            project.rejoined(rejoined_project, message_id, cx).log_err();
 570                        });
 571                    }
 572                }
 573
 574                anyhow::Ok(())
 575            })?
 576        })
 577    }
 578
    /// Returns this room's id.
    pub fn id(&self) -> u64 {
        self.id
    }
 582
    /// Returns the room's current connection status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 586
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 590
    /// Returns the remote participants, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 594
 595    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 596        self.remote_participants
 597            .values()
 598            .find(|p| p.peer_id == peer_id)
 599    }
 600
 601    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 602        self.remote_participants
 603            .get(&user_id)
 604            .map(|participant| participant.role)
 605    }
 606
 607    pub fn contains_guests(&self) -> bool {
 608        self.local_participant.role == proto::ChannelRole::Guest
 609            || self
 610                .remote_participants
 611                .values()
 612                .any(|p| p.role == proto::ChannelRole::Guest)
 613    }
 614
    /// Returns `true` if the local participant's role is `Admin`.
    pub fn local_participant_is_admin(&self) -> bool {
        self.local_participant.role == proto::ChannelRole::Admin
    }
 618
    /// Returns `true` if the local participant's role is `Guest`.
    pub fn local_participant_is_guest(&self) -> bool {
        self.local_participant.role == proto::ChannelRole::Guest
    }
 622
 623    pub fn set_participant_role(
 624        &mut self,
 625        user_id: u64,
 626        role: proto::ChannelRole,
 627        cx: &Context<Self>,
 628    ) -> Task<Result<()>> {
 629        let client = self.client.clone();
 630        let room_id = self.id;
 631        let role = role.into();
 632        cx.spawn(|_, _| async move {
 633            client
 634                .request(proto::SetRoomParticipantRole {
 635                    room_id,
 636                    user_id,
 637                    role,
 638                })
 639                .await
 640                .map(|_| ())
 641        })
 642    }
 643
    /// Returns the users currently listed as pending participants.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 647
    /// Returns `true` if `user_id` is in the room's set of participant user ids.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 651
 652    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 653        self.follows_by_leader_id_project_id
 654            .get(&(leader_id, project_id))
 655            .map_or(&[], |v| v.as_slice())
 656    }
 657
 658    /// Returns the most 'active' projects, defined as most people in the project
 659    pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
 660        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 661        for participant in self.remote_participants.values() {
 662            match participant.location {
 663                ParticipantLocation::SharedProject { project_id } => {
 664                    project_hosts_and_guest_counts
 665                        .entry(project_id)
 666                        .or_default()
 667                        .1 += 1;
 668                }
 669                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 670            }
 671            for project in &participant.projects {
 672                project_hosts_and_guest_counts
 673                    .entry(project.id)
 674                    .or_default()
 675                    .0 = Some(participant.user.id);
 676            }
 677        }
 678
 679        if let Some(user) = self.user_store.read(cx).current_user() {
 680            for project in &self.local_participant.projects {
 681                project_hosts_and_guest_counts
 682                    .entry(project.id)
 683                    .or_default()
 684                    .0 = Some(user.id);
 685            }
 686        }
 687
 688        project_hosts_and_guest_counts
 689            .into_iter()
 690            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 691            .max_by_key(|(_, _, guest_count)| *guest_count)
 692            .map(|(id, host, _)| (id, host))
 693    }
 694
 695    async fn handle_room_updated(
 696        this: Entity<Self>,
 697        envelope: TypedEnvelope<proto::RoomUpdated>,
 698        mut cx: AsyncApp,
 699    ) -> Result<()> {
 700        let room = envelope
 701            .payload
 702            .room
 703            .ok_or_else(|| anyhow!("invalid room"))?;
 704        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 705    }
 706
 707    fn apply_room_update(&mut self, mut room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
 708        // Filter ourselves out from the room's participants.
 709        let local_participant_ix = room
 710            .participants
 711            .iter()
 712            .position(|participant| Some(participant.user_id) == self.client.user_id());
 713        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 714
 715        let pending_participant_user_ids = room
 716            .pending_participants
 717            .iter()
 718            .map(|p| p.user_id)
 719            .collect::<Vec<_>>();
 720
 721        let remote_participant_user_ids = room
 722            .participants
 723            .iter()
 724            .map(|p| p.user_id)
 725            .collect::<Vec<_>>();
 726
 727        let (remote_participants, pending_participants) =
 728            self.user_store.update(cx, move |user_store, cx| {
 729                (
 730                    user_store.get_users(remote_participant_user_ids, cx),
 731                    user_store.get_users(pending_participant_user_ids, cx),
 732                )
 733            });
 734
 735        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 736            let (remote_participants, pending_participants) =
 737                futures::join!(remote_participants, pending_participants);
 738
 739            this.update(&mut cx, |this, cx| {
 740                this.participant_user_ids.clear();
 741
 742                if let Some(participant) = local_participant {
 743                    let role = participant.role();
 744                    this.local_participant.projects = participant.projects;
 745                    if this.local_participant.role != role {
 746                        this.local_participant.role = role;
 747
 748                        if role == proto::ChannelRole::Guest {
 749                            for project in mem::take(&mut this.shared_projects) {
 750                                if let Some(project) = project.upgrade() {
 751                                    this.unshare_project(project, cx).log_err();
 752                                }
 753                            }
 754                            this.local_participant.projects.clear();
 755                            if let Some(live_kit_room) = &mut this.live_kit {
 756                                live_kit_room.stop_publishing(cx);
 757                            }
 758                        }
 759
 760                        this.joined_projects.retain(|project| {
 761                            if let Some(project) = project.upgrade() {
 762                                project.update(cx, |project, cx| project.set_role(role, cx));
 763                                true
 764                            } else {
 765                                false
 766                            }
 767                        });
 768                    }
 769                } else {
 770                    this.local_participant.projects.clear();
 771                }
 772
 773                if let Some(participants) = remote_participants.log_err() {
 774                    for (participant, user) in room.participants.into_iter().zip(participants) {
 775                        let Some(peer_id) = participant.peer_id else {
 776                            continue;
 777                        };
 778                        let participant_index = ParticipantIndex(participant.participant_index);
 779                        this.participant_user_ids.insert(participant.user_id);
 780
 781                        let old_projects = this
 782                            .remote_participants
 783                            .get(&participant.user_id)
 784                            .into_iter()
 785                            .flat_map(|existing| &existing.projects)
 786                            .map(|project| project.id)
 787                            .collect::<HashSet<_>>();
 788                        let new_projects = participant
 789                            .projects
 790                            .iter()
 791                            .map(|project| project.id)
 792                            .collect::<HashSet<_>>();
 793
 794                        for project in &participant.projects {
 795                            if !old_projects.contains(&project.id) {
 796                                cx.emit(Event::RemoteProjectShared {
 797                                    owner: user.clone(),
 798                                    project_id: project.id,
 799                                    worktree_root_names: project.worktree_root_names.clone(),
 800                                });
 801                            }
 802                        }
 803
 804                        for unshared_project_id in old_projects.difference(&new_projects) {
 805                            this.joined_projects.retain(|project| {
 806                                if let Some(project) = project.upgrade() {
 807                                    project.update(cx, |project, cx| {
 808                                        if project.remote_id() == Some(*unshared_project_id) {
 809                                            project.disconnected_from_host(cx);
 810                                            false
 811                                        } else {
 812                                            true
 813                                        }
 814                                    })
 815                                } else {
 816                                    false
 817                                }
 818                            });
 819                            cx.emit(Event::RemoteProjectUnshared {
 820                                project_id: *unshared_project_id,
 821                            });
 822                        }
 823
 824                        let role = participant.role();
 825                        let location = ParticipantLocation::from_proto(participant.location)
 826                            .unwrap_or(ParticipantLocation::External);
 827                        if let Some(remote_participant) =
 828                            this.remote_participants.get_mut(&participant.user_id)
 829                        {
 830                            remote_participant.peer_id = peer_id;
 831                            remote_participant.projects = participant.projects;
 832                            remote_participant.participant_index = participant_index;
 833                            if location != remote_participant.location
 834                                || role != remote_participant.role
 835                            {
 836                                remote_participant.location = location;
 837                                remote_participant.role = role;
 838                                cx.emit(Event::ParticipantLocationChanged {
 839                                    participant_id: peer_id,
 840                                });
 841                            }
 842                        } else {
 843                            this.remote_participants.insert(
 844                                participant.user_id,
 845                                RemoteParticipant {
 846                                    user: user.clone(),
 847                                    participant_index,
 848                                    peer_id,
 849                                    projects: participant.projects,
 850                                    location,
 851                                    role,
 852                                    muted: true,
 853                                    speaking: false,
 854                                    video_tracks: Default::default(),
 855                                    audio_tracks: Default::default(),
 856                                },
 857                            );
 858
 859                            Audio::play_sound(Sound::Joined, cx);
 860
 861                            if let Some(live_kit) = this.live_kit.as_ref() {
 862                                let video_tracks =
 863                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 864                                let audio_tracks =
 865                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 866                                let publications = live_kit
 867                                    .room
 868                                    .remote_audio_track_publications(&user.id.to_string());
 869
 870                                for track in video_tracks {
 871                                    this.live_kit_room_updated(
 872                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
 873                                        cx,
 874                                    )
 875                                    .log_err();
 876                                }
 877
 878                                for (track, publication) in
 879                                    audio_tracks.iter().zip(publications.iter())
 880                                {
 881                                    this.live_kit_room_updated(
 882                                        RoomUpdate::SubscribedToRemoteAudioTrack(
 883                                            track.clone(),
 884                                            publication.clone(),
 885                                        ),
 886                                        cx,
 887                                    )
 888                                    .log_err();
 889                                }
 890                            }
 891                        }
 892                    }
 893
 894                    this.remote_participants.retain(|user_id, participant| {
 895                        if this.participant_user_ids.contains(user_id) {
 896                            true
 897                        } else {
 898                            for project in &participant.projects {
 899                                cx.emit(Event::RemoteProjectUnshared {
 900                                    project_id: project.id,
 901                                });
 902                            }
 903                            false
 904                        }
 905                    });
 906                }
 907
 908                if let Some(pending_participants) = pending_participants.log_err() {
 909                    this.pending_participants = pending_participants;
 910                    for participant in &this.pending_participants {
 911                        this.participant_user_ids.insert(participant.id);
 912                    }
 913                }
 914
 915                this.follows_by_leader_id_project_id.clear();
 916                for follower in room.followers {
 917                    let project_id = follower.project_id;
 918                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 919                        (Some(leader), Some(follower)) => (leader, follower),
 920
 921                        _ => {
 922                            log::error!("Follower message {follower:?} missing some state");
 923                            continue;
 924                        }
 925                    };
 926
 927                    let list = this
 928                        .follows_by_leader_id_project_id
 929                        .entry((leader, project_id))
 930                        .or_default();
 931                    if !list.contains(&follower) {
 932                        list.push(follower);
 933                    }
 934                }
 935
 936                this.pending_room_update.take();
 937                if this.should_leave() {
 938                    log::info!("room is empty, leaving");
 939                    this.leave(cx).detach();
 940                }
 941
 942                this.user_store.update(cx, |user_store, cx| {
 943                    let participant_indices_by_user_id = this
 944                        .remote_participants
 945                        .iter()
 946                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 947                        .collect();
 948                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 949                });
 950
 951                this.check_invariants();
 952                this.room_update_completed_tx.try_send(Some(())).ok();
 953                cx.notify();
 954            })
 955            .ok();
 956        }));
 957
 958        cx.notify();
 959        Ok(())
 960    }
 961
 962    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 963        let mut done_rx = self.room_update_completed_rx.clone();
 964        async move {
 965            while let Some(result) = done_rx.next().await {
 966                if result.is_some() {
 967                    break;
 968                }
 969            }
 970        }
 971    }
 972
 973    fn live_kit_room_updated(&mut self, update: RoomUpdate, cx: &mut Context<Self>) -> Result<()> {
 974        match update {
 975            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
 976                let user_id = track.publisher_id().parse()?;
 977                let track_id = track.sid().to_string();
 978                let participant = self
 979                    .remote_participants
 980                    .get_mut(&user_id)
 981                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 982                participant.video_tracks.insert(track_id.clone(), track);
 983                cx.emit(Event::RemoteVideoTracksChanged {
 984                    participant_id: participant.peer_id,
 985                });
 986            }
 987
 988            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
 989                publisher_id,
 990                track_id,
 991            } => {
 992                let user_id = publisher_id.parse()?;
 993                let participant = self
 994                    .remote_participants
 995                    .get_mut(&user_id)
 996                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 997                participant.video_tracks.remove(&track_id);
 998                cx.emit(Event::RemoteVideoTracksChanged {
 999                    participant_id: participant.peer_id,
1000                });
1001            }
1002
1003            RoomUpdate::ActiveSpeakersChanged { speakers } => {
1004                let mut speaker_ids = speakers
1005                    .into_iter()
1006                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
1007                    .collect::<Vec<u64>>();
1008                speaker_ids.sort_unstable();
1009                for (sid, participant) in &mut self.remote_participants {
1010                    participant.speaking = speaker_ids.binary_search(sid).is_ok();
1011                }
1012                if let Some(id) = self.client.user_id() {
1013                    if let Some(room) = &mut self.live_kit {
1014                        room.speaking = speaker_ids.binary_search(&id).is_ok();
1015                    }
1016                }
1017            }
1018
1019            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1020                let mut found = false;
1021                for participant in &mut self.remote_participants.values_mut() {
1022                    for track in participant.audio_tracks.values() {
1023                        if track.sid() == track_id {
1024                            found = true;
1025                            break;
1026                        }
1027                    }
1028                    if found {
1029                        participant.muted = muted;
1030                        break;
1031                    }
1032                }
1033            }
1034
1035            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1036                if let Some(live_kit) = &self.live_kit {
1037                    if live_kit.deafened {
1038                        track.stop();
1039                        cx.foreground_executor()
1040                            .spawn(publication.set_enabled(false))
1041                            .detach();
1042                    }
1043                }
1044
1045                let user_id = track.publisher_id().parse()?;
1046                let track_id = track.sid().to_string();
1047                let participant = self
1048                    .remote_participants
1049                    .get_mut(&user_id)
1050                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1051                participant.audio_tracks.insert(track_id.clone(), track);
1052                participant.muted = publication.is_muted();
1053
1054                cx.emit(Event::RemoteAudioTracksChanged {
1055                    participant_id: participant.peer_id,
1056                });
1057            }
1058
1059            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1060                publisher_id,
1061                track_id,
1062            } => {
1063                let user_id = publisher_id.parse()?;
1064                let participant = self
1065                    .remote_participants
1066                    .get_mut(&user_id)
1067                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1068                participant.audio_tracks.remove(&track_id);
1069                cx.emit(Event::RemoteAudioTracksChanged {
1070                    participant_id: participant.peer_id,
1071                });
1072            }
1073
1074            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1075                log::info!("unpublished audio track {}", publication.sid());
1076                if let Some(room) = &mut self.live_kit {
1077                    room.microphone_track = LocalTrack::None;
1078                }
1079            }
1080
1081            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1082                log::info!("unpublished video track {}", publication.sid());
1083                if let Some(room) = &mut self.live_kit {
1084                    room.screen_track = LocalTrack::None;
1085                }
1086            }
1087
1088            RoomUpdate::LocalAudioTrackPublished { publication } => {
1089                log::info!("published audio track {}", publication.sid());
1090            }
1091
1092            RoomUpdate::LocalVideoTrackPublished { publication } => {
1093                log::info!("published video track {}", publication.sid());
1094            }
1095        }
1096
1097        cx.notify();
1098        Ok(())
1099    }
1100
1101    fn check_invariants(&self) {
1102        #[cfg(any(test, feature = "test-support"))]
1103        {
1104            for participant in self.remote_participants.values() {
1105                assert!(self.participant_user_ids.contains(&participant.user.id));
1106                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1107            }
1108
1109            for participant in &self.pending_participants {
1110                assert!(self.participant_user_ids.contains(&participant.id));
1111                assert_ne!(participant.id, self.client.user_id().unwrap());
1112            }
1113
1114            assert_eq!(
1115                self.participant_user_ids.len(),
1116                self.remote_participants.len() + self.pending_participants.len()
1117            );
1118        }
1119    }
1120
1121    pub(crate) fn call(
1122        &mut self,
1123        called_user_id: u64,
1124        initial_project_id: Option<u64>,
1125        cx: &mut Context<Self>,
1126    ) -> Task<Result<()>> {
1127        if self.status.is_offline() {
1128            return Task::ready(Err(anyhow!("room is offline")));
1129        }
1130
1131        cx.notify();
1132        let client = self.client.clone();
1133        let room_id = self.id;
1134        self.pending_call_count += 1;
1135        cx.spawn(move |this, mut cx| async move {
1136            let result = client
1137                .request(proto::Call {
1138                    room_id,
1139                    called_user_id,
1140                    initial_project_id,
1141                })
1142                .await;
1143            this.update(&mut cx, |this, cx| {
1144                this.pending_call_count -= 1;
1145                if this.should_leave() {
1146                    this.leave(cx).detach_and_log_err(cx);
1147                }
1148            })?;
1149            result?;
1150            Ok(())
1151        })
1152    }
1153
1154    pub fn join_project(
1155        &mut self,
1156        id: u64,
1157        language_registry: Arc<LanguageRegistry>,
1158        fs: Arc<dyn Fs>,
1159        cx: &mut Context<Self>,
1160    ) -> Task<Result<Entity<Project>>> {
1161        let client = self.client.clone();
1162        let user_store = self.user_store.clone();
1163        cx.emit(Event::RemoteProjectJoined { project_id: id });
1164        cx.spawn(move |this, mut cx| async move {
1165            let project =
1166                Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1167
1168            this.update(&mut cx, |this, cx| {
1169                this.joined_projects.retain(|project| {
1170                    if let Some(project) = project.upgrade() {
1171                        !project.read(cx).is_disconnected(cx)
1172                    } else {
1173                        false
1174                    }
1175                });
1176                this.joined_projects.insert(project.downgrade());
1177            })?;
1178            Ok(project)
1179        })
1180    }
1181
1182    pub fn share_project(
1183        &mut self,
1184        project: Entity<Project>,
1185        cx: &mut Context<Self>,
1186    ) -> Task<Result<u64>> {
1187        if let Some(project_id) = project.read(cx).remote_id() {
1188            return Task::ready(Ok(project_id));
1189        }
1190
1191        let request = self.client.request(proto::ShareProject {
1192            room_id: self.id(),
1193            worktrees: project.read(cx).worktree_metadata_protos(cx),
1194            is_ssh_project: project.read(cx).is_via_ssh(),
1195        });
1196
1197        cx.spawn(|this, mut cx| async move {
1198            let response = request.await?;
1199
1200            project.update(&mut cx, |project, cx| {
1201                project.shared(response.project_id, cx)
1202            })??;
1203
1204            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1205            this.update(&mut cx, |this, cx| {
1206                this.shared_projects.insert(project.downgrade());
1207                let active_project = this.local_participant.active_project.as_ref();
1208                if active_project.map_or(false, |location| *location == project) {
1209                    this.set_location(Some(&project), cx)
1210                } else {
1211                    Task::ready(Ok(()))
1212                }
1213            })?
1214            .await?;
1215
1216            Ok(response.project_id)
1217        })
1218    }
1219
1220    pub(crate) fn unshare_project(
1221        &mut self,
1222        project: Entity<Project>,
1223        cx: &mut Context<Self>,
1224    ) -> Result<()> {
1225        let project_id = match project.read(cx).remote_id() {
1226            Some(project_id) => project_id,
1227            None => return Ok(()),
1228        };
1229
1230        self.client.send(proto::UnshareProject { project_id })?;
1231        project.update(cx, |this, cx| this.unshare(cx))?;
1232
1233        if self.local_participant.active_project == Some(project.downgrade()) {
1234            self.set_location(Some(&project), cx).detach_and_log_err(cx);
1235        }
1236        Ok(())
1237    }
1238
1239    pub(crate) fn set_location(
1240        &mut self,
1241        project: Option<&Entity<Project>>,
1242        cx: &mut Context<Self>,
1243    ) -> Task<Result<()>> {
1244        if self.status.is_offline() {
1245            return Task::ready(Err(anyhow!("room is offline")));
1246        }
1247
1248        let client = self.client.clone();
1249        let room_id = self.id;
1250        let location = if let Some(project) = project {
1251            self.local_participant.active_project = Some(project.downgrade());
1252            if let Some(project_id) = project.read(cx).remote_id() {
1253                proto::participant_location::Variant::SharedProject(
1254                    proto::participant_location::SharedProject { id: project_id },
1255                )
1256            } else {
1257                proto::participant_location::Variant::UnsharedProject(
1258                    proto::participant_location::UnsharedProject {},
1259                )
1260            }
1261        } else {
1262            self.local_participant.active_project = None;
1263            proto::participant_location::Variant::External(proto::participant_location::External {})
1264        };
1265
1266        cx.notify();
1267        cx.background_executor().spawn(async move {
1268            client
1269                .request(proto::UpdateParticipantLocation {
1270                    room_id,
1271                    location: Some(proto::ParticipantLocation {
1272                        variant: Some(location),
1273                    }),
1274                })
1275                .await?;
1276            Ok(())
1277        })
1278    }
1279
1280    pub fn is_screen_sharing(&self) -> bool {
1281        self.live_kit.as_ref().map_or(false, |live_kit| {
1282            !matches!(live_kit.screen_track, LocalTrack::None)
1283        })
1284    }
1285
1286    pub fn is_sharing_mic(&self) -> bool {
1287        self.live_kit.as_ref().map_or(false, |live_kit| {
1288            !matches!(live_kit.microphone_track, LocalTrack::None)
1289        })
1290    }
1291
1292    pub fn is_muted(&self) -> bool {
1293        self.live_kit.as_ref().map_or(false, |live_kit| {
1294            matches!(live_kit.microphone_track, LocalTrack::None)
1295                || live_kit.muted_by_user
1296                || live_kit.deafened
1297        })
1298    }
1299
1300    pub fn muted_by_user(&self) -> bool {
1301        self.live_kit
1302            .as_ref()
1303            .map_or(false, |live_kit| live_kit.muted_by_user)
1304    }
1305
1306    pub fn is_speaking(&self) -> bool {
1307        self.live_kit
1308            .as_ref()
1309            .map_or(false, |live_kit| live_kit.speaking)
1310    }
1311
1312    pub fn is_deafened(&self) -> Option<bool> {
1313        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1314    }
1315
1316    pub fn can_use_microphone(&self) -> bool {
1317        use proto::ChannelRole::*;
1318        match self.local_participant.role {
1319            Admin | Member | Talker => true,
1320            Guest | Banned => false,
1321        }
1322    }
1323
1324    pub fn can_share_projects(&self) -> bool {
1325        use proto::ChannelRole::*;
1326        match self.local_participant.role {
1327            Admin | Member => true,
1328            Guest | Banned | Talker => false,
1329        }
1330    }
1331
1332    #[track_caller]
1333    pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1334        if self.status.is_offline() {
1335            return Task::ready(Err(anyhow!("room is offline")));
1336        }
1337
1338        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1339            let publish_id = post_inc(&mut live_kit.next_publish_id);
1340            live_kit.microphone_track = LocalTrack::Pending { publish_id };
1341            cx.notify();
1342            publish_id
1343        } else {
1344            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1345        };
1346
1347        cx.spawn(move |this, mut cx| async move {
1348            let publish_track = async {
1349                let track = LocalAudioTrack::create();
1350                this.upgrade()
1351                    .ok_or_else(|| anyhow!("room was dropped"))?
1352                    .update(&mut cx, |this, _| {
1353                        this.live_kit
1354                            .as_ref()
1355                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1356                    })?
1357                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1358                    .await
1359            };
1360            let publication = publish_track.await;
1361            this.upgrade()
1362                .ok_or_else(|| anyhow!("room was dropped"))?
1363                .update(&mut cx, |this, cx| {
1364                    let live_kit = this
1365                        .live_kit
1366                        .as_mut()
1367                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1368
1369                    let canceled = if let LocalTrack::Pending {
1370                        publish_id: cur_publish_id,
1371                    } = &live_kit.microphone_track
1372                    {
1373                        *cur_publish_id != publish_id
1374                    } else {
1375                        true
1376                    };
1377
1378                    match publication {
1379                        Ok(publication) => {
1380                            if canceled {
1381                                live_kit.room.unpublish_track(publication);
1382                            } else {
1383                                if live_kit.muted_by_user || live_kit.deafened {
1384                                    cx.background_executor()
1385                                        .spawn(publication.set_mute(true))
1386                                        .detach();
1387                                }
1388                                live_kit.microphone_track = LocalTrack::Published {
1389                                    track_publication: publication,
1390                                };
1391                                cx.notify();
1392                            }
1393                            Ok(())
1394                        }
1395                        Err(error) => {
1396                            if canceled {
1397                                Ok(())
1398                            } else {
1399                                live_kit.microphone_track = LocalTrack::None;
1400                                cx.notify();
1401                                Err(error)
1402                            }
1403                        }
1404                    }
1405                })?
1406        })
1407    }
1408
1409    pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1410        if self.status.is_offline() {
1411            return Task::ready(Err(anyhow!("room is offline")));
1412        } else if self.is_screen_sharing() {
1413            return Task::ready(Err(anyhow!("screen was already shared")));
1414        }
1415
1416        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1417            let publish_id = post_inc(&mut live_kit.next_publish_id);
1418            live_kit.screen_track = LocalTrack::Pending { publish_id };
1419            cx.notify();
1420            (live_kit.room.display_sources(), publish_id)
1421        } else {
1422            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1423        };
1424
1425        cx.spawn(move |this, mut cx| async move {
1426            let publish_track = async {
1427                let displays = displays.await?;
1428                let display = displays
1429                    .first()
1430                    .ok_or_else(|| anyhow!("no display found"))?;
1431                let track = LocalVideoTrack::screen_share_for_display(display);
1432                this.upgrade()
1433                    .ok_or_else(|| anyhow!("room was dropped"))?
1434                    .update(&mut cx, |this, _| {
1435                        this.live_kit
1436                            .as_ref()
1437                            .map(|live_kit| live_kit.room.publish_video_track(track))
1438                    })?
1439                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1440                    .await
1441            };
1442
1443            let publication = publish_track.await;
1444            this.upgrade()
1445                .ok_or_else(|| anyhow!("room was dropped"))?
1446                .update(&mut cx, |this, cx| {
1447                    let live_kit = this
1448                        .live_kit
1449                        .as_mut()
1450                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1451
1452                    let canceled = if let LocalTrack::Pending {
1453                        publish_id: cur_publish_id,
1454                    } = &live_kit.screen_track
1455                    {
1456                        *cur_publish_id != publish_id
1457                    } else {
1458                        true
1459                    };
1460
1461                    match publication {
1462                        Ok(publication) => {
1463                            if canceled {
1464                                live_kit.room.unpublish_track(publication);
1465                            } else {
1466                                live_kit.screen_track = LocalTrack::Published {
1467                                    track_publication: publication,
1468                                };
1469                                cx.notify();
1470                            }
1471
1472                            Audio::play_sound(Sound::StartScreenshare, cx);
1473
1474                            Ok(())
1475                        }
1476                        Err(error) => {
1477                            if canceled {
1478                                Ok(())
1479                            } else {
1480                                live_kit.screen_track = LocalTrack::None;
1481                                cx.notify();
1482                                Err(error)
1483                            }
1484                        }
1485                    }
1486                })?
1487        })
1488    }
1489
1490    pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1491        if let Some(live_kit) = self.live_kit.as_mut() {
1492            // When unmuting, undeafen if the user was deafened before.
1493            let was_deafened = live_kit.deafened;
1494            if live_kit.muted_by_user
1495                || live_kit.deafened
1496                || matches!(live_kit.microphone_track, LocalTrack::None)
1497            {
1498                live_kit.muted_by_user = false;
1499                live_kit.deafened = false;
1500            } else {
1501                live_kit.muted_by_user = true;
1502            }
1503            let muted = live_kit.muted_by_user;
1504            let should_undeafen = was_deafened && !live_kit.deafened;
1505
1506            if let Some(task) = self.set_mute(muted, cx) {
1507                task.detach_and_log_err(cx);
1508            }
1509
1510            if should_undeafen {
1511                if let Some(task) = self.set_deafened(false, cx) {
1512                    task.detach_and_log_err(cx);
1513                }
1514            }
1515        }
1516    }
1517
1518    pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1519        if let Some(live_kit) = self.live_kit.as_mut() {
1520            // When deafening, mute the microphone if it was not already muted.
1521            // When un-deafening, unmute the microphone, unless it was explicitly muted.
1522            let deafened = !live_kit.deafened;
1523            live_kit.deafened = deafened;
1524            let should_change_mute = !live_kit.muted_by_user;
1525
1526            if let Some(task) = self.set_deafened(deafened, cx) {
1527                task.detach_and_log_err(cx);
1528            }
1529
1530            if should_change_mute {
1531                if let Some(task) = self.set_mute(deafened, cx) {
1532                    task.detach_and_log_err(cx);
1533                }
1534            }
1535        }
1536    }
1537
1538    pub fn unshare_screen(&mut self, cx: &mut Context<Self>) -> Result<()> {
1539        if self.status.is_offline() {
1540            return Err(anyhow!("room is offline"));
1541        }
1542
1543        let live_kit = self
1544            .live_kit
1545            .as_mut()
1546            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1547        match mem::take(&mut live_kit.screen_track) {
1548            LocalTrack::None => Err(anyhow!("screen was not shared")),
1549            LocalTrack::Pending { .. } => {
1550                cx.notify();
1551                Ok(())
1552            }
1553            LocalTrack::Published {
1554                track_publication, ..
1555            } => {
1556                live_kit.room.unpublish_track(track_publication);
1557                cx.notify();
1558
1559                Audio::play_sound(Sound::StopScreenshare, cx);
1560                Ok(())
1561            }
1562        }
1563    }
1564
1565    fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
1566        let live_kit = self.live_kit.as_mut()?;
1567        cx.notify();
1568
1569        let mut track_updates = Vec::new();
1570        for participant in self.remote_participants.values() {
1571            for publication in live_kit
1572                .room
1573                .remote_audio_track_publications(&participant.user.id.to_string())
1574            {
1575                track_updates.push(publication.set_enabled(!deafened));
1576            }
1577
1578            for track in participant.audio_tracks.values() {
1579                if deafened {
1580                    track.stop();
1581                } else {
1582                    track.start();
1583                }
1584            }
1585        }
1586
1587        Some(cx.foreground_executor().spawn(async move {
1588            for result in futures::future::join_all(track_updates).await {
1589                result?;
1590            }
1591            Ok(())
1592        }))
1593    }
1594
1595    fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1596        let live_kit = self.live_kit.as_mut()?;
1597        cx.notify();
1598
1599        if should_mute {
1600            Audio::play_sound(Sound::Mute, cx);
1601        } else {
1602            Audio::play_sound(Sound::Unmute, cx);
1603        }
1604
1605        match &mut live_kit.microphone_track {
1606            LocalTrack::None => {
1607                if should_mute {
1608                    None
1609                } else {
1610                    Some(self.share_microphone(cx))
1611                }
1612            }
1613            LocalTrack::Pending { .. } => None,
1614            LocalTrack::Published { track_publication } => Some(
1615                cx.foreground_executor()
1616                    .spawn(track_publication.set_mute(should_mute)),
1617            ),
1618        }
1619    }
1620
1621    #[cfg(any(test, feature = "test-support"))]
1622    pub fn set_display_sources(&self, sources: Vec<livekit_client_macos::MacOSDisplay>) {
1623        self.live_kit
1624            .as_ref()
1625            .unwrap()
1626            .room
1627            .set_display_sources(sources);
1628    }
1629}
1630
/// LiveKit-side state of a `Room`: the room handle, the local tracks and the
/// local mute/deafen flags.
struct LiveKitRoom {
    /// Handle to the underlying LiveKit room; used to unpublish local tracks
    /// and to look up remote audio track publications.
    room: Arc<livekit_client_macos::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the microphone is muted at the user's request. It is flipped by
    /// `Room::toggle_mute`; `Room::toggle_deafen` only reads it, so deafening
    /// mutes/unmutes the microphone only while this is `false`.
    /// NOTE(review): an earlier comment said this also tracks the auto-mute
    /// caused by deafening — confirm against the code that initializes it.
    muted_by_user: bool,
    /// Whether remote audio is currently silenced for the local user.
    deafened: bool,
    // NOTE(review): presumably whether the local user is currently speaking —
    // confirm against the code that updates it.
    speaking: bool,
    // NOTE(review): presumably the source of `LocalTrack::Pending::publish_id`
    // values — confirm against the publishing code.
    next_publish_id: usize,
    // NOTE(review): these tasks appear to be stored only to be kept alive for
    // as long as this struct exists — confirm.
    _maintain_room: Task<()>,
    _handle_updates: Task<()>,
}
1643
1644impl LiveKitRoom {
1645    fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1646        if let LocalTrack::Published {
1647            track_publication, ..
1648        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1649        {
1650            self.room.unpublish_track(track_publication);
1651            cx.notify();
1652        }
1653
1654        if let LocalTrack::Published {
1655            track_publication, ..
1656        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1657        {
1658            self.room.unpublish_track(track_publication);
1659            cx.notify();
1660        }
1661    }
1662}
1663
1664enum LocalTrack {
1665    None,
1666    Pending {
1667        publish_id: usize,
1668    },
1669    Published {
1670        track_publication: LocalTrackPublication,
1671    },
1672}
1673
1674impl Default for LocalTrack {
1675    fn default() -> Self {
1676        Self::None
1677    }
1678}
1679
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is online.
    Online,
    /// The room is being rejoined; it counts as neither online nor offline.
    Rejoining,
    /// The room is offline.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}