room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{
  19    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  20    RemoteVideoTrackUpdate,
  21};
  22use postage::{sink::Sink, stream::Stream, watch};
  23use project::Project;
  24use settings::Settings as _;
  25use std::{future::Future, mem, sync::Arc, time::Duration};
  26use util::{post_inc, ResultExt, TryFutureExt};
  27
  28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  29
  30#[derive(Clone, Debug, PartialEq, Eq)]
  31pub enum Event {
  32    ParticipantLocationChanged {
  33        participant_id: proto::PeerId,
  34    },
  35    RemoteVideoTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
  38    RemoteAudioTracksChanged {
  39        participant_id: proto::PeerId,
  40    },
  41    RemoteProjectShared {
  42        owner: Arc<User>,
  43        project_id: u64,
  44        worktree_root_names: Vec<String>,
  45    },
  46    RemoteProjectUnshared {
  47        project_id: u64,
  48    },
  49    RemoteProjectJoined {
  50        project_id: u64,
  51    },
  52    RemoteProjectInvitationDiscarded {
  53        project_id: u64,
  54    },
  55    Left,
  56}
  57
  58pub struct Room {
  59    id: u64,
  60    channel_id: Option<u64>,
  61    live_kit: Option<LiveKitRoom>,
  62    status: RoomStatus,
  63    shared_projects: HashSet<WeakModel<Project>>,
  64    joined_projects: HashSet<WeakModel<Project>>,
  65    local_participant: LocalParticipant,
  66    remote_participants: BTreeMap<u64, RemoteParticipant>,
  67    pending_participants: Vec<Arc<User>>,
  68    participant_user_ids: HashSet<u64>,
  69    pending_call_count: usize,
  70    leave_when_empty: bool,
  71    client: Arc<Client>,
  72    user_store: Model<UserStore>,
  73    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  74    client_subscriptions: Vec<client::Subscription>,
  75    _subscriptions: Vec<gpui::Subscription>,
  76    room_update_completed_tx: watch::Sender<Option<()>>,
  77    room_update_completed_rx: watch::Receiver<Option<()>>,
  78    pending_room_update: Option<Task<()>>,
  79    maintain_connection: Option<Task<Option<()>>>,
  80}
  81
  82impl EventEmitter<Event> for Room {}
  83
  84impl Room {
  85    pub fn channel_id(&self) -> Option<u64> {
  86        self.channel_id
  87    }
  88
  89    pub fn is_sharing_project(&self) -> bool {
  90        !self.shared_projects.is_empty()
  91    }
  92
  93    #[cfg(any(test, feature = "test-support"))]
  94    pub fn is_connected(&self) -> bool {
  95        if let Some(live_kit) = self.live_kit.as_ref() {
  96            matches!(
  97                *live_kit.room.status().borrow(),
  98                live_kit_client::ConnectionState::Connected { .. }
  99            )
 100        } else {
 101            false
 102        }
 103    }
 104
 105    fn new(
 106        id: u64,
 107        channel_id: Option<u64>,
 108        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 109        client: Arc<Client>,
 110        user_store: Model<UserStore>,
 111        cx: &mut ModelContext<Self>,
 112    ) -> Self {
 113        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 114            let room = live_kit_client::Room::new();
 115            let mut status = room.status();
 116            // Consume the initial status of the room.
 117            let _ = status.try_recv();
 118            let _maintain_room = cx.spawn(|this, mut cx| async move {
 119                while let Some(status) = status.next().await {
 120                    let this = if let Some(this) = this.upgrade() {
 121                        this
 122                    } else {
 123                        break;
 124                    };
 125
 126                    if status == live_kit_client::ConnectionState::Disconnected {
 127                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 128                            .ok();
 129                        break;
 130                    }
 131                }
 132            });
 133
 134            let _maintain_video_tracks = cx.spawn({
 135                let room = room.clone();
 136                move |this, mut cx| async move {
 137                    let mut track_video_changes = room.remote_video_track_updates();
 138                    while let Some(track_change) = track_video_changes.next().await {
 139                        let this = if let Some(this) = this.upgrade() {
 140                            this
 141                        } else {
 142                            break;
 143                        };
 144
 145                        this.update(&mut cx, |this, cx| {
 146                            this.remote_video_track_updated(track_change, cx).log_err()
 147                        })
 148                        .ok();
 149                    }
 150                }
 151            });
 152
 153            let _maintain_audio_tracks = cx.spawn({
 154                let room = room.clone();
 155                |this, mut cx| async move {
 156                    let mut track_audio_changes = room.remote_audio_track_updates();
 157                    while let Some(track_change) = track_audio_changes.next().await {
 158                        let this = if let Some(this) = this.upgrade() {
 159                            this
 160                        } else {
 161                            break;
 162                        };
 163
 164                        this.update(&mut cx, |this, cx| {
 165                            this.remote_audio_track_updated(track_change, cx).log_err()
 166                        })
 167                        .ok();
 168                    }
 169                }
 170            });
 171
 172            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 173            cx.spawn(|this, mut cx| async move {
 174                connect.await?;
 175
 176                let is_read_only = this
 177                    .update(&mut cx, |room, _| room.read_only())
 178                    .unwrap_or(true);
 179
 180                if !cx.update(|cx| Self::mute_on_join(cx))? && !is_read_only {
 181                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 182                        .await?;
 183                }
 184
 185                anyhow::Ok(())
 186            })
 187            .detach_and_log_err(cx);
 188
 189            Some(LiveKitRoom {
 190                room,
 191                screen_track: LocalTrack::None,
 192                microphone_track: LocalTrack::None,
 193                next_publish_id: 0,
 194                muted_by_user: false,
 195                deafened: false,
 196                speaking: false,
 197                _maintain_room,
 198                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 199            })
 200        } else {
 201            None
 202        };
 203
 204        let maintain_connection = cx.spawn({
 205            let client = client.clone();
 206            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 207        });
 208
 209        Audio::play_sound(Sound::Joined, cx);
 210
 211        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 212
 213        Self {
 214            id,
 215            channel_id,
 216            live_kit: live_kit_room,
 217            status: RoomStatus::Online,
 218            shared_projects: Default::default(),
 219            joined_projects: Default::default(),
 220            participant_user_ids: Default::default(),
 221            local_participant: Default::default(),
 222            remote_participants: Default::default(),
 223            pending_participants: Default::default(),
 224            pending_call_count: 0,
 225            client_subscriptions: vec![
 226                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 227            ],
 228            _subscriptions: vec![
 229                cx.on_release(Self::released),
 230                cx.on_app_quit(Self::app_will_quit),
 231            ],
 232            leave_when_empty: false,
 233            pending_room_update: None,
 234            client,
 235            user_store,
 236            follows_by_leader_id_project_id: Default::default(),
 237            maintain_connection: Some(maintain_connection),
 238            room_update_completed_tx,
 239            room_update_completed_rx,
 240        }
 241    }
 242
 243    pub(crate) fn create(
 244        called_user_id: u64,
 245        initial_project: Option<Model<Project>>,
 246        client: Arc<Client>,
 247        user_store: Model<UserStore>,
 248        cx: &mut AppContext,
 249    ) -> Task<Result<Model<Self>>> {
 250        cx.spawn(move |mut cx| async move {
 251            let response = client.request(proto::CreateRoom {}).await?;
 252            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 253            let room = cx.new_model(|cx| {
 254                let mut room = Self::new(
 255                    room_proto.id,
 256                    None,
 257                    response.live_kit_connection_info,
 258                    client,
 259                    user_store,
 260                    cx,
 261                );
 262                if let Some(participant) = room_proto.participants.first() {
 263                    room.local_participant.role = participant.role()
 264                }
 265                room
 266            })?;
 267
 268            let initial_project_id = if let Some(initial_project) = initial_project {
 269                let initial_project_id = room
 270                    .update(&mut cx, |room, cx| {
 271                        room.share_project(initial_project.clone(), cx)
 272                    })?
 273                    .await?;
 274                Some(initial_project_id)
 275            } else {
 276                None
 277            };
 278
 279            match room
 280                .update(&mut cx, |room, cx| {
 281                    room.leave_when_empty = true;
 282                    room.call(called_user_id, initial_project_id, cx)
 283                })?
 284                .await
 285            {
 286                Ok(()) => Ok(room),
 287                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 288            }
 289        })
 290    }
 291
 292    pub(crate) async fn join_channel(
 293        channel_id: u64,
 294        client: Arc<Client>,
 295        user_store: Model<UserStore>,
 296        cx: AsyncAppContext,
 297    ) -> Result<Model<Self>> {
 298        Self::from_join_response(
 299            client.request(proto::JoinChannel { channel_id }).await?,
 300            client,
 301            user_store,
 302            cx,
 303        )
 304    }
 305
 306    pub(crate) async fn join(
 307        room_id: u64,
 308        client: Arc<Client>,
 309        user_store: Model<UserStore>,
 310        cx: AsyncAppContext,
 311    ) -> Result<Model<Self>> {
 312        Self::from_join_response(
 313            client.request(proto::JoinRoom { id: room_id }).await?,
 314            client,
 315            user_store,
 316            cx,
 317        )
 318    }
 319
 320    fn released(&mut self, cx: &mut AppContext) {
 321        if self.status.is_online() {
 322            self.leave_internal(cx).detach_and_log_err(cx);
 323        }
 324    }
 325
 326    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 327        let task = if self.status.is_online() {
 328            let leave = self.leave_internal(cx);
 329            Some(cx.background_executor().spawn(async move {
 330                leave.await.log_err();
 331            }))
 332        } else {
 333            None
 334        };
 335
 336        async move {
 337            if let Some(task) = task {
 338                task.await;
 339            }
 340        }
 341    }
 342
 343    pub fn mute_on_join(cx: &AppContext) -> bool {
 344        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 345    }
 346
 347    fn from_join_response(
 348        response: proto::JoinRoomResponse,
 349        client: Arc<Client>,
 350        user_store: Model<UserStore>,
 351        mut cx: AsyncAppContext,
 352    ) -> Result<Model<Self>> {
 353        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 354        let room = cx.new_model(|cx| {
 355            Self::new(
 356                room_proto.id,
 357                response.channel_id,
 358                response.live_kit_connection_info,
 359                client,
 360                user_store,
 361                cx,
 362            )
 363        })?;
 364        room.update(&mut cx, |room, cx| {
 365            room.leave_when_empty = room.channel_id.is_none();
 366            room.apply_room_update(room_proto, cx)?;
 367            anyhow::Ok(())
 368        })??;
 369        Ok(room)
 370    }
 371
 372    fn should_leave(&self) -> bool {
 373        self.leave_when_empty
 374            && self.pending_room_update.is_none()
 375            && self.pending_participants.is_empty()
 376            && self.remote_participants.is_empty()
 377            && self.pending_call_count == 0
 378    }
 379
 380    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 381        cx.notify();
 382        cx.emit(Event::Left);
 383        self.leave_internal(cx)
 384    }
 385
 386    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 387        if self.status.is_offline() {
 388            return Task::ready(Err(anyhow!("room is offline")));
 389        }
 390
 391        log::info!("leaving room");
 392        Audio::play_sound(Sound::Leave, cx);
 393
 394        self.clear_state(cx);
 395
 396        let leave_room = self.client.request(proto::LeaveRoom {});
 397        cx.background_executor().spawn(async move {
 398            leave_room.await?;
 399            anyhow::Ok(())
 400        })
 401    }
 402
 403    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 404        for project in self.shared_projects.drain() {
 405            if let Some(project) = project.upgrade() {
 406                project.update(cx, |project, cx| {
 407                    project.unshare(cx).log_err();
 408                });
 409            }
 410        }
 411        for project in self.joined_projects.drain() {
 412            if let Some(project) = project.upgrade() {
 413                project.update(cx, |project, cx| {
 414                    project.disconnected_from_host(cx);
 415                    project.close(cx);
 416                });
 417            }
 418        }
 419
 420        self.status = RoomStatus::Offline;
 421        self.remote_participants.clear();
 422        self.pending_participants.clear();
 423        self.participant_user_ids.clear();
 424        self.client_subscriptions.clear();
 425        self.live_kit.take();
 426        self.pending_room_update.take();
 427        self.maintain_connection.take();
 428    }
 429
 430    async fn maintain_connection(
 431        this: WeakModel<Self>,
 432        client: Arc<Client>,
 433        mut cx: AsyncAppContext,
 434    ) -> Result<()> {
 435        let mut client_status = client.status();
 436        loop {
 437            let _ = client_status.try_recv();
 438            let is_connected = client_status.borrow().is_connected();
 439            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 440            if !is_connected || client_status.next().await.is_some() {
 441                log::info!("detected client disconnection");
 442
 443                this.upgrade()
 444                    .ok_or_else(|| anyhow!("room was dropped"))?
 445                    .update(&mut cx, |this, cx| {
 446                        this.status = RoomStatus::Rejoining;
 447                        cx.notify();
 448                    })?;
 449
 450                // Wait for client to re-establish a connection to the server.
 451                {
 452                    let mut reconnection_timeout =
 453                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 454                    let client_reconnection = async {
 455                        let mut remaining_attempts = 3;
 456                        while remaining_attempts > 0 {
 457                            if client_status.borrow().is_connected() {
 458                                log::info!("client reconnected, attempting to rejoin room");
 459
 460                                let Some(this) = this.upgrade() else { break };
 461                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 462                                    Ok(task) => {
 463                                        if task.await.log_err().is_some() {
 464                                            return true;
 465                                        } else {
 466                                            remaining_attempts -= 1;
 467                                        }
 468                                    }
 469                                    Err(_app_dropped) => return false,
 470                                }
 471                            } else if client_status.borrow().is_signed_out() {
 472                                return false;
 473                            }
 474
 475                            log::info!(
 476                                "waiting for client status change, remaining attempts {}",
 477                                remaining_attempts
 478                            );
 479                            client_status.next().await;
 480                        }
 481                        false
 482                    }
 483                    .fuse();
 484                    futures::pin_mut!(client_reconnection);
 485
 486                    futures::select_biased! {
 487                        reconnected = client_reconnection => {
 488                            if reconnected {
 489                                log::info!("successfully reconnected to room");
 490                                // If we successfully joined the room, go back around the loop
 491                                // waiting for future connection status changes.
 492                                continue;
 493                            }
 494                        }
 495                        _ = reconnection_timeout => {
 496                            log::info!("room reconnection timeout expired");
 497                        }
 498                    }
 499                }
 500
 501                break;
 502            }
 503        }
 504
 505        // The client failed to re-establish a connection to the server
 506        // or an error occurred while trying to re-join the room. Either way
 507        // we leave the room and return an error.
 508        if let Some(this) = this.upgrade() {
 509            log::info!("reconnection failed, leaving room");
 510            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 511        }
 512        Err(anyhow!(
 513            "can't reconnect to room: client failed to re-establish connection"
 514        ))
 515    }
 516
 517    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 518        let mut projects = HashMap::default();
 519        let mut reshared_projects = Vec::new();
 520        let mut rejoined_projects = Vec::new();
 521        self.shared_projects.retain(|project| {
 522            if let Some(handle) = project.upgrade() {
 523                let project = handle.read(cx);
 524                if let Some(project_id) = project.remote_id() {
 525                    projects.insert(project_id, handle.clone());
 526                    reshared_projects.push(proto::UpdateProject {
 527                        project_id,
 528                        worktrees: project.worktree_metadata_protos(cx),
 529                    });
 530                    return true;
 531                }
 532            }
 533            false
 534        });
 535        self.joined_projects.retain(|project| {
 536            if let Some(handle) = project.upgrade() {
 537                let project = handle.read(cx);
 538                if let Some(project_id) = project.remote_id() {
 539                    projects.insert(project_id, handle.clone());
 540                    rejoined_projects.push(proto::RejoinProject {
 541                        id: project_id,
 542                        worktrees: project
 543                            .worktrees()
 544                            .map(|worktree| {
 545                                let worktree = worktree.read(cx);
 546                                proto::RejoinWorktree {
 547                                    id: worktree.id().to_proto(),
 548                                    scan_id: worktree.completed_scan_id() as u64,
 549                                }
 550                            })
 551                            .collect(),
 552                    });
 553                }
 554                return true;
 555            }
 556            false
 557        });
 558
 559        let response = self.client.request_envelope(proto::RejoinRoom {
 560            id: self.id,
 561            reshared_projects,
 562            rejoined_projects,
 563        });
 564
 565        cx.spawn(|this, mut cx| async move {
 566            let response = response.await?;
 567            let message_id = response.message_id;
 568            let response = response.payload;
 569            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 570            this.update(&mut cx, |this, cx| {
 571                this.status = RoomStatus::Online;
 572                this.apply_room_update(room_proto, cx)?;
 573
 574                for reshared_project in response.reshared_projects {
 575                    if let Some(project) = projects.get(&reshared_project.id) {
 576                        project.update(cx, |project, cx| {
 577                            project.reshared(reshared_project, cx).log_err();
 578                        });
 579                    }
 580                }
 581
 582                for rejoined_project in response.rejoined_projects {
 583                    if let Some(project) = projects.get(&rejoined_project.id) {
 584                        project.update(cx, |project, cx| {
 585                            project.rejoined(rejoined_project, message_id, cx).log_err();
 586                        });
 587                    }
 588                }
 589
 590                anyhow::Ok(())
 591            })?
 592        })
 593    }
 594
 595    pub fn id(&self) -> u64 {
 596        self.id
 597    }
 598
 599    pub fn status(&self) -> RoomStatus {
 600        self.status
 601    }
 602
 603    pub fn local_participant(&self) -> &LocalParticipant {
 604        &self.local_participant
 605    }
 606
 607    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 608        &self.remote_participants
 609    }
 610
 611    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 612        self.remote_participants
 613            .values()
 614            .find(|p| p.peer_id == peer_id)
 615    }
 616
 617    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 618        self.remote_participants
 619            .get(&user_id)
 620            .map(|participant| participant.role)
 621    }
 622
 623    pub fn local_participant_is_admin(&self) -> bool {
 624        self.local_participant.role == proto::ChannelRole::Admin
 625    }
 626
 627    pub fn set_participant_role(
 628        &mut self,
 629        user_id: u64,
 630        role: proto::ChannelRole,
 631        cx: &ModelContext<Self>,
 632    ) -> Task<Result<()>> {
 633        let client = self.client.clone();
 634        let room_id = self.id;
 635        let role = role.into();
 636        cx.spawn(|_, _| async move {
 637            client
 638                .request(proto::SetRoomParticipantRole {
 639                    room_id,
 640                    user_id,
 641                    role,
 642                })
 643                .await
 644                .map(|_| ())
 645        })
 646    }
 647
 648    pub fn pending_participants(&self) -> &[Arc<User>] {
 649        &self.pending_participants
 650    }
 651
 652    pub fn contains_participant(&self, user_id: u64) -> bool {
 653        self.participant_user_ids.contains(&user_id)
 654    }
 655
 656    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 657        self.follows_by_leader_id_project_id
 658            .get(&(leader_id, project_id))
 659            .map_or(&[], |v| v.as_slice())
 660    }
 661
 662    /// Returns the most 'active' projects, defined as most people in the project
 663    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 664        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 665        for participant in self.remote_participants.values() {
 666            match participant.location {
 667                ParticipantLocation::SharedProject { project_id } => {
 668                    project_hosts_and_guest_counts
 669                        .entry(project_id)
 670                        .or_default()
 671                        .1 += 1;
 672                }
 673                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 674            }
 675            for project in &participant.projects {
 676                project_hosts_and_guest_counts
 677                    .entry(project.id)
 678                    .or_default()
 679                    .0 = Some(participant.user.id);
 680            }
 681        }
 682
 683        if let Some(user) = self.user_store.read(cx).current_user() {
 684            for project in &self.local_participant.projects {
 685                project_hosts_and_guest_counts
 686                    .entry(project.id)
 687                    .or_default()
 688                    .0 = Some(user.id);
 689            }
 690        }
 691
 692        project_hosts_and_guest_counts
 693            .into_iter()
 694            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 695            .max_by_key(|(_, _, guest_count)| *guest_count)
 696            .map(|(id, host, _)| (id, host))
 697    }
 698
 699    async fn handle_room_updated(
 700        this: Model<Self>,
 701        envelope: TypedEnvelope<proto::RoomUpdated>,
 702        _: Arc<Client>,
 703        mut cx: AsyncAppContext,
 704    ) -> Result<()> {
 705        let room = envelope
 706            .payload
 707            .room
 708            .ok_or_else(|| anyhow!("invalid room"))?;
 709        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 710    }
 711
 712    fn apply_room_update(
 713        &mut self,
 714        mut room: proto::Room,
 715        cx: &mut ModelContext<Self>,
 716    ) -> Result<()> {
 717        // Filter ourselves out from the room's participants.
 718        let local_participant_ix = room
 719            .participants
 720            .iter()
 721            .position(|participant| Some(participant.user_id) == self.client.user_id());
 722        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 723
 724        let pending_participant_user_ids = room
 725            .pending_participants
 726            .iter()
 727            .map(|p| p.user_id)
 728            .collect::<Vec<_>>();
 729
 730        let remote_participant_user_ids = room
 731            .participants
 732            .iter()
 733            .map(|p| p.user_id)
 734            .collect::<Vec<_>>();
 735
 736        let (remote_participants, pending_participants) =
 737            self.user_store.update(cx, move |user_store, cx| {
 738                (
 739                    user_store.get_users(remote_participant_user_ids, cx),
 740                    user_store.get_users(pending_participant_user_ids, cx),
 741                )
 742            });
 743
 744        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 745            let (remote_participants, pending_participants) =
 746                futures::join!(remote_participants, pending_participants);
 747
 748            this.update(&mut cx, |this, cx| {
 749                this.participant_user_ids.clear();
 750
 751                if let Some(participant) = local_participant {
 752                    let role = participant.role();
 753                    this.local_participant.projects = participant.projects;
 754                    if this.local_participant.role != role {
 755                        this.local_participant.role = role;
 756
 757                        if role == proto::ChannelRole::Guest {
 758                            for project in mem::take(&mut this.shared_projects) {
 759                                if let Some(project) = project.upgrade() {
 760                                    this.unshare_project(project, cx).log_err();
 761                                }
 762                            }
 763                            this.local_participant.projects.clear();
 764                            if let Some(live_kit_room) = &mut this.live_kit {
 765                                live_kit_room.stop_publishing(cx);
 766                            }
 767                        }
 768
 769                        this.joined_projects.retain(|project| {
 770                            if let Some(project) = project.upgrade() {
 771                                project.update(cx, |project, cx| project.set_role(role, cx));
 772                                true
 773                            } else {
 774                                false
 775                            }
 776                        });
 777                    }
 778                } else {
 779                    this.local_participant.projects.clear();
 780                }
 781
 782                if let Some(participants) = remote_participants.log_err() {
 783                    for (participant, user) in room.participants.into_iter().zip(participants) {
 784                        let Some(peer_id) = participant.peer_id else {
 785                            continue;
 786                        };
 787                        let participant_index = ParticipantIndex(participant.participant_index);
 788                        this.participant_user_ids.insert(participant.user_id);
 789
 790                        let old_projects = this
 791                            .remote_participants
 792                            .get(&participant.user_id)
 793                            .into_iter()
 794                            .flat_map(|existing| &existing.projects)
 795                            .map(|project| project.id)
 796                            .collect::<HashSet<_>>();
 797                        let new_projects = participant
 798                            .projects
 799                            .iter()
 800                            .map(|project| project.id)
 801                            .collect::<HashSet<_>>();
 802
 803                        for project in &participant.projects {
 804                            if !old_projects.contains(&project.id) {
 805                                cx.emit(Event::RemoteProjectShared {
 806                                    owner: user.clone(),
 807                                    project_id: project.id,
 808                                    worktree_root_names: project.worktree_root_names.clone(),
 809                                });
 810                            }
 811                        }
 812
 813                        for unshared_project_id in old_projects.difference(&new_projects) {
 814                            this.joined_projects.retain(|project| {
 815                                if let Some(project) = project.upgrade() {
 816                                    project.update(cx, |project, cx| {
 817                                        if project.remote_id() == Some(*unshared_project_id) {
 818                                            project.disconnected_from_host(cx);
 819                                            false
 820                                        } else {
 821                                            true
 822                                        }
 823                                    })
 824                                } else {
 825                                    false
 826                                }
 827                            });
 828                            cx.emit(Event::RemoteProjectUnshared {
 829                                project_id: *unshared_project_id,
 830                            });
 831                        }
 832
 833                        let role = participant.role();
 834                        let location = ParticipantLocation::from_proto(participant.location)
 835                            .unwrap_or(ParticipantLocation::External);
 836                        if let Some(remote_participant) =
 837                            this.remote_participants.get_mut(&participant.user_id)
 838                        {
 839                            remote_participant.peer_id = peer_id;
 840                            remote_participant.projects = participant.projects;
 841                            remote_participant.participant_index = participant_index;
 842                            if location != remote_participant.location
 843                                || role != remote_participant.role
 844                            {
 845                                remote_participant.location = location;
 846                                remote_participant.role = role;
 847                                cx.emit(Event::ParticipantLocationChanged {
 848                                    participant_id: peer_id,
 849                                });
 850                            }
 851                        } else {
 852                            this.remote_participants.insert(
 853                                participant.user_id,
 854                                RemoteParticipant {
 855                                    user: user.clone(),
 856                                    participant_index,
 857                                    peer_id,
 858                                    projects: participant.projects,
 859                                    location,
 860                                    role,
 861                                    muted: true,
 862                                    speaking: false,
 863                                    video_tracks: Default::default(),
 864                                    audio_tracks: Default::default(),
 865                                },
 866                            );
 867
 868                            Audio::play_sound(Sound::Joined, cx);
 869
 870                            if let Some(live_kit) = this.live_kit.as_ref() {
 871                                let video_tracks =
 872                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 873                                let audio_tracks =
 874                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 875                                let publications = live_kit
 876                                    .room
 877                                    .remote_audio_track_publications(&user.id.to_string());
 878
 879                                for track in video_tracks {
 880                                    this.remote_video_track_updated(
 881                                        RemoteVideoTrackUpdate::Subscribed(track),
 882                                        cx,
 883                                    )
 884                                    .log_err();
 885                                }
 886
 887                                for (track, publication) in
 888                                    audio_tracks.iter().zip(publications.iter())
 889                                {
 890                                    this.remote_audio_track_updated(
 891                                        RemoteAudioTrackUpdate::Subscribed(
 892                                            track.clone(),
 893                                            publication.clone(),
 894                                        ),
 895                                        cx,
 896                                    )
 897                                    .log_err();
 898                                }
 899                            }
 900                        }
 901                    }
 902
 903                    this.remote_participants.retain(|user_id, participant| {
 904                        if this.participant_user_ids.contains(user_id) {
 905                            true
 906                        } else {
 907                            for project in &participant.projects {
 908                                cx.emit(Event::RemoteProjectUnshared {
 909                                    project_id: project.id,
 910                                });
 911                            }
 912                            false
 913                        }
 914                    });
 915                }
 916
 917                if let Some(pending_participants) = pending_participants.log_err() {
 918                    this.pending_participants = pending_participants;
 919                    for participant in &this.pending_participants {
 920                        this.participant_user_ids.insert(participant.id);
 921                    }
 922                }
 923
 924                this.follows_by_leader_id_project_id.clear();
 925                for follower in room.followers {
 926                    let project_id = follower.project_id;
 927                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 928                        (Some(leader), Some(follower)) => (leader, follower),
 929
 930                        _ => {
 931                            log::error!("Follower message {follower:?} missing some state");
 932                            continue;
 933                        }
 934                    };
 935
 936                    let list = this
 937                        .follows_by_leader_id_project_id
 938                        .entry((leader, project_id))
 939                        .or_insert(Vec::new());
 940                    if !list.contains(&follower) {
 941                        list.push(follower);
 942                    }
 943                }
 944
 945                this.pending_room_update.take();
 946                if this.should_leave() {
 947                    log::info!("room is empty, leaving");
 948                    let _ = this.leave(cx);
 949                }
 950
 951                this.user_store.update(cx, |user_store, cx| {
 952                    let participant_indices_by_user_id = this
 953                        .remote_participants
 954                        .iter()
 955                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 956                        .collect();
 957                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 958                });
 959
 960                this.check_invariants();
 961                this.room_update_completed_tx.try_send(Some(())).ok();
 962                cx.notify();
 963            })
 964            .ok();
 965        }));
 966
 967        cx.notify();
 968        Ok(())
 969    }
 970
 971    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 972        let mut done_rx = self.room_update_completed_rx.clone();
 973        async move {
 974            while let Some(result) = done_rx.next().await {
 975                if result.is_some() {
 976                    break;
 977                }
 978            }
 979        }
 980    }
 981
 982    fn remote_video_track_updated(
 983        &mut self,
 984        change: RemoteVideoTrackUpdate,
 985        cx: &mut ModelContext<Self>,
 986    ) -> Result<()> {
 987        match change {
 988            RemoteVideoTrackUpdate::Subscribed(track) => {
 989                let user_id = track.publisher_id().parse()?;
 990                let track_id = track.sid().to_string();
 991                let participant = self
 992                    .remote_participants
 993                    .get_mut(&user_id)
 994                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 995                participant.video_tracks.insert(track_id.clone(), track);
 996                cx.emit(Event::RemoteVideoTracksChanged {
 997                    participant_id: participant.peer_id,
 998                });
 999            }
1000            RemoteVideoTrackUpdate::Unsubscribed {
1001                publisher_id,
1002                track_id,
1003            } => {
1004                let user_id = publisher_id.parse()?;
1005                let participant = self
1006                    .remote_participants
1007                    .get_mut(&user_id)
1008                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1009                participant.video_tracks.remove(&track_id);
1010                cx.emit(Event::RemoteVideoTracksChanged {
1011                    participant_id: participant.peer_id,
1012                });
1013            }
1014        }
1015
1016        cx.notify();
1017        Ok(())
1018    }
1019
1020    fn remote_audio_track_updated(
1021        &mut self,
1022        change: RemoteAudioTrackUpdate,
1023        cx: &mut ModelContext<Self>,
1024    ) -> Result<()> {
1025        match change {
1026            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
1027                let mut speaker_ids = speakers
1028                    .into_iter()
1029                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
1030                    .collect::<Vec<u64>>();
1031                speaker_ids.sort_unstable();
1032                for (sid, participant) in &mut self.remote_participants {
1033                    if let Ok(_) = speaker_ids.binary_search(sid) {
1034                        participant.speaking = true;
1035                    } else {
1036                        participant.speaking = false;
1037                    }
1038                }
1039                if let Some(id) = self.client.user_id() {
1040                    if let Some(room) = &mut self.live_kit {
1041                        if let Ok(_) = speaker_ids.binary_search(&id) {
1042                            room.speaking = true;
1043                        } else {
1044                            room.speaking = false;
1045                        }
1046                    }
1047                }
1048                cx.notify();
1049            }
1050            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
1051                let mut found = false;
1052                for participant in &mut self.remote_participants.values_mut() {
1053                    for track in participant.audio_tracks.values() {
1054                        if track.sid() == track_id {
1055                            found = true;
1056                            break;
1057                        }
1058                    }
1059                    if found {
1060                        participant.muted = muted;
1061                        break;
1062                    }
1063                }
1064
1065                cx.notify();
1066            }
1067            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1068                let user_id = track.publisher_id().parse()?;
1069                let track_id = track.sid().to_string();
1070                let participant = self
1071                    .remote_participants
1072                    .get_mut(&user_id)
1073                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1074                participant.audio_tracks.insert(track_id.clone(), track);
1075                participant.muted = publication.is_muted();
1076
1077                cx.emit(Event::RemoteAudioTracksChanged {
1078                    participant_id: participant.peer_id,
1079                });
1080            }
1081            RemoteAudioTrackUpdate::Unsubscribed {
1082                publisher_id,
1083                track_id,
1084            } => {
1085                let user_id = publisher_id.parse()?;
1086                let participant = self
1087                    .remote_participants
1088                    .get_mut(&user_id)
1089                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1090                participant.audio_tracks.remove(&track_id);
1091                cx.emit(Event::RemoteAudioTracksChanged {
1092                    participant_id: participant.peer_id,
1093                });
1094            }
1095        }
1096
1097        cx.notify();
1098        Ok(())
1099    }
1100
1101    fn check_invariants(&self) {
1102        #[cfg(any(test, feature = "test-support"))]
1103        {
1104            for participant in self.remote_participants.values() {
1105                assert!(self.participant_user_ids.contains(&participant.user.id));
1106                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1107            }
1108
1109            for participant in &self.pending_participants {
1110                assert!(self.participant_user_ids.contains(&participant.id));
1111                assert_ne!(participant.id, self.client.user_id().unwrap());
1112            }
1113
1114            assert_eq!(
1115                self.participant_user_ids.len(),
1116                self.remote_participants.len() + self.pending_participants.len()
1117            );
1118        }
1119    }
1120
1121    pub(crate) fn call(
1122        &mut self,
1123        called_user_id: u64,
1124        initial_project_id: Option<u64>,
1125        cx: &mut ModelContext<Self>,
1126    ) -> Task<Result<()>> {
1127        if self.status.is_offline() {
1128            return Task::ready(Err(anyhow!("room is offline")));
1129        }
1130
1131        cx.notify();
1132        let client = self.client.clone();
1133        let room_id = self.id;
1134        self.pending_call_count += 1;
1135        cx.spawn(move |this, mut cx| async move {
1136            let result = client
1137                .request(proto::Call {
1138                    room_id,
1139                    called_user_id,
1140                    initial_project_id,
1141                })
1142                .await;
1143            this.update(&mut cx, |this, cx| {
1144                this.pending_call_count -= 1;
1145                if this.should_leave() {
1146                    this.leave(cx).detach_and_log_err(cx);
1147                }
1148            })?;
1149            result?;
1150            Ok(())
1151        })
1152    }
1153
1154    pub fn join_project(
1155        &mut self,
1156        id: u64,
1157        language_registry: Arc<LanguageRegistry>,
1158        fs: Arc<dyn Fs>,
1159        cx: &mut ModelContext<Self>,
1160    ) -> Task<Result<Model<Project>>> {
1161        let client = self.client.clone();
1162        let user_store = self.user_store.clone();
1163        let role = self.local_participant.role;
1164        cx.emit(Event::RemoteProjectJoined { project_id: id });
1165        cx.spawn(move |this, mut cx| async move {
1166            let project = Project::remote(
1167                id,
1168                client,
1169                user_store,
1170                language_registry,
1171                fs,
1172                role,
1173                cx.clone(),
1174            )
1175            .await?;
1176
1177            this.update(&mut cx, |this, cx| {
1178                this.joined_projects.retain(|project| {
1179                    if let Some(project) = project.upgrade() {
1180                        !project.read(cx).is_disconnected()
1181                    } else {
1182                        false
1183                    }
1184                });
1185                this.joined_projects.insert(project.downgrade());
1186            })?;
1187            Ok(project)
1188        })
1189    }
1190
1191    pub(crate) fn share_project(
1192        &mut self,
1193        project: Model<Project>,
1194        cx: &mut ModelContext<Self>,
1195    ) -> Task<Result<u64>> {
1196        if let Some(project_id) = project.read(cx).remote_id() {
1197            return Task::ready(Ok(project_id));
1198        }
1199
1200        let request = self.client.request(proto::ShareProject {
1201            room_id: self.id(),
1202            worktrees: project.read(cx).worktree_metadata_protos(cx),
1203        });
1204        cx.spawn(|this, mut cx| async move {
1205            let response = request.await?;
1206
1207            project.update(&mut cx, |project, cx| {
1208                project.shared(response.project_id, cx)
1209            })??;
1210
1211            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1212            this.update(&mut cx, |this, cx| {
1213                this.shared_projects.insert(project.downgrade());
1214                let active_project = this.local_participant.active_project.as_ref();
1215                if active_project.map_or(false, |location| *location == project) {
1216                    this.set_location(Some(&project), cx)
1217                } else {
1218                    Task::ready(Ok(()))
1219                }
1220            })?
1221            .await?;
1222
1223            Ok(response.project_id)
1224        })
1225    }
1226
1227    pub(crate) fn unshare_project(
1228        &mut self,
1229        project: Model<Project>,
1230        cx: &mut ModelContext<Self>,
1231    ) -> Result<()> {
1232        let project_id = match project.read(cx).remote_id() {
1233            Some(project_id) => project_id,
1234            None => return Ok(()),
1235        };
1236
1237        self.client.send(proto::UnshareProject { project_id })?;
1238        project.update(cx, |this, cx| this.unshare(cx))
1239    }
1240
1241    pub(crate) fn set_location(
1242        &mut self,
1243        project: Option<&Model<Project>>,
1244        cx: &mut ModelContext<Self>,
1245    ) -> Task<Result<()>> {
1246        if self.status.is_offline() {
1247            return Task::ready(Err(anyhow!("room is offline")));
1248        }
1249
1250        let client = self.client.clone();
1251        let room_id = self.id;
1252        let location = if let Some(project) = project {
1253            self.local_participant.active_project = Some(project.downgrade());
1254            if let Some(project_id) = project.read(cx).remote_id() {
1255                proto::participant_location::Variant::SharedProject(
1256                    proto::participant_location::SharedProject { id: project_id },
1257                )
1258            } else {
1259                proto::participant_location::Variant::UnsharedProject(
1260                    proto::participant_location::UnsharedProject {},
1261                )
1262            }
1263        } else {
1264            self.local_participant.active_project = None;
1265            proto::participant_location::Variant::External(proto::participant_location::External {})
1266        };
1267
1268        cx.notify();
1269        cx.background_executor().spawn(async move {
1270            client
1271                .request(proto::UpdateParticipantLocation {
1272                    room_id,
1273                    location: Some(proto::ParticipantLocation {
1274                        variant: Some(location),
1275                    }),
1276                })
1277                .await?;
1278            Ok(())
1279        })
1280    }
1281
1282    pub fn is_screen_sharing(&self) -> bool {
1283        self.live_kit.as_ref().map_or(false, |live_kit| {
1284            !matches!(live_kit.screen_track, LocalTrack::None)
1285        })
1286    }
1287
1288    pub fn is_sharing_mic(&self) -> bool {
1289        self.live_kit.as_ref().map_or(false, |live_kit| {
1290            !matches!(live_kit.microphone_track, LocalTrack::None)
1291        })
1292    }
1293
1294    pub fn is_muted(&self, cx: &AppContext) -> bool {
1295        self.live_kit
1296            .as_ref()
1297            .and_then(|live_kit| match &live_kit.microphone_track {
1298                LocalTrack::None => Some(Self::mute_on_join(cx)),
1299                LocalTrack::Pending { muted, .. } => Some(*muted),
1300                LocalTrack::Published { muted, .. } => Some(*muted),
1301            })
1302            .unwrap_or(false)
1303    }
1304
1305    pub fn read_only(&self) -> bool {
1306        !(self.local_participant().role == proto::ChannelRole::Member
1307            || self.local_participant().role == proto::ChannelRole::Admin)
1308    }
1309
1310    pub fn is_speaking(&self) -> bool {
1311        self.live_kit
1312            .as_ref()
1313            .map_or(false, |live_kit| live_kit.speaking)
1314    }
1315
1316    pub fn is_deafened(&self) -> Option<bool> {
1317        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1318    }
1319
1320    #[track_caller]
1321    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1322        if self.status.is_offline() {
1323            return Task::ready(Err(anyhow!("room is offline")));
1324        } else if self.is_sharing_mic() {
1325            return Task::ready(Err(anyhow!("microphone was already shared")));
1326        }
1327
1328        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1329            let publish_id = post_inc(&mut live_kit.next_publish_id);
1330            live_kit.microphone_track = LocalTrack::Pending {
1331                publish_id,
1332                muted: false,
1333            };
1334            cx.notify();
1335            publish_id
1336        } else {
1337            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1338        };
1339
1340        cx.spawn(move |this, mut cx| async move {
1341            let publish_track = async {
1342                let track = LocalAudioTrack::create();
1343                this.upgrade()
1344                    .ok_or_else(|| anyhow!("room was dropped"))?
1345                    .update(&mut cx, |this, _| {
1346                        this.live_kit
1347                            .as_ref()
1348                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1349                    })?
1350                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1351                    .await
1352            };
1353            let publication = publish_track.await;
1354            this.upgrade()
1355                .ok_or_else(|| anyhow!("room was dropped"))?
1356                .update(&mut cx, |this, cx| {
1357                    let live_kit = this
1358                        .live_kit
1359                        .as_mut()
1360                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1361
1362                    let (canceled, muted) = if let LocalTrack::Pending {
1363                        publish_id: cur_publish_id,
1364                        muted,
1365                    } = &live_kit.microphone_track
1366                    {
1367                        (*cur_publish_id != publish_id, *muted)
1368                    } else {
1369                        (true, false)
1370                    };
1371
1372                    match publication {
1373                        Ok(publication) => {
1374                            if canceled {
1375                                live_kit.room.unpublish_track(publication);
1376                            } else {
1377                                if muted {
1378                                    cx.background_executor()
1379                                        .spawn(publication.set_mute(muted))
1380                                        .detach();
1381                                }
1382                                live_kit.microphone_track = LocalTrack::Published {
1383                                    track_publication: publication,
1384                                    muted,
1385                                };
1386                                cx.notify();
1387                            }
1388                            Ok(())
1389                        }
1390                        Err(error) => {
1391                            if canceled {
1392                                Ok(())
1393                            } else {
1394                                live_kit.microphone_track = LocalTrack::None;
1395                                cx.notify();
1396                                Err(error)
1397                            }
1398                        }
1399                    }
1400                })?
1401        })
1402    }
1403
1404    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1405        if self.status.is_offline() {
1406            return Task::ready(Err(anyhow!("room is offline")));
1407        } else if self.is_screen_sharing() {
1408            return Task::ready(Err(anyhow!("screen was already shared")));
1409        }
1410
1411        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1412            let publish_id = post_inc(&mut live_kit.next_publish_id);
1413            live_kit.screen_track = LocalTrack::Pending {
1414                publish_id,
1415                muted: false,
1416            };
1417            cx.notify();
1418            (live_kit.room.display_sources(), publish_id)
1419        } else {
1420            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1421        };
1422
1423        cx.spawn(move |this, mut cx| async move {
1424            let publish_track = async {
1425                let displays = displays.await?;
1426                let display = displays
1427                    .first()
1428                    .ok_or_else(|| anyhow!("no display found"))?;
1429                let track = LocalVideoTrack::screen_share_for_display(&display);
1430                this.upgrade()
1431                    .ok_or_else(|| anyhow!("room was dropped"))?
1432                    .update(&mut cx, |this, _| {
1433                        this.live_kit
1434                            .as_ref()
1435                            .map(|live_kit| live_kit.room.publish_video_track(track))
1436                    })?
1437                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1438                    .await
1439            };
1440
1441            let publication = publish_track.await;
1442            this.upgrade()
1443                .ok_or_else(|| anyhow!("room was dropped"))?
1444                .update(&mut cx, |this, cx| {
1445                    let live_kit = this
1446                        .live_kit
1447                        .as_mut()
1448                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1449
1450                    let (canceled, muted) = if let LocalTrack::Pending {
1451                        publish_id: cur_publish_id,
1452                        muted,
1453                    } = &live_kit.screen_track
1454                    {
1455                        (*cur_publish_id != publish_id, *muted)
1456                    } else {
1457                        (true, false)
1458                    };
1459
1460                    match publication {
1461                        Ok(publication) => {
1462                            if canceled {
1463                                live_kit.room.unpublish_track(publication);
1464                            } else {
1465                                if muted {
1466                                    cx.background_executor()
1467                                        .spawn(publication.set_mute(muted))
1468                                        .detach();
1469                                }
1470                                live_kit.screen_track = LocalTrack::Published {
1471                                    track_publication: publication,
1472                                    muted,
1473                                };
1474                                cx.notify();
1475                            }
1476
1477                            Audio::play_sound(Sound::StartScreenshare, cx);
1478
1479                            Ok(())
1480                        }
1481                        Err(error) => {
1482                            if canceled {
1483                                Ok(())
1484                            } else {
1485                                live_kit.screen_track = LocalTrack::None;
1486                                cx.notify();
1487                                Err(error)
1488                            }
1489                        }
1490                    }
1491                })?
1492        })
1493    }
1494
1495    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1496        let should_mute = !self.is_muted(cx);
1497        if let Some(live_kit) = self.live_kit.as_mut() {
1498            if matches!(live_kit.microphone_track, LocalTrack::None) {
1499                return Ok(self.share_microphone(cx));
1500            }
1501
1502            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1503            live_kit.muted_by_user = should_mute;
1504
1505            if old_muted == true && live_kit.deafened == true {
1506                if let Some(task) = self.toggle_deafen(cx).ok() {
1507                    task.detach();
1508                }
1509            }
1510
1511            Ok(ret_task)
1512        } else {
1513            Err(anyhow!("LiveKit not started"))
1514        }
1515    }
1516
1517    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1518        if let Some(live_kit) = self.live_kit.as_mut() {
1519            (*live_kit).deafened = !live_kit.deafened;
1520
1521            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1522            // Context notification is sent within set_mute itself.
1523            let mut mute_task = None;
1524            // When deafening, mute user's mic as well.
1525            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1526            if live_kit.deafened || !live_kit.muted_by_user {
1527                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1528            };
1529            for participant in self.remote_participants.values() {
1530                for track in live_kit
1531                    .room
1532                    .remote_audio_track_publications(&participant.user.id.to_string())
1533                {
1534                    let deafened = live_kit.deafened;
1535                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1536                }
1537            }
1538
1539            Ok(cx.foreground_executor().spawn(async move {
1540                if let Some(mute_task) = mute_task {
1541                    mute_task.await?;
1542                }
1543                for task in tasks {
1544                    task.await?;
1545                }
1546                Ok(())
1547            }))
1548        } else {
1549            Err(anyhow!("LiveKit not started"))
1550        }
1551    }
1552
1553    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1554        if self.status.is_offline() {
1555            return Err(anyhow!("room is offline"));
1556        }
1557
1558        let live_kit = self
1559            .live_kit
1560            .as_mut()
1561            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1562        match mem::take(&mut live_kit.screen_track) {
1563            LocalTrack::None => Err(anyhow!("screen was not shared")),
1564            LocalTrack::Pending { .. } => {
1565                cx.notify();
1566                Ok(())
1567            }
1568            LocalTrack::Published {
1569                track_publication, ..
1570            } => {
1571                live_kit.room.unpublish_track(track_publication);
1572                cx.notify();
1573
1574                Audio::play_sound(Sound::StopScreenshare, cx);
1575                Ok(())
1576            }
1577        }
1578    }
1579
1580    #[cfg(any(test, feature = "test-support"))]
1581    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1582        self.live_kit
1583            .as_ref()
1584            .unwrap()
1585            .room
1586            .set_display_sources(sources);
1587    }
1588}
1589
1590struct LiveKitRoom {
1591    room: Arc<live_kit_client::Room>,
1592    screen_track: LocalTrack,
1593    microphone_track: LocalTrack,
1594    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1595    muted_by_user: bool,
1596    deafened: bool,
1597    speaking: bool,
1598    next_publish_id: usize,
1599    _maintain_room: Task<()>,
1600    _maintain_tracks: [Task<()>; 2],
1601}
1602
1603impl LiveKitRoom {
1604    fn set_mute(
1605        self: &mut LiveKitRoom,
1606        should_mute: bool,
1607        cx: &mut ModelContext<Room>,
1608    ) -> Result<(Task<Result<()>>, bool)> {
1609        if !should_mute {
1610            // clear user muting state.
1611            self.muted_by_user = false;
1612        }
1613
1614        let (result, old_muted) = match &mut self.microphone_track {
1615            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1616            LocalTrack::Pending { muted, .. } => {
1617                let old_muted = *muted;
1618                *muted = should_mute;
1619                cx.notify();
1620                Ok((Task::Ready(Some(Ok(()))), old_muted))
1621            }
1622            LocalTrack::Published {
1623                track_publication,
1624                muted,
1625            } => {
1626                let old_muted = *muted;
1627                *muted = should_mute;
1628                cx.notify();
1629                Ok((
1630                    cx.background_executor()
1631                        .spawn(track_publication.set_mute(*muted)),
1632                    old_muted,
1633                ))
1634            }
1635        }?;
1636
1637        if old_muted != should_mute {
1638            if should_mute {
1639                Audio::play_sound(Sound::Mute, cx);
1640            } else {
1641                Audio::play_sound(Sound::Unmute, cx);
1642            }
1643        }
1644
1645        Ok((result, old_muted))
1646    }
1647
1648    fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1649        if let LocalTrack::Published {
1650            track_publication, ..
1651        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1652        {
1653            self.room.unpublish_track(track_publication);
1654            cx.notify();
1655        }
1656
1657        if let LocalTrack::Published {
1658            track_publication, ..
1659        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1660        {
1661            self.room.unpublish_track(track_publication);
1662            cx.notify();
1663        }
1664    }
1665}
1666
1667enum LocalTrack {
1668    None,
1669    Pending {
1670        publish_id: usize,
1671        muted: bool,
1672    },
1673    Published {
1674        track_publication: LocalTrackPublication,
1675        muted: bool,
1676    },
1677}
1678
1679impl Default for LocalTrack {
1680    fn default() -> Self {
1681        Self::None
1682    }
1683}
1684
1685#[derive(Copy, Clone, PartialEq, Eq)]
1686pub enum RoomStatus {
1687    Online,
1688    Rejoining,
1689    Offline,
1690}
1691
1692impl RoomStatus {
1693    pub fn is_offline(&self) -> bool {
1694        matches!(self, RoomStatus::Offline)
1695    }
1696
1697    pub fn is_online(&self) -> bool {
1698        matches!(self, RoomStatus::Online)
1699    }
1700}