room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{
  19    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  20    RemoteVideoTrackUpdate,
  21};
  22use postage::{sink::Sink, stream::Stream, watch};
  23use project::Project;
  24use settings::Settings as _;
  25use std::{future::Future, mem, sync::Arc, time::Duration};
  26use util::{post_inc, ResultExt, TryFutureExt};
  27
  28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  29
  30#[derive(Clone, Debug, PartialEq, Eq)]
  31pub enum Event {
  32    ParticipantLocationChanged {
  33        participant_id: proto::PeerId,
  34    },
  35    RemoteVideoTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
  38    RemoteAudioTracksChanged {
  39        participant_id: proto::PeerId,
  40    },
  41    RemoteProjectShared {
  42        owner: Arc<User>,
  43        project_id: u64,
  44        worktree_root_names: Vec<String>,
  45    },
  46    RemoteProjectUnshared {
  47        project_id: u64,
  48    },
  49    RemoteProjectJoined {
  50        project_id: u64,
  51    },
  52    RemoteProjectInvitationDiscarded {
  53        project_id: u64,
  54    },
  55    Left,
  56}
  57
  58pub struct Room {
  59    id: u64,
  60    channel_id: Option<u64>,
  61    live_kit: Option<LiveKitRoom>,
  62    status: RoomStatus,
  63    shared_projects: HashSet<WeakModel<Project>>,
  64    joined_projects: HashSet<WeakModel<Project>>,
  65    local_participant: LocalParticipant,
  66    remote_participants: BTreeMap<u64, RemoteParticipant>,
  67    pending_participants: Vec<Arc<User>>,
  68    participant_user_ids: HashSet<u64>,
  69    pending_call_count: usize,
  70    leave_when_empty: bool,
  71    client: Arc<Client>,
  72    user_store: Model<UserStore>,
  73    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  74    client_subscriptions: Vec<client::Subscription>,
  75    _subscriptions: Vec<gpui::Subscription>,
  76    room_update_completed_tx: watch::Sender<Option<()>>,
  77    room_update_completed_rx: watch::Receiver<Option<()>>,
  78    pending_room_update: Option<Task<()>>,
  79    maintain_connection: Option<Task<Option<()>>>,
  80}
  81
  82impl EventEmitter<Event> for Room {}
  83
  84impl Room {
  85    pub fn channel_id(&self) -> Option<u64> {
  86        self.channel_id
  87    }
  88
  89    pub fn is_sharing_project(&self) -> bool {
  90        !self.shared_projects.is_empty()
  91    }
  92
  93    #[cfg(any(test, feature = "test-support"))]
  94    pub fn is_connected(&self) -> bool {
  95        if let Some(live_kit) = self.live_kit.as_ref() {
  96            matches!(
  97                *live_kit.room.status().borrow(),
  98                live_kit_client::ConnectionState::Connected { .. }
  99            )
 100        } else {
 101            false
 102        }
 103    }
 104
 105    fn new(
 106        id: u64,
 107        channel_id: Option<u64>,
 108        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 109        client: Arc<Client>,
 110        user_store: Model<UserStore>,
 111        cx: &mut ModelContext<Self>,
 112    ) -> Self {
 113        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 114            let room = live_kit_client::Room::new();
 115            let mut status = room.status();
 116            // Consume the initial status of the room.
 117            let _ = status.try_recv();
 118            let _maintain_room = cx.spawn(|this, mut cx| async move {
 119                while let Some(status) = status.next().await {
 120                    let this = if let Some(this) = this.upgrade() {
 121                        this
 122                    } else {
 123                        break;
 124                    };
 125
 126                    if status == live_kit_client::ConnectionState::Disconnected {
 127                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 128                            .ok();
 129                        break;
 130                    }
 131                }
 132            });
 133
 134            let _maintain_video_tracks = cx.spawn({
 135                let room = room.clone();
 136                move |this, mut cx| async move {
 137                    let mut track_video_changes = room.remote_video_track_updates();
 138                    while let Some(track_change) = track_video_changes.next().await {
 139                        let this = if let Some(this) = this.upgrade() {
 140                            this
 141                        } else {
 142                            break;
 143                        };
 144
 145                        this.update(&mut cx, |this, cx| {
 146                            this.remote_video_track_updated(track_change, cx).log_err()
 147                        })
 148                        .ok();
 149                    }
 150                }
 151            });
 152
 153            let _maintain_audio_tracks = cx.spawn({
 154                let room = room.clone();
 155                |this, mut cx| async move {
 156                    let mut track_audio_changes = room.remote_audio_track_updates();
 157                    while let Some(track_change) = track_audio_changes.next().await {
 158                        let this = if let Some(this) = this.upgrade() {
 159                            this
 160                        } else {
 161                            break;
 162                        };
 163
 164                        this.update(&mut cx, |this, cx| {
 165                            this.remote_audio_track_updated(track_change, cx).log_err()
 166                        })
 167                        .ok();
 168                    }
 169                }
 170            });
 171
 172            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 173            cx.spawn(|this, mut cx| async move {
 174                connect.await?;
 175
 176                if !cx.update(|cx| Self::mute_on_join(cx))? {
 177                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 178                        .await?;
 179                }
 180
 181                anyhow::Ok(())
 182            })
 183            .detach_and_log_err(cx);
 184
 185            Some(LiveKitRoom {
 186                room,
 187                screen_track: LocalTrack::None,
 188                microphone_track: LocalTrack::None,
 189                next_publish_id: 0,
 190                muted_by_user: false,
 191                deafened: false,
 192                speaking: false,
 193                _maintain_room,
 194                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 195            })
 196        } else {
 197            None
 198        };
 199
 200        let maintain_connection = cx.spawn({
 201            let client = client.clone();
 202            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 203        });
 204
 205        Audio::play_sound(Sound::Joined, cx);
 206
 207        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 208
 209        Self {
 210            id,
 211            channel_id,
 212            live_kit: live_kit_room,
 213            status: RoomStatus::Online,
 214            shared_projects: Default::default(),
 215            joined_projects: Default::default(),
 216            participant_user_ids: Default::default(),
 217            local_participant: Default::default(),
 218            remote_participants: Default::default(),
 219            pending_participants: Default::default(),
 220            pending_call_count: 0,
 221            client_subscriptions: vec![
 222                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 223            ],
 224            _subscriptions: vec![
 225                cx.on_release(Self::released),
 226                cx.on_app_quit(Self::app_will_quit),
 227            ],
 228            leave_when_empty: false,
 229            pending_room_update: None,
 230            client,
 231            user_store,
 232            follows_by_leader_id_project_id: Default::default(),
 233            maintain_connection: Some(maintain_connection),
 234            room_update_completed_tx,
 235            room_update_completed_rx,
 236        }
 237    }
 238
 239    pub(crate) fn create(
 240        called_user_id: u64,
 241        initial_project: Option<Model<Project>>,
 242        client: Arc<Client>,
 243        user_store: Model<UserStore>,
 244        cx: &mut AppContext,
 245    ) -> Task<Result<Model<Self>>> {
 246        cx.spawn(move |mut cx| async move {
 247            let response = client.request(proto::CreateRoom {}).await?;
 248            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 249            let room = cx.new_model(|cx| {
 250                let mut room = Self::new(
 251                    room_proto.id,
 252                    None,
 253                    response.live_kit_connection_info,
 254                    client,
 255                    user_store,
 256                    cx,
 257                );
 258                if let Some(participant) = room_proto.participants.first() {
 259                    room.local_participant.role = participant.role()
 260                }
 261                room
 262            })?;
 263
 264            let initial_project_id = if let Some(initial_project) = initial_project {
 265                let initial_project_id = room
 266                    .update(&mut cx, |room, cx| {
 267                        room.share_project(initial_project.clone(), cx)
 268                    })?
 269                    .await?;
 270                Some(initial_project_id)
 271            } else {
 272                None
 273            };
 274
 275            match room
 276                .update(&mut cx, |room, cx| {
 277                    room.leave_when_empty = true;
 278                    room.call(called_user_id, initial_project_id, cx)
 279                })?
 280                .await
 281            {
 282                Ok(()) => Ok(room),
 283                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 284            }
 285        })
 286    }
 287
 288    pub(crate) async fn join_channel(
 289        channel_id: u64,
 290        client: Arc<Client>,
 291        user_store: Model<UserStore>,
 292        cx: AsyncAppContext,
 293    ) -> Result<Model<Self>> {
 294        Self::from_join_response(
 295            client.request(proto::JoinChannel { channel_id }).await?,
 296            client,
 297            user_store,
 298            cx,
 299        )
 300    }
 301
 302    pub(crate) async fn join(
 303        room_id: u64,
 304        client: Arc<Client>,
 305        user_store: Model<UserStore>,
 306        cx: AsyncAppContext,
 307    ) -> Result<Model<Self>> {
 308        Self::from_join_response(
 309            client.request(proto::JoinRoom { id: room_id }).await?,
 310            client,
 311            user_store,
 312            cx,
 313        )
 314    }
 315
 316    fn released(&mut self, cx: &mut AppContext) {
 317        if self.status.is_online() {
 318            self.leave_internal(cx).detach_and_log_err(cx);
 319        }
 320    }
 321
 322    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 323        let task = if self.status.is_online() {
 324            let leave = self.leave_internal(cx);
 325            Some(cx.background_executor().spawn(async move {
 326                leave.await.log_err();
 327            }))
 328        } else {
 329            None
 330        };
 331
 332        async move {
 333            if let Some(task) = task {
 334                task.await;
 335            }
 336        }
 337    }
 338
 339    pub fn mute_on_join(cx: &AppContext) -> bool {
 340        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 341    }
 342
 343    fn from_join_response(
 344        response: proto::JoinRoomResponse,
 345        client: Arc<Client>,
 346        user_store: Model<UserStore>,
 347        mut cx: AsyncAppContext,
 348    ) -> Result<Model<Self>> {
 349        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 350        let room = cx.new_model(|cx| {
 351            Self::new(
 352                room_proto.id,
 353                response.channel_id,
 354                response.live_kit_connection_info,
 355                client,
 356                user_store,
 357                cx,
 358            )
 359        })?;
 360        room.update(&mut cx, |room, cx| {
 361            room.leave_when_empty = room.channel_id.is_none();
 362            room.apply_room_update(room_proto, cx)?;
 363            anyhow::Ok(())
 364        })??;
 365        Ok(room)
 366    }
 367
 368    fn should_leave(&self) -> bool {
 369        self.leave_when_empty
 370            && self.pending_room_update.is_none()
 371            && self.pending_participants.is_empty()
 372            && self.remote_participants.is_empty()
 373            && self.pending_call_count == 0
 374    }
 375
 376    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 377        cx.notify();
 378        cx.emit(Event::Left);
 379        self.leave_internal(cx)
 380    }
 381
 382    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 383        if self.status.is_offline() {
 384            return Task::ready(Err(anyhow!("room is offline")));
 385        }
 386
 387        log::info!("leaving room");
 388        Audio::play_sound(Sound::Leave, cx);
 389
 390        self.clear_state(cx);
 391
 392        let leave_room = self.client.request(proto::LeaveRoom {});
 393        cx.background_executor().spawn(async move {
 394            leave_room.await?;
 395            anyhow::Ok(())
 396        })
 397    }
 398
 399    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 400        for project in self.shared_projects.drain() {
 401            if let Some(project) = project.upgrade() {
 402                project.update(cx, |project, cx| {
 403                    project.unshare(cx).log_err();
 404                });
 405            }
 406        }
 407        for project in self.joined_projects.drain() {
 408            if let Some(project) = project.upgrade() {
 409                project.update(cx, |project, cx| {
 410                    project.disconnected_from_host(cx);
 411                    project.close(cx);
 412                });
 413            }
 414        }
 415
 416        self.status = RoomStatus::Offline;
 417        self.remote_participants.clear();
 418        self.pending_participants.clear();
 419        self.participant_user_ids.clear();
 420        self.client_subscriptions.clear();
 421        self.live_kit.take();
 422        self.pending_room_update.take();
 423        self.maintain_connection.take();
 424    }
 425
 426    async fn maintain_connection(
 427        this: WeakModel<Self>,
 428        client: Arc<Client>,
 429        mut cx: AsyncAppContext,
 430    ) -> Result<()> {
 431        let mut client_status = client.status();
 432        loop {
 433            let _ = client_status.try_recv();
 434            let is_connected = client_status.borrow().is_connected();
 435            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 436            if !is_connected || client_status.next().await.is_some() {
 437                log::info!("detected client disconnection");
 438
 439                this.upgrade()
 440                    .ok_or_else(|| anyhow!("room was dropped"))?
 441                    .update(&mut cx, |this, cx| {
 442                        this.status = RoomStatus::Rejoining;
 443                        cx.notify();
 444                    })?;
 445
 446                // Wait for client to re-establish a connection to the server.
 447                {
 448                    let mut reconnection_timeout =
 449                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 450                    let client_reconnection = async {
 451                        let mut remaining_attempts = 3;
 452                        while remaining_attempts > 0 {
 453                            if client_status.borrow().is_connected() {
 454                                log::info!("client reconnected, attempting to rejoin room");
 455
 456                                let Some(this) = this.upgrade() else { break };
 457                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 458                                    Ok(task) => {
 459                                        if task.await.log_err().is_some() {
 460                                            return true;
 461                                        } else {
 462                                            remaining_attempts -= 1;
 463                                        }
 464                                    }
 465                                    Err(_app_dropped) => return false,
 466                                }
 467                            } else if client_status.borrow().is_signed_out() {
 468                                return false;
 469                            }
 470
 471                            log::info!(
 472                                "waiting for client status change, remaining attempts {}",
 473                                remaining_attempts
 474                            );
 475                            client_status.next().await;
 476                        }
 477                        false
 478                    }
 479                    .fuse();
 480                    futures::pin_mut!(client_reconnection);
 481
 482                    futures::select_biased! {
 483                        reconnected = client_reconnection => {
 484                            if reconnected {
 485                                log::info!("successfully reconnected to room");
 486                                // If we successfully joined the room, go back around the loop
 487                                // waiting for future connection status changes.
 488                                continue;
 489                            }
 490                        }
 491                        _ = reconnection_timeout => {
 492                            log::info!("room reconnection timeout expired");
 493                        }
 494                    }
 495                }
 496
 497                break;
 498            }
 499        }
 500
 501        // The client failed to re-establish a connection to the server
 502        // or an error occurred while trying to re-join the room. Either way
 503        // we leave the room and return an error.
 504        if let Some(this) = this.upgrade() {
 505            log::info!("reconnection failed, leaving room");
 506            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 507        }
 508        Err(anyhow!(
 509            "can't reconnect to room: client failed to re-establish connection"
 510        ))
 511    }
 512
 513    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 514        let mut projects = HashMap::default();
 515        let mut reshared_projects = Vec::new();
 516        let mut rejoined_projects = Vec::new();
 517        self.shared_projects.retain(|project| {
 518            if let Some(handle) = project.upgrade() {
 519                let project = handle.read(cx);
 520                if let Some(project_id) = project.remote_id() {
 521                    projects.insert(project_id, handle.clone());
 522                    reshared_projects.push(proto::UpdateProject {
 523                        project_id,
 524                        worktrees: project.worktree_metadata_protos(cx),
 525                    });
 526                    return true;
 527                }
 528            }
 529            false
 530        });
 531        self.joined_projects.retain(|project| {
 532            if let Some(handle) = project.upgrade() {
 533                let project = handle.read(cx);
 534                if let Some(project_id) = project.remote_id() {
 535                    projects.insert(project_id, handle.clone());
 536                    rejoined_projects.push(proto::RejoinProject {
 537                        id: project_id,
 538                        worktrees: project
 539                            .worktrees()
 540                            .map(|worktree| {
 541                                let worktree = worktree.read(cx);
 542                                proto::RejoinWorktree {
 543                                    id: worktree.id().to_proto(),
 544                                    scan_id: worktree.completed_scan_id() as u64,
 545                                }
 546                            })
 547                            .collect(),
 548                    });
 549                }
 550                return true;
 551            }
 552            false
 553        });
 554
 555        let response = self.client.request_envelope(proto::RejoinRoom {
 556            id: self.id,
 557            reshared_projects,
 558            rejoined_projects,
 559        });
 560
 561        cx.spawn(|this, mut cx| async move {
 562            let response = response.await?;
 563            let message_id = response.message_id;
 564            let response = response.payload;
 565            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 566            this.update(&mut cx, |this, cx| {
 567                this.status = RoomStatus::Online;
 568                this.apply_room_update(room_proto, cx)?;
 569
 570                for reshared_project in response.reshared_projects {
 571                    if let Some(project) = projects.get(&reshared_project.id) {
 572                        project.update(cx, |project, cx| {
 573                            project.reshared(reshared_project, cx).log_err();
 574                        });
 575                    }
 576                }
 577
 578                for rejoined_project in response.rejoined_projects {
 579                    if let Some(project) = projects.get(&rejoined_project.id) {
 580                        project.update(cx, |project, cx| {
 581                            project.rejoined(rejoined_project, message_id, cx).log_err();
 582                        });
 583                    }
 584                }
 585
 586                anyhow::Ok(())
 587            })?
 588        })
 589    }
 590
 591    pub fn id(&self) -> u64 {
 592        self.id
 593    }
 594
 595    pub fn status(&self) -> RoomStatus {
 596        self.status
 597    }
 598
 599    pub fn local_participant(&self) -> &LocalParticipant {
 600        &self.local_participant
 601    }
 602
 603    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 604        &self.remote_participants
 605    }
 606
 607    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 608        self.remote_participants
 609            .values()
 610            .find(|p| p.peer_id == peer_id)
 611    }
 612
 613    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 614        self.remote_participants
 615            .get(&user_id)
 616            .map(|participant| participant.role)
 617    }
 618
 619    pub fn local_participant_is_admin(&self) -> bool {
 620        self.local_participant.role == proto::ChannelRole::Admin
 621    }
 622
 623    pub fn pending_participants(&self) -> &[Arc<User>] {
 624        &self.pending_participants
 625    }
 626
 627    pub fn contains_participant(&self, user_id: u64) -> bool {
 628        self.participant_user_ids.contains(&user_id)
 629    }
 630
 631    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 632        self.follows_by_leader_id_project_id
 633            .get(&(leader_id, project_id))
 634            .map_or(&[], |v| v.as_slice())
 635    }
 636
 637    /// Returns the most 'active' projects, defined as most people in the project
 638    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 639        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 640        for participant in self.remote_participants.values() {
 641            match participant.location {
 642                ParticipantLocation::SharedProject { project_id } => {
 643                    project_hosts_and_guest_counts
 644                        .entry(project_id)
 645                        .or_default()
 646                        .1 += 1;
 647                }
 648                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 649            }
 650            for project in &participant.projects {
 651                project_hosts_and_guest_counts
 652                    .entry(project.id)
 653                    .or_default()
 654                    .0 = Some(participant.user.id);
 655            }
 656        }
 657
 658        if let Some(user) = self.user_store.read(cx).current_user() {
 659            for project in &self.local_participant.projects {
 660                project_hosts_and_guest_counts
 661                    .entry(project.id)
 662                    .or_default()
 663                    .0 = Some(user.id);
 664            }
 665        }
 666
 667        project_hosts_and_guest_counts
 668            .into_iter()
 669            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 670            .max_by_key(|(_, _, guest_count)| *guest_count)
 671            .map(|(id, host, _)| (id, host))
 672    }
 673
 674    async fn handle_room_updated(
 675        this: Model<Self>,
 676        envelope: TypedEnvelope<proto::RoomUpdated>,
 677        _: Arc<Client>,
 678        mut cx: AsyncAppContext,
 679    ) -> Result<()> {
 680        let room = envelope
 681            .payload
 682            .room
 683            .ok_or_else(|| anyhow!("invalid room"))?;
 684        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 685    }
 686
 687    fn apply_room_update(
 688        &mut self,
 689        mut room: proto::Room,
 690        cx: &mut ModelContext<Self>,
 691    ) -> Result<()> {
 692        // Filter ourselves out from the room's participants.
 693        let local_participant_ix = room
 694            .participants
 695            .iter()
 696            .position(|participant| Some(participant.user_id) == self.client.user_id());
 697        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 698
 699        let pending_participant_user_ids = room
 700            .pending_participants
 701            .iter()
 702            .map(|p| p.user_id)
 703            .collect::<Vec<_>>();
 704
 705        let remote_participant_user_ids = room
 706            .participants
 707            .iter()
 708            .map(|p| p.user_id)
 709            .collect::<Vec<_>>();
 710
 711        let (remote_participants, pending_participants) =
 712            self.user_store.update(cx, move |user_store, cx| {
 713                (
 714                    user_store.get_users(remote_participant_user_ids, cx),
 715                    user_store.get_users(pending_participant_user_ids, cx),
 716                )
 717            });
 718
 719        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 720            let (remote_participants, pending_participants) =
 721                futures::join!(remote_participants, pending_participants);
 722
 723            this.update(&mut cx, |this, cx| {
 724                this.participant_user_ids.clear();
 725
 726                if let Some(participant) = local_participant {
 727                    let role = participant.role();
 728                    this.local_participant.projects = participant.projects;
 729                    if this.local_participant.role != role {
 730                        this.local_participant.role = role;
 731
 732                        this.joined_projects.retain(|project| {
 733                            if let Some(project) = project.upgrade() {
 734                                project.update(cx, |project, _| project.set_role(role));
 735                                true
 736                            } else {
 737                                false
 738                            }
 739                        });
 740                    }
 741                } else {
 742                    this.local_participant.projects.clear();
 743                }
 744
 745                if let Some(participants) = remote_participants.log_err() {
 746                    for (participant, user) in room.participants.into_iter().zip(participants) {
 747                        let Some(peer_id) = participant.peer_id else {
 748                            continue;
 749                        };
 750                        let participant_index = ParticipantIndex(participant.participant_index);
 751                        this.participant_user_ids.insert(participant.user_id);
 752
 753                        let old_projects = this
 754                            .remote_participants
 755                            .get(&participant.user_id)
 756                            .into_iter()
 757                            .flat_map(|existing| &existing.projects)
 758                            .map(|project| project.id)
 759                            .collect::<HashSet<_>>();
 760                        let new_projects = participant
 761                            .projects
 762                            .iter()
 763                            .map(|project| project.id)
 764                            .collect::<HashSet<_>>();
 765
 766                        for project in &participant.projects {
 767                            if !old_projects.contains(&project.id) {
 768                                cx.emit(Event::RemoteProjectShared {
 769                                    owner: user.clone(),
 770                                    project_id: project.id,
 771                                    worktree_root_names: project.worktree_root_names.clone(),
 772                                });
 773                            }
 774                        }
 775
 776                        for unshared_project_id in old_projects.difference(&new_projects) {
 777                            this.joined_projects.retain(|project| {
 778                                if let Some(project) = project.upgrade() {
 779                                    project.update(cx, |project, cx| {
 780                                        if project.remote_id() == Some(*unshared_project_id) {
 781                                            project.disconnected_from_host(cx);
 782                                            false
 783                                        } else {
 784                                            true
 785                                        }
 786                                    })
 787                                } else {
 788                                    false
 789                                }
 790                            });
 791                            cx.emit(Event::RemoteProjectUnshared {
 792                                project_id: *unshared_project_id,
 793                            });
 794                        }
 795
 796                        let role = participant.role();
 797                        let location = ParticipantLocation::from_proto(participant.location)
 798                            .unwrap_or(ParticipantLocation::External);
 799                        if let Some(remote_participant) =
 800                            this.remote_participants.get_mut(&participant.user_id)
 801                        {
 802                            remote_participant.peer_id = peer_id;
 803                            remote_participant.projects = participant.projects;
 804                            remote_participant.participant_index = participant_index;
 805                            if location != remote_participant.location
 806                                || role != remote_participant.role
 807                            {
 808                                remote_participant.location = location;
 809                                remote_participant.role = role;
 810                                cx.emit(Event::ParticipantLocationChanged {
 811                                    participant_id: peer_id,
 812                                });
 813                            }
 814                        } else {
 815                            this.remote_participants.insert(
 816                                participant.user_id,
 817                                RemoteParticipant {
 818                                    user: user.clone(),
 819                                    participant_index,
 820                                    peer_id,
 821                                    projects: participant.projects,
 822                                    location,
 823                                    role,
 824                                    muted: true,
 825                                    speaking: false,
 826                                    video_tracks: Default::default(),
 827                                    audio_tracks: Default::default(),
 828                                },
 829                            );
 830
 831                            Audio::play_sound(Sound::Joined, cx);
 832
 833                            if let Some(live_kit) = this.live_kit.as_ref() {
 834                                let video_tracks =
 835                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 836                                let audio_tracks =
 837                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 838                                let publications = live_kit
 839                                    .room
 840                                    .remote_audio_track_publications(&user.id.to_string());
 841
 842                                for track in video_tracks {
 843                                    this.remote_video_track_updated(
 844                                        RemoteVideoTrackUpdate::Subscribed(track),
 845                                        cx,
 846                                    )
 847                                    .log_err();
 848                                }
 849
 850                                for (track, publication) in
 851                                    audio_tracks.iter().zip(publications.iter())
 852                                {
 853                                    this.remote_audio_track_updated(
 854                                        RemoteAudioTrackUpdate::Subscribed(
 855                                            track.clone(),
 856                                            publication.clone(),
 857                                        ),
 858                                        cx,
 859                                    )
 860                                    .log_err();
 861                                }
 862                            }
 863                        }
 864                    }
 865
 866                    this.remote_participants.retain(|user_id, participant| {
 867                        if this.participant_user_ids.contains(user_id) {
 868                            true
 869                        } else {
 870                            for project in &participant.projects {
 871                                cx.emit(Event::RemoteProjectUnshared {
 872                                    project_id: project.id,
 873                                });
 874                            }
 875                            false
 876                        }
 877                    });
 878                }
 879
 880                if let Some(pending_participants) = pending_participants.log_err() {
 881                    this.pending_participants = pending_participants;
 882                    for participant in &this.pending_participants {
 883                        this.participant_user_ids.insert(participant.id);
 884                    }
 885                }
 886
 887                this.follows_by_leader_id_project_id.clear();
 888                for follower in room.followers {
 889                    let project_id = follower.project_id;
 890                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 891                        (Some(leader), Some(follower)) => (leader, follower),
 892
 893                        _ => {
 894                            log::error!("Follower message {follower:?} missing some state");
 895                            continue;
 896                        }
 897                    };
 898
 899                    let list = this
 900                        .follows_by_leader_id_project_id
 901                        .entry((leader, project_id))
 902                        .or_insert(Vec::new());
 903                    if !list.contains(&follower) {
 904                        list.push(follower);
 905                    }
 906                }
 907
 908                this.pending_room_update.take();
 909                if this.should_leave() {
 910                    log::info!("room is empty, leaving");
 911                    let _ = this.leave(cx);
 912                }
 913
 914                this.user_store.update(cx, |user_store, cx| {
 915                    let participant_indices_by_user_id = this
 916                        .remote_participants
 917                        .iter()
 918                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 919                        .collect();
 920                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 921                });
 922
 923                this.check_invariants();
 924                this.room_update_completed_tx.try_send(Some(())).ok();
 925                cx.notify();
 926            })
 927            .ok();
 928        }));
 929
 930        cx.notify();
 931        Ok(())
 932    }
 933
 934    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 935        let mut done_rx = self.room_update_completed_rx.clone();
 936        async move {
 937            while let Some(result) = done_rx.next().await {
 938                if result.is_some() {
 939                    break;
 940                }
 941            }
 942        }
 943    }
 944
 945    fn remote_video_track_updated(
 946        &mut self,
 947        change: RemoteVideoTrackUpdate,
 948        cx: &mut ModelContext<Self>,
 949    ) -> Result<()> {
 950        match change {
 951            RemoteVideoTrackUpdate::Subscribed(track) => {
 952                let user_id = track.publisher_id().parse()?;
 953                let track_id = track.sid().to_string();
 954                let participant = self
 955                    .remote_participants
 956                    .get_mut(&user_id)
 957                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 958                participant.video_tracks.insert(track_id.clone(), track);
 959                cx.emit(Event::RemoteVideoTracksChanged {
 960                    participant_id: participant.peer_id,
 961                });
 962            }
 963            RemoteVideoTrackUpdate::Unsubscribed {
 964                publisher_id,
 965                track_id,
 966            } => {
 967                let user_id = publisher_id.parse()?;
 968                let participant = self
 969                    .remote_participants
 970                    .get_mut(&user_id)
 971                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 972                participant.video_tracks.remove(&track_id);
 973                cx.emit(Event::RemoteVideoTracksChanged {
 974                    participant_id: participant.peer_id,
 975                });
 976            }
 977        }
 978
 979        cx.notify();
 980        Ok(())
 981    }
 982
 983    fn remote_audio_track_updated(
 984        &mut self,
 985        change: RemoteAudioTrackUpdate,
 986        cx: &mut ModelContext<Self>,
 987    ) -> Result<()> {
 988        match change {
 989            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 990                let mut speaker_ids = speakers
 991                    .into_iter()
 992                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 993                    .collect::<Vec<u64>>();
 994                speaker_ids.sort_unstable();
 995                for (sid, participant) in &mut self.remote_participants {
 996                    if let Ok(_) = speaker_ids.binary_search(sid) {
 997                        participant.speaking = true;
 998                    } else {
 999                        participant.speaking = false;
1000                    }
1001                }
1002                if let Some(id) = self.client.user_id() {
1003                    if let Some(room) = &mut self.live_kit {
1004                        if let Ok(_) = speaker_ids.binary_search(&id) {
1005                            room.speaking = true;
1006                        } else {
1007                            room.speaking = false;
1008                        }
1009                    }
1010                }
1011                cx.notify();
1012            }
1013            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
1014                let mut found = false;
1015                for participant in &mut self.remote_participants.values_mut() {
1016                    for track in participant.audio_tracks.values() {
1017                        if track.sid() == track_id {
1018                            found = true;
1019                            break;
1020                        }
1021                    }
1022                    if found {
1023                        participant.muted = muted;
1024                        break;
1025                    }
1026                }
1027
1028                cx.notify();
1029            }
1030            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1031                let user_id = track.publisher_id().parse()?;
1032                let track_id = track.sid().to_string();
1033                let participant = self
1034                    .remote_participants
1035                    .get_mut(&user_id)
1036                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1037                participant.audio_tracks.insert(track_id.clone(), track);
1038                participant.muted = publication.is_muted();
1039
1040                cx.emit(Event::RemoteAudioTracksChanged {
1041                    participant_id: participant.peer_id,
1042                });
1043            }
1044            RemoteAudioTrackUpdate::Unsubscribed {
1045                publisher_id,
1046                track_id,
1047            } => {
1048                let user_id = publisher_id.parse()?;
1049                let participant = self
1050                    .remote_participants
1051                    .get_mut(&user_id)
1052                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1053                participant.audio_tracks.remove(&track_id);
1054                cx.emit(Event::RemoteAudioTracksChanged {
1055                    participant_id: participant.peer_id,
1056                });
1057            }
1058        }
1059
1060        cx.notify();
1061        Ok(())
1062    }
1063
1064    fn check_invariants(&self) {
1065        #[cfg(any(test, feature = "test-support"))]
1066        {
1067            for participant in self.remote_participants.values() {
1068                assert!(self.participant_user_ids.contains(&participant.user.id));
1069                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1070            }
1071
1072            for participant in &self.pending_participants {
1073                assert!(self.participant_user_ids.contains(&participant.id));
1074                assert_ne!(participant.id, self.client.user_id().unwrap());
1075            }
1076
1077            assert_eq!(
1078                self.participant_user_ids.len(),
1079                self.remote_participants.len() + self.pending_participants.len()
1080            );
1081        }
1082    }
1083
1084    pub(crate) fn call(
1085        &mut self,
1086        called_user_id: u64,
1087        initial_project_id: Option<u64>,
1088        cx: &mut ModelContext<Self>,
1089    ) -> Task<Result<()>> {
1090        if self.status.is_offline() {
1091            return Task::ready(Err(anyhow!("room is offline")));
1092        }
1093
1094        cx.notify();
1095        let client = self.client.clone();
1096        let room_id = self.id;
1097        self.pending_call_count += 1;
1098        cx.spawn(move |this, mut cx| async move {
1099            let result = client
1100                .request(proto::Call {
1101                    room_id,
1102                    called_user_id,
1103                    initial_project_id,
1104                })
1105                .await;
1106            this.update(&mut cx, |this, cx| {
1107                this.pending_call_count -= 1;
1108                if this.should_leave() {
1109                    this.leave(cx).detach_and_log_err(cx);
1110                }
1111            })?;
1112            result?;
1113            Ok(())
1114        })
1115    }
1116
1117    pub fn join_project(
1118        &mut self,
1119        id: u64,
1120        language_registry: Arc<LanguageRegistry>,
1121        fs: Arc<dyn Fs>,
1122        cx: &mut ModelContext<Self>,
1123    ) -> Task<Result<Model<Project>>> {
1124        let client = self.client.clone();
1125        let user_store = self.user_store.clone();
1126        let role = self.local_participant.role;
1127        cx.emit(Event::RemoteProjectJoined { project_id: id });
1128        cx.spawn(move |this, mut cx| async move {
1129            let project = Project::remote(
1130                id,
1131                client,
1132                user_store,
1133                language_registry,
1134                fs,
1135                role,
1136                cx.clone(),
1137            )
1138            .await?;
1139
1140            this.update(&mut cx, |this, cx| {
1141                this.joined_projects.retain(|project| {
1142                    if let Some(project) = project.upgrade() {
1143                        !project.read(cx).is_disconnected()
1144                    } else {
1145                        false
1146                    }
1147                });
1148                this.joined_projects.insert(project.downgrade());
1149            })?;
1150            Ok(project)
1151        })
1152    }
1153
1154    pub(crate) fn share_project(
1155        &mut self,
1156        project: Model<Project>,
1157        cx: &mut ModelContext<Self>,
1158    ) -> Task<Result<u64>> {
1159        if let Some(project_id) = project.read(cx).remote_id() {
1160            return Task::ready(Ok(project_id));
1161        }
1162
1163        let request = self.client.request(proto::ShareProject {
1164            room_id: self.id(),
1165            worktrees: project.read(cx).worktree_metadata_protos(cx),
1166        });
1167        cx.spawn(|this, mut cx| async move {
1168            let response = request.await?;
1169
1170            project.update(&mut cx, |project, cx| {
1171                project.shared(response.project_id, cx)
1172            })??;
1173
1174            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1175            this.update(&mut cx, |this, cx| {
1176                this.shared_projects.insert(project.downgrade());
1177                let active_project = this.local_participant.active_project.as_ref();
1178                if active_project.map_or(false, |location| *location == project) {
1179                    this.set_location(Some(&project), cx)
1180                } else {
1181                    Task::ready(Ok(()))
1182                }
1183            })?
1184            .await?;
1185
1186            Ok(response.project_id)
1187        })
1188    }
1189
1190    pub(crate) fn unshare_project(
1191        &mut self,
1192        project: Model<Project>,
1193        cx: &mut ModelContext<Self>,
1194    ) -> Result<()> {
1195        let project_id = match project.read(cx).remote_id() {
1196            Some(project_id) => project_id,
1197            None => return Ok(()),
1198        };
1199
1200        self.client.send(proto::UnshareProject { project_id })?;
1201        project.update(cx, |this, cx| this.unshare(cx))
1202    }
1203
1204    pub(crate) fn set_location(
1205        &mut self,
1206        project: Option<&Model<Project>>,
1207        cx: &mut ModelContext<Self>,
1208    ) -> Task<Result<()>> {
1209        if self.status.is_offline() {
1210            return Task::ready(Err(anyhow!("room is offline")));
1211        }
1212
1213        let client = self.client.clone();
1214        let room_id = self.id;
1215        let location = if let Some(project) = project {
1216            self.local_participant.active_project = Some(project.downgrade());
1217            if let Some(project_id) = project.read(cx).remote_id() {
1218                proto::participant_location::Variant::SharedProject(
1219                    proto::participant_location::SharedProject { id: project_id },
1220                )
1221            } else {
1222                proto::participant_location::Variant::UnsharedProject(
1223                    proto::participant_location::UnsharedProject {},
1224                )
1225            }
1226        } else {
1227            self.local_participant.active_project = None;
1228            proto::participant_location::Variant::External(proto::participant_location::External {})
1229        };
1230
1231        cx.notify();
1232        cx.background_executor().spawn(async move {
1233            client
1234                .request(proto::UpdateParticipantLocation {
1235                    room_id,
1236                    location: Some(proto::ParticipantLocation {
1237                        variant: Some(location),
1238                    }),
1239                })
1240                .await?;
1241            Ok(())
1242        })
1243    }
1244
1245    pub fn is_screen_sharing(&self) -> bool {
1246        self.live_kit.as_ref().map_or(false, |live_kit| {
1247            !matches!(live_kit.screen_track, LocalTrack::None)
1248        })
1249    }
1250
1251    pub fn is_sharing_mic(&self) -> bool {
1252        self.live_kit.as_ref().map_or(false, |live_kit| {
1253            !matches!(live_kit.microphone_track, LocalTrack::None)
1254        })
1255    }
1256
1257    pub fn is_muted(&self, cx: &AppContext) -> bool {
1258        self.live_kit
1259            .as_ref()
1260            .and_then(|live_kit| match &live_kit.microphone_track {
1261                LocalTrack::None => Some(Self::mute_on_join(cx)),
1262                LocalTrack::Pending { muted, .. } => Some(*muted),
1263                LocalTrack::Published { muted, .. } => Some(*muted),
1264            })
1265            .unwrap_or(false)
1266    }
1267
1268    pub fn read_only(&self) -> bool {
1269        !(self.local_participant().role == proto::ChannelRole::Member
1270            || self.local_participant().role == proto::ChannelRole::Admin)
1271    }
1272
1273    pub fn is_speaking(&self) -> bool {
1274        self.live_kit
1275            .as_ref()
1276            .map_or(false, |live_kit| live_kit.speaking)
1277    }
1278
1279    pub fn is_deafened(&self) -> Option<bool> {
1280        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1281    }
1282
1283    #[track_caller]
1284    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1285        if self.status.is_offline() {
1286            return Task::ready(Err(anyhow!("room is offline")));
1287        } else if self.is_sharing_mic() {
1288            return Task::ready(Err(anyhow!("microphone was already shared")));
1289        }
1290
1291        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1292            let publish_id = post_inc(&mut live_kit.next_publish_id);
1293            live_kit.microphone_track = LocalTrack::Pending {
1294                publish_id,
1295                muted: false,
1296            };
1297            cx.notify();
1298            publish_id
1299        } else {
1300            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1301        };
1302
1303        cx.spawn(move |this, mut cx| async move {
1304            let publish_track = async {
1305                let track = LocalAudioTrack::create();
1306                this.upgrade()
1307                    .ok_or_else(|| anyhow!("room was dropped"))?
1308                    .update(&mut cx, |this, _| {
1309                        this.live_kit
1310                            .as_ref()
1311                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1312                    })?
1313                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1314                    .await
1315            };
1316            let publication = publish_track.await;
1317            this.upgrade()
1318                .ok_or_else(|| anyhow!("room was dropped"))?
1319                .update(&mut cx, |this, cx| {
1320                    let live_kit = this
1321                        .live_kit
1322                        .as_mut()
1323                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1324
1325                    let (canceled, muted) = if let LocalTrack::Pending {
1326                        publish_id: cur_publish_id,
1327                        muted,
1328                    } = &live_kit.microphone_track
1329                    {
1330                        (*cur_publish_id != publish_id, *muted)
1331                    } else {
1332                        (true, false)
1333                    };
1334
1335                    match publication {
1336                        Ok(publication) => {
1337                            if canceled {
1338                                live_kit.room.unpublish_track(publication);
1339                            } else {
1340                                if muted {
1341                                    cx.background_executor()
1342                                        .spawn(publication.set_mute(muted))
1343                                        .detach();
1344                                }
1345                                live_kit.microphone_track = LocalTrack::Published {
1346                                    track_publication: publication,
1347                                    muted,
1348                                };
1349                                cx.notify();
1350                            }
1351                            Ok(())
1352                        }
1353                        Err(error) => {
1354                            if canceled {
1355                                Ok(())
1356                            } else {
1357                                live_kit.microphone_track = LocalTrack::None;
1358                                cx.notify();
1359                                Err(error)
1360                            }
1361                        }
1362                    }
1363                })?
1364        })
1365    }
1366
1367    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1368        if self.status.is_offline() {
1369            return Task::ready(Err(anyhow!("room is offline")));
1370        } else if self.is_screen_sharing() {
1371            return Task::ready(Err(anyhow!("screen was already shared")));
1372        }
1373
1374        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1375            let publish_id = post_inc(&mut live_kit.next_publish_id);
1376            live_kit.screen_track = LocalTrack::Pending {
1377                publish_id,
1378                muted: false,
1379            };
1380            cx.notify();
1381            (live_kit.room.display_sources(), publish_id)
1382        } else {
1383            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1384        };
1385
1386        cx.spawn(move |this, mut cx| async move {
1387            let publish_track = async {
1388                let displays = displays.await?;
1389                let display = displays
1390                    .first()
1391                    .ok_or_else(|| anyhow!("no display found"))?;
1392                let track = LocalVideoTrack::screen_share_for_display(&display);
1393                this.upgrade()
1394                    .ok_or_else(|| anyhow!("room was dropped"))?
1395                    .update(&mut cx, |this, _| {
1396                        this.live_kit
1397                            .as_ref()
1398                            .map(|live_kit| live_kit.room.publish_video_track(track))
1399                    })?
1400                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1401                    .await
1402            };
1403
1404            let publication = publish_track.await;
1405            this.upgrade()
1406                .ok_or_else(|| anyhow!("room was dropped"))?
1407                .update(&mut cx, |this, cx| {
1408                    let live_kit = this
1409                        .live_kit
1410                        .as_mut()
1411                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1412
1413                    let (canceled, muted) = if let LocalTrack::Pending {
1414                        publish_id: cur_publish_id,
1415                        muted,
1416                    } = &live_kit.screen_track
1417                    {
1418                        (*cur_publish_id != publish_id, *muted)
1419                    } else {
1420                        (true, false)
1421                    };
1422
1423                    match publication {
1424                        Ok(publication) => {
1425                            if canceled {
1426                                live_kit.room.unpublish_track(publication);
1427                            } else {
1428                                if muted {
1429                                    cx.background_executor()
1430                                        .spawn(publication.set_mute(muted))
1431                                        .detach();
1432                                }
1433                                live_kit.screen_track = LocalTrack::Published {
1434                                    track_publication: publication,
1435                                    muted,
1436                                };
1437                                cx.notify();
1438                            }
1439
1440                            Audio::play_sound(Sound::StartScreenshare, cx);
1441
1442                            Ok(())
1443                        }
1444                        Err(error) => {
1445                            if canceled {
1446                                Ok(())
1447                            } else {
1448                                live_kit.screen_track = LocalTrack::None;
1449                                cx.notify();
1450                                Err(error)
1451                            }
1452                        }
1453                    }
1454                })?
1455        })
1456    }
1457
1458    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1459        let should_mute = !self.is_muted(cx);
1460        if let Some(live_kit) = self.live_kit.as_mut() {
1461            if matches!(live_kit.microphone_track, LocalTrack::None) {
1462                return Ok(self.share_microphone(cx));
1463            }
1464
1465            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1466            live_kit.muted_by_user = should_mute;
1467
1468            if old_muted == true && live_kit.deafened == true {
1469                if let Some(task) = self.toggle_deafen(cx).ok() {
1470                    task.detach();
1471                }
1472            }
1473
1474            Ok(ret_task)
1475        } else {
1476            Err(anyhow!("LiveKit not started"))
1477        }
1478    }
1479
1480    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1481        if let Some(live_kit) = self.live_kit.as_mut() {
1482            (*live_kit).deafened = !live_kit.deafened;
1483
1484            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1485            // Context notification is sent within set_mute itself.
1486            let mut mute_task = None;
1487            // When deafening, mute user's mic as well.
1488            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1489            if live_kit.deafened || !live_kit.muted_by_user {
1490                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1491            };
1492            for participant in self.remote_participants.values() {
1493                for track in live_kit
1494                    .room
1495                    .remote_audio_track_publications(&participant.user.id.to_string())
1496                {
1497                    let deafened = live_kit.deafened;
1498                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1499                }
1500            }
1501
1502            Ok(cx.foreground_executor().spawn(async move {
1503                if let Some(mute_task) = mute_task {
1504                    mute_task.await?;
1505                }
1506                for task in tasks {
1507                    task.await?;
1508                }
1509                Ok(())
1510            }))
1511        } else {
1512            Err(anyhow!("LiveKit not started"))
1513        }
1514    }
1515
1516    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1517        if self.status.is_offline() {
1518            return Err(anyhow!("room is offline"));
1519        }
1520
1521        let live_kit = self
1522            .live_kit
1523            .as_mut()
1524            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1525        match mem::take(&mut live_kit.screen_track) {
1526            LocalTrack::None => Err(anyhow!("screen was not shared")),
1527            LocalTrack::Pending { .. } => {
1528                cx.notify();
1529                Ok(())
1530            }
1531            LocalTrack::Published {
1532                track_publication, ..
1533            } => {
1534                live_kit.room.unpublish_track(track_publication);
1535                cx.notify();
1536
1537                Audio::play_sound(Sound::StopScreenshare, cx);
1538                Ok(())
1539            }
1540        }
1541    }
1542
1543    #[cfg(any(test, feature = "test-support"))]
1544    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1545        self.live_kit
1546            .as_ref()
1547            .unwrap()
1548            .room
1549            .set_display_sources(sources);
1550    }
1551}
1552
1553struct LiveKitRoom {
1554    room: Arc<live_kit_client::Room>,
1555    screen_track: LocalTrack,
1556    microphone_track: LocalTrack,
1557    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1558    muted_by_user: bool,
1559    deafened: bool,
1560    speaking: bool,
1561    next_publish_id: usize,
1562    _maintain_room: Task<()>,
1563    _maintain_tracks: [Task<()>; 2],
1564}
1565
1566impl LiveKitRoom {
1567    fn set_mute(
1568        self: &mut LiveKitRoom,
1569        should_mute: bool,
1570        cx: &mut ModelContext<Room>,
1571    ) -> Result<(Task<Result<()>>, bool)> {
1572        if !should_mute {
1573            // clear user muting state.
1574            self.muted_by_user = false;
1575        }
1576
1577        let (result, old_muted) = match &mut self.microphone_track {
1578            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1579            LocalTrack::Pending { muted, .. } => {
1580                let old_muted = *muted;
1581                *muted = should_mute;
1582                cx.notify();
1583                Ok((Task::Ready(Some(Ok(()))), old_muted))
1584            }
1585            LocalTrack::Published {
1586                track_publication,
1587                muted,
1588            } => {
1589                let old_muted = *muted;
1590                *muted = should_mute;
1591                cx.notify();
1592                Ok((
1593                    cx.background_executor()
1594                        .spawn(track_publication.set_mute(*muted)),
1595                    old_muted,
1596                ))
1597            }
1598        }?;
1599
1600        if old_muted != should_mute {
1601            if should_mute {
1602                Audio::play_sound(Sound::Mute, cx);
1603            } else {
1604                Audio::play_sound(Sound::Unmute, cx);
1605            }
1606        }
1607
1608        Ok((result, old_muted))
1609    }
1610}
1611
1612enum LocalTrack {
1613    None,
1614    Pending {
1615        publish_id: usize,
1616        muted: bool,
1617    },
1618    Published {
1619        track_publication: LocalTrackPublication,
1620        muted: bool,
1621    },
1622}
1623
1624impl Default for LocalTrack {
1625    fn default() -> Self {
1626        Self::None
1627    }
1628}
1629
1630#[derive(Copy, Clone, PartialEq, Eq)]
1631pub enum RoomStatus {
1632    Online,
1633    Rejoining,
1634    Offline,
1635}
1636
1637impl RoomStatus {
1638    pub fn is_offline(&self) -> bool {
1639        matches!(self, RoomStatus::Offline)
1640    }
1641
1642    pub fn is_online(&self) -> bool {
1643        matches!(self, RoomStatus::Online)
1644    }
1645}