room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::{sink::Sink, stream::Stream, watch};
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28#[derive(Clone, Debug, PartialEq, Eq)]
  29pub enum Event {
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    RemoteProjectJoined {
  48        project_id: u64,
  49    },
  50    RemoteProjectInvitationDiscarded {
  51        project_id: u64,
  52    },
  53    Left,
  54}
  55
  56pub struct Room {
  57    id: u64,
  58    channel_id: Option<u64>,
  59    live_kit: Option<LiveKitRoom>,
  60    status: RoomStatus,
  61    shared_projects: HashSet<WeakModelHandle<Project>>,
  62    joined_projects: HashSet<WeakModelHandle<Project>>,
  63    local_participant: LocalParticipant,
  64    remote_participants: BTreeMap<u64, RemoteParticipant>,
  65    pending_participants: Vec<Arc<User>>,
  66    participant_user_ids: HashSet<u64>,
  67    pending_call_count: usize,
  68    leave_when_empty: bool,
  69    client: Arc<Client>,
  70    user_store: ModelHandle<UserStore>,
  71    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  72    subscriptions: Vec<client::Subscription>,
  73    room_update_completed_tx: watch::Sender<Option<()>>,
  74    room_update_completed_rx: watch::Receiver<Option<()>>,
  75    pending_room_update: Option<Task<()>>,
  76    maintain_connection: Option<Task<Option<()>>>,
  77}
  78
  79impl Entity for Room {
  80    type Event = Event;
  81
  82    fn release(&mut self, cx: &mut AppContext) {
  83        if self.status.is_online() {
  84            self.leave_internal(cx).detach_and_log_err(cx);
  85        }
  86    }
  87
  88    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  89        if self.status.is_online() {
  90            let leave = self.leave_internal(cx);
  91            Some(
  92                cx.background()
  93                    .spawn(async move {
  94                        leave.await.log_err();
  95                    })
  96                    .boxed(),
  97            )
  98        } else {
  99            None
 100        }
 101    }
 102}
 103
 104impl Room {
 105    pub fn channel_id(&self) -> Option<u64> {
 106        self.channel_id
 107    }
 108
 109    pub fn is_sharing_project(&self) -> bool {
 110        !self.shared_projects.is_empty()
 111    }
 112
 113    #[cfg(any(test, feature = "test-support"))]
 114    pub fn is_connected(&self) -> bool {
 115        if let Some(live_kit) = self.live_kit.as_ref() {
 116            matches!(
 117                *live_kit.room.status().borrow(),
 118                live_kit_client::ConnectionState::Connected { .. }
 119            )
 120        } else {
 121            false
 122        }
 123    }
 124
 125    fn new(
 126        id: u64,
 127        channel_id: Option<u64>,
 128        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 129        client: Arc<Client>,
 130        user_store: ModelHandle<UserStore>,
 131        cx: &mut ModelContext<Self>,
 132    ) -> Self {
 133        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 134            let room = live_kit_client::Room::new();
 135            let mut status = room.status();
 136            // Consume the initial status of the room.
 137            let _ = status.try_recv();
 138            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 139                while let Some(status) = status.next().await {
 140                    let this = if let Some(this) = this.upgrade(&cx) {
 141                        this
 142                    } else {
 143                        break;
 144                    };
 145
 146                    if status == live_kit_client::ConnectionState::Disconnected {
 147                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 148                        break;
 149                    }
 150                }
 151            });
 152
 153            let mut track_video_changes = room.remote_video_track_updates();
 154            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 155                while let Some(track_change) = track_video_changes.next().await {
 156                    let this = if let Some(this) = this.upgrade(&cx) {
 157                        this
 158                    } else {
 159                        break;
 160                    };
 161
 162                    this.update(&mut cx, |this, cx| {
 163                        this.remote_video_track_updated(track_change, cx).log_err()
 164                    });
 165                }
 166            });
 167
 168            let mut track_audio_changes = room.remote_audio_track_updates();
 169            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 170                while let Some(track_change) = track_audio_changes.next().await {
 171                    let this = if let Some(this) = this.upgrade(&cx) {
 172                        this
 173                    } else {
 174                        break;
 175                    };
 176
 177                    this.update(&mut cx, |this, cx| {
 178                        this.remote_audio_track_updated(track_change, cx).log_err()
 179                    });
 180                }
 181            });
 182
 183            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 184            cx.spawn(|this, mut cx| async move {
 185                connect.await?;
 186
 187                if !cx.read(Self::mute_on_join) {
 188                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
 189                        .await?;
 190                }
 191
 192                anyhow::Ok(())
 193            })
 194            .detach_and_log_err(cx);
 195
 196            Some(LiveKitRoom {
 197                room,
 198                screen_track: LocalTrack::None,
 199                microphone_track: LocalTrack::None,
 200                next_publish_id: 0,
 201                muted_by_user: false,
 202                deafened: false,
 203                speaking: false,
 204                _maintain_room,
 205                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 206            })
 207        } else {
 208            None
 209        };
 210
 211        let maintain_connection =
 212            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 213
 214        Audio::play_sound(Sound::Joined, cx);
 215
 216        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 217
 218        Self {
 219            id,
 220            channel_id,
 221            live_kit: live_kit_room,
 222            status: RoomStatus::Online,
 223            shared_projects: Default::default(),
 224            joined_projects: Default::default(),
 225            participant_user_ids: Default::default(),
 226            local_participant: Default::default(),
 227            remote_participants: Default::default(),
 228            pending_participants: Default::default(),
 229            pending_call_count: 0,
 230            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 231            leave_when_empty: false,
 232            pending_room_update: None,
 233            client,
 234            user_store,
 235            follows_by_leader_id_project_id: Default::default(),
 236            maintain_connection: Some(maintain_connection),
 237            room_update_completed_tx,
 238            room_update_completed_rx,
 239        }
 240    }
 241
 242    pub(crate) fn create(
 243        called_user_id: u64,
 244        initial_project: Option<ModelHandle<Project>>,
 245        client: Arc<Client>,
 246        user_store: ModelHandle<UserStore>,
 247        cx: &mut AppContext,
 248    ) -> Task<Result<ModelHandle<Self>>> {
 249        cx.spawn(|mut cx| async move {
 250            let response = client.request(proto::CreateRoom {}).await?;
 251            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 252            let room = cx.add_model(|cx| {
 253                Self::new(
 254                    room_proto.id,
 255                    None,
 256                    response.live_kit_connection_info,
 257                    client,
 258                    user_store,
 259                    cx,
 260                )
 261            });
 262
 263            let initial_project_id = if let Some(initial_project) = initial_project {
 264                let initial_project_id = room
 265                    .update(&mut cx, |room, cx| {
 266                        room.share_project(initial_project.clone(), cx)
 267                    })
 268                    .await?;
 269                Some(initial_project_id)
 270            } else {
 271                None
 272            };
 273
 274            match room
 275                .update(&mut cx, |room, cx| {
 276                    room.leave_when_empty = true;
 277                    room.call(called_user_id, initial_project_id, cx)
 278                })
 279                .await
 280            {
 281                Ok(()) => Ok(room),
 282                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 283            }
 284        })
 285    }
 286
 287    pub(crate) fn join_channel(
 288        channel_id: u64,
 289        client: Arc<Client>,
 290        user_store: ModelHandle<UserStore>,
 291        cx: &mut AppContext,
 292    ) -> Task<Result<ModelHandle<Self>>> {
 293        cx.spawn(|cx| async move {
 294            Self::from_join_response(
 295                client.request(proto::JoinChannel { channel_id }).await?,
 296                client,
 297                user_store,
 298                cx,
 299            )
 300        })
 301    }
 302
 303    pub(crate) fn join(
 304        call: &IncomingCall,
 305        client: Arc<Client>,
 306        user_store: ModelHandle<UserStore>,
 307        cx: &mut AppContext,
 308    ) -> Task<Result<ModelHandle<Self>>> {
 309        let id = call.room_id;
 310        cx.spawn(|cx| async move {
 311            Self::from_join_response(
 312                client.request(proto::JoinRoom { id }).await?,
 313                client,
 314                user_store,
 315                cx,
 316            )
 317        })
 318    }
 319
 320    pub fn mute_on_join(cx: &AppContext) -> bool {
 321        settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 322    }
 323
 324    fn from_join_response(
 325        response: proto::JoinRoomResponse,
 326        client: Arc<Client>,
 327        user_store: ModelHandle<UserStore>,
 328        mut cx: AsyncAppContext,
 329    ) -> Result<ModelHandle<Self>> {
 330        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 331        let room = cx.add_model(|cx| {
 332            Self::new(
 333                room_proto.id,
 334                response.channel_id,
 335                response.live_kit_connection_info,
 336                client,
 337                user_store,
 338                cx,
 339            )
 340        });
 341        room.update(&mut cx, |room, cx| {
 342            room.leave_when_empty = room.channel_id.is_none();
 343            room.apply_room_update(room_proto, cx)?;
 344            anyhow::Ok(())
 345        })?;
 346        Ok(room)
 347    }
 348
 349    fn should_leave(&self) -> bool {
 350        self.leave_when_empty
 351            && self.pending_room_update.is_none()
 352            && self.pending_participants.is_empty()
 353            && self.remote_participants.is_empty()
 354            && self.pending_call_count == 0
 355    }
 356
 357    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 358        cx.notify();
 359        cx.emit(Event::Left);
 360        self.leave_internal(cx)
 361    }
 362
 363    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 364        if self.status.is_offline() {
 365            return Task::ready(Err(anyhow!("room is offline")));
 366        }
 367
 368        log::info!("leaving room");
 369        Audio::play_sound(Sound::Leave, cx);
 370
 371        self.clear_state(cx);
 372
 373        let leave_room = self.client.request(proto::LeaveRoom {});
 374        cx.background().spawn(async move {
 375            leave_room.await?;
 376            anyhow::Ok(())
 377        })
 378    }
 379
 380    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 381        for project in self.shared_projects.drain() {
 382            if let Some(project) = project.upgrade(cx) {
 383                project.update(cx, |project, cx| {
 384                    project.unshare(cx).log_err();
 385                });
 386            }
 387        }
 388        for project in self.joined_projects.drain() {
 389            if let Some(project) = project.upgrade(cx) {
 390                project.update(cx, |project, cx| {
 391                    project.disconnected_from_host(cx);
 392                    project.close(cx);
 393                });
 394            }
 395        }
 396
 397        self.status = RoomStatus::Offline;
 398        self.remote_participants.clear();
 399        self.pending_participants.clear();
 400        self.participant_user_ids.clear();
 401        self.subscriptions.clear();
 402        self.live_kit.take();
 403        self.pending_room_update.take();
 404        self.maintain_connection.take();
 405    }
 406
 407    async fn maintain_connection(
 408        this: WeakModelHandle<Self>,
 409        client: Arc<Client>,
 410        mut cx: AsyncAppContext,
 411    ) -> Result<()> {
 412        let mut client_status = client.status();
 413        loop {
 414            let _ = client_status.try_recv();
 415            let is_connected = client_status.borrow().is_connected();
 416            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 417            if !is_connected || client_status.next().await.is_some() {
 418                log::info!("detected client disconnection");
 419
 420                this.upgrade(&cx)
 421                    .ok_or_else(|| anyhow!("room was dropped"))?
 422                    .update(&mut cx, |this, cx| {
 423                        this.status = RoomStatus::Rejoining;
 424                        cx.notify();
 425                    });
 426
 427                // Wait for client to re-establish a connection to the server.
 428                {
 429                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 430                    let client_reconnection = async {
 431                        let mut remaining_attempts = 3;
 432                        while remaining_attempts > 0 {
 433                            if client_status.borrow().is_connected() {
 434                                log::info!("client reconnected, attempting to rejoin room");
 435
 436                                let Some(this) = this.upgrade(&cx) else { break };
 437                                if this
 438                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 439                                    .await
 440                                    .log_err()
 441                                    .is_some()
 442                                {
 443                                    return true;
 444                                } else {
 445                                    remaining_attempts -= 1;
 446                                }
 447                            } else if client_status.borrow().is_signed_out() {
 448                                return false;
 449                            }
 450
 451                            log::info!(
 452                                "waiting for client status change, remaining attempts {}",
 453                                remaining_attempts
 454                            );
 455                            client_status.next().await;
 456                        }
 457                        false
 458                    }
 459                    .fuse();
 460                    futures::pin_mut!(client_reconnection);
 461
 462                    futures::select_biased! {
 463                        reconnected = client_reconnection => {
 464                            if reconnected {
 465                                log::info!("successfully reconnected to room");
 466                                // If we successfully joined the room, go back around the loop
 467                                // waiting for future connection status changes.
 468                                continue;
 469                            }
 470                        }
 471                        _ = reconnection_timeout => {
 472                            log::info!("room reconnection timeout expired");
 473                        }
 474                    }
 475                }
 476
 477                break;
 478            }
 479        }
 480
 481        // The client failed to re-establish a connection to the server
 482        // or an error occurred while trying to re-join the room. Either way
 483        // we leave the room and return an error.
 484        if let Some(this) = this.upgrade(&cx) {
 485            log::info!("reconnection failed, leaving room");
 486            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 487        }
 488        Err(anyhow!(
 489            "can't reconnect to room: client failed to re-establish connection"
 490        ))
 491    }
 492
 493    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 494        let mut projects = HashMap::default();
 495        let mut reshared_projects = Vec::new();
 496        let mut rejoined_projects = Vec::new();
 497        self.shared_projects.retain(|project| {
 498            if let Some(handle) = project.upgrade(cx) {
 499                let project = handle.read(cx);
 500                if let Some(project_id) = project.remote_id() {
 501                    projects.insert(project_id, handle.clone());
 502                    reshared_projects.push(proto::UpdateProject {
 503                        project_id,
 504                        worktrees: project.worktree_metadata_protos(cx),
 505                    });
 506                    return true;
 507                }
 508            }
 509            false
 510        });
 511        self.joined_projects.retain(|project| {
 512            if let Some(handle) = project.upgrade(cx) {
 513                let project = handle.read(cx);
 514                if let Some(project_id) = project.remote_id() {
 515                    projects.insert(project_id, handle.clone());
 516                    rejoined_projects.push(proto::RejoinProject {
 517                        id: project_id,
 518                        worktrees: project
 519                            .worktrees(cx)
 520                            .map(|worktree| {
 521                                let worktree = worktree.read(cx);
 522                                proto::RejoinWorktree {
 523                                    id: worktree.id().to_proto(),
 524                                    scan_id: worktree.completed_scan_id() as u64,
 525                                }
 526                            })
 527                            .collect(),
 528                    });
 529                }
 530                return true;
 531            }
 532            false
 533        });
 534
 535        let response = self.client.request_envelope(proto::RejoinRoom {
 536            id: self.id,
 537            reshared_projects,
 538            rejoined_projects,
 539        });
 540
 541        cx.spawn(|this, mut cx| async move {
 542            let response = response.await?;
 543            let message_id = response.message_id;
 544            let response = response.payload;
 545            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 546            this.update(&mut cx, |this, cx| {
 547                this.status = RoomStatus::Online;
 548                this.apply_room_update(room_proto, cx)?;
 549
 550                for reshared_project in response.reshared_projects {
 551                    if let Some(project) = projects.get(&reshared_project.id) {
 552                        project.update(cx, |project, cx| {
 553                            project.reshared(reshared_project, cx).log_err();
 554                        });
 555                    }
 556                }
 557
 558                for rejoined_project in response.rejoined_projects {
 559                    if let Some(project) = projects.get(&rejoined_project.id) {
 560                        project.update(cx, |project, cx| {
 561                            project.rejoined(rejoined_project, message_id, cx).log_err();
 562                        });
 563                    }
 564                }
 565
 566                anyhow::Ok(())
 567            })
 568        })
 569    }
 570
 571    pub fn id(&self) -> u64 {
 572        self.id
 573    }
 574
 575    pub fn status(&self) -> RoomStatus {
 576        self.status
 577    }
 578
 579    pub fn local_participant(&self) -> &LocalParticipant {
 580        &self.local_participant
 581    }
 582
 583    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 584        &self.remote_participants
 585    }
 586
 587    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 588        self.remote_participants
 589            .values()
 590            .find(|p| p.peer_id == peer_id)
 591    }
 592
 593    pub fn pending_participants(&self) -> &[Arc<User>] {
 594        &self.pending_participants
 595    }
 596
 597    pub fn contains_participant(&self, user_id: u64) -> bool {
 598        self.participant_user_ids.contains(&user_id)
 599    }
 600
 601    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 602        self.follows_by_leader_id_project_id
 603            .get(&(leader_id, project_id))
 604            .map_or(&[], |v| v.as_slice())
 605    }
 606
 607    /// Returns the most 'active' projects, defined as most people in the project
 608    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 609        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 610        for participant in self.remote_participants.values() {
 611            match participant.location {
 612                ParticipantLocation::SharedProject { project_id } => {
 613                    project_hosts_and_guest_counts
 614                        .entry(project_id)
 615                        .or_default()
 616                        .1 += 1;
 617                }
 618                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 619            }
 620            for project in &participant.projects {
 621                project_hosts_and_guest_counts
 622                    .entry(project.id)
 623                    .or_default()
 624                    .0 = Some(participant.user.id);
 625            }
 626        }
 627
 628        if let Some(user) = self.user_store.read(cx).current_user() {
 629            for project in &self.local_participant.projects {
 630                project_hosts_and_guest_counts
 631                    .entry(project.id)
 632                    .or_default()
 633                    .0 = Some(user.id);
 634            }
 635        }
 636
 637        project_hosts_and_guest_counts
 638            .into_iter()
 639            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 640            .max_by_key(|(_, _, guest_count)| *guest_count)
 641            .map(|(id, host, _)| (id, host))
 642    }
 643
 644    async fn handle_room_updated(
 645        this: ModelHandle<Self>,
 646        envelope: TypedEnvelope<proto::RoomUpdated>,
 647        _: Arc<Client>,
 648        mut cx: AsyncAppContext,
 649    ) -> Result<()> {
 650        let room = envelope
 651            .payload
 652            .room
 653            .ok_or_else(|| anyhow!("invalid room"))?;
 654        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 655    }
 656
 657    fn apply_room_update(
 658        &mut self,
 659        mut room: proto::Room,
 660        cx: &mut ModelContext<Self>,
 661    ) -> Result<()> {
 662        // Filter ourselves out from the room's participants.
 663        let local_participant_ix = room
 664            .participants
 665            .iter()
 666            .position(|participant| Some(participant.user_id) == self.client.user_id());
 667        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 668
 669        let pending_participant_user_ids = room
 670            .pending_participants
 671            .iter()
 672            .map(|p| p.user_id)
 673            .collect::<Vec<_>>();
 674
 675        let remote_participant_user_ids = room
 676            .participants
 677            .iter()
 678            .map(|p| p.user_id)
 679            .collect::<Vec<_>>();
 680
 681        let (remote_participants, pending_participants) =
 682            self.user_store.update(cx, move |user_store, cx| {
 683                (
 684                    user_store.get_users(remote_participant_user_ids, cx),
 685                    user_store.get_users(pending_participant_user_ids, cx),
 686                )
 687            });
 688
 689        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 690            let (remote_participants, pending_participants) =
 691                futures::join!(remote_participants, pending_participants);
 692
 693            this.update(&mut cx, |this, cx| {
 694                this.participant_user_ids.clear();
 695
 696                if let Some(participant) = local_participant {
 697                    this.local_participant.projects = participant.projects;
 698                } else {
 699                    this.local_participant.projects.clear();
 700                }
 701
 702                if let Some(participants) = remote_participants.log_err() {
 703                    for (participant, user) in room.participants.into_iter().zip(participants) {
 704                        let Some(peer_id) = participant.peer_id else {
 705                            continue;
 706                        };
 707                        let participant_index = ParticipantIndex(participant.participant_index);
 708                        this.participant_user_ids.insert(participant.user_id);
 709
 710                        let old_projects = this
 711                            .remote_participants
 712                            .get(&participant.user_id)
 713                            .into_iter()
 714                            .flat_map(|existing| &existing.projects)
 715                            .map(|project| project.id)
 716                            .collect::<HashSet<_>>();
 717                        let new_projects = participant
 718                            .projects
 719                            .iter()
 720                            .map(|project| project.id)
 721                            .collect::<HashSet<_>>();
 722
 723                        for project in &participant.projects {
 724                            if !old_projects.contains(&project.id) {
 725                                cx.emit(Event::RemoteProjectShared {
 726                                    owner: user.clone(),
 727                                    project_id: project.id,
 728                                    worktree_root_names: project.worktree_root_names.clone(),
 729                                });
 730                            }
 731                        }
 732
 733                        for unshared_project_id in old_projects.difference(&new_projects) {
 734                            this.joined_projects.retain(|project| {
 735                                if let Some(project) = project.upgrade(cx) {
 736                                    project.update(cx, |project, cx| {
 737                                        if project.remote_id() == Some(*unshared_project_id) {
 738                                            project.disconnected_from_host(cx);
 739                                            false
 740                                        } else {
 741                                            true
 742                                        }
 743                                    })
 744                                } else {
 745                                    false
 746                                }
 747                            });
 748                            cx.emit(Event::RemoteProjectUnshared {
 749                                project_id: *unshared_project_id,
 750                            });
 751                        }
 752
 753                        let location = ParticipantLocation::from_proto(participant.location)
 754                            .unwrap_or(ParticipantLocation::External);
 755                        if let Some(remote_participant) =
 756                            this.remote_participants.get_mut(&participant.user_id)
 757                        {
 758                            remote_participant.peer_id = peer_id;
 759                            remote_participant.projects = participant.projects;
 760                            remote_participant.participant_index = participant_index;
 761                            if location != remote_participant.location {
 762                                remote_participant.location = location;
 763                                cx.emit(Event::ParticipantLocationChanged {
 764                                    participant_id: peer_id,
 765                                });
 766                            }
 767                        } else {
 768                            this.remote_participants.insert(
 769                                participant.user_id,
 770                                RemoteParticipant {
 771                                    user: user.clone(),
 772                                    participant_index,
 773                                    peer_id,
 774                                    projects: participant.projects,
 775                                    location,
 776                                    muted: true,
 777                                    speaking: false,
 778                                    video_tracks: Default::default(),
 779                                    audio_tracks: Default::default(),
 780                                },
 781                            );
 782
 783                            Audio::play_sound(Sound::Joined, cx);
 784
 785                            if let Some(live_kit) = this.live_kit.as_ref() {
 786                                let video_tracks =
 787                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 788                                let audio_tracks =
 789                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 790                                let publications = live_kit
 791                                    .room
 792                                    .remote_audio_track_publications(&user.id.to_string());
 793
 794                                for track in video_tracks {
 795                                    this.remote_video_track_updated(
 796                                        RemoteVideoTrackUpdate::Subscribed(track),
 797                                        cx,
 798                                    )
 799                                    .log_err();
 800                                }
 801
 802                                for (track, publication) in
 803                                    audio_tracks.iter().zip(publications.iter())
 804                                {
 805                                    this.remote_audio_track_updated(
 806                                        RemoteAudioTrackUpdate::Subscribed(
 807                                            track.clone(),
 808                                            publication.clone(),
 809                                        ),
 810                                        cx,
 811                                    )
 812                                    .log_err();
 813                                }
 814                            }
 815                        }
 816                    }
 817
 818                    this.remote_participants.retain(|user_id, participant| {
 819                        if this.participant_user_ids.contains(user_id) {
 820                            true
 821                        } else {
 822                            for project in &participant.projects {
 823                                cx.emit(Event::RemoteProjectUnshared {
 824                                    project_id: project.id,
 825                                });
 826                            }
 827                            false
 828                        }
 829                    });
 830                }
 831
 832                if let Some(pending_participants) = pending_participants.log_err() {
 833                    this.pending_participants = pending_participants;
 834                    for participant in &this.pending_participants {
 835                        this.participant_user_ids.insert(participant.id);
 836                    }
 837                }
 838
 839                this.follows_by_leader_id_project_id.clear();
 840                for follower in room.followers {
 841                    let project_id = follower.project_id;
 842                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 843                        (Some(leader), Some(follower)) => (leader, follower),
 844
 845                        _ => {
 846                            log::error!("Follower message {follower:?} missing some state");
 847                            continue;
 848                        }
 849                    };
 850
 851                    let list = this
 852                        .follows_by_leader_id_project_id
 853                        .entry((leader, project_id))
 854                        .or_insert(Vec::new());
 855                    if !list.contains(&follower) {
 856                        list.push(follower);
 857                    }
 858                }
 859
 860                this.pending_room_update.take();
 861                if this.should_leave() {
 862                    log::info!("room is empty, leaving");
 863                    let _ = this.leave(cx);
 864                }
 865
 866                this.user_store.update(cx, |user_store, cx| {
 867                    let participant_indices_by_user_id = this
 868                        .remote_participants
 869                        .iter()
 870                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 871                        .collect();
 872                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 873                });
 874
 875                this.check_invariants();
 876                this.room_update_completed_tx.try_send(Some(())).ok();
 877                cx.notify();
 878            });
 879        }));
 880
 881        cx.notify();
 882        Ok(())
 883    }
 884
 885    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 886        let mut done_rx = self.room_update_completed_rx.clone();
 887        async move {
 888            while let Some(result) = done_rx.next().await {
 889                if result.is_some() {
 890                    break;
 891                }
 892            }
 893        }
 894    }
 895
 896    fn remote_video_track_updated(
 897        &mut self,
 898        change: RemoteVideoTrackUpdate,
 899        cx: &mut ModelContext<Self>,
 900    ) -> Result<()> {
 901        match change {
 902            RemoteVideoTrackUpdate::Subscribed(track) => {
 903                let user_id = track.publisher_id().parse()?;
 904                let track_id = track.sid().to_string();
 905                let participant = self
 906                    .remote_participants
 907                    .get_mut(&user_id)
 908                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 909                participant.video_tracks.insert(
 910                    track_id.clone(),
 911                    Arc::new(RemoteVideoTrack {
 912                        live_kit_track: track,
 913                    }),
 914                );
 915                cx.emit(Event::RemoteVideoTracksChanged {
 916                    participant_id: participant.peer_id,
 917                });
 918            }
 919            RemoteVideoTrackUpdate::Unsubscribed {
 920                publisher_id,
 921                track_id,
 922            } => {
 923                let user_id = publisher_id.parse()?;
 924                let participant = self
 925                    .remote_participants
 926                    .get_mut(&user_id)
 927                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 928                participant.video_tracks.remove(&track_id);
 929                cx.emit(Event::RemoteVideoTracksChanged {
 930                    participant_id: participant.peer_id,
 931                });
 932            }
 933        }
 934
 935        cx.notify();
 936        Ok(())
 937    }
 938
 939    fn remote_audio_track_updated(
 940        &mut self,
 941        change: RemoteAudioTrackUpdate,
 942        cx: &mut ModelContext<Self>,
 943    ) -> Result<()> {
 944        match change {
 945            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 946                let mut speaker_ids = speakers
 947                    .into_iter()
 948                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 949                    .collect::<Vec<u64>>();
 950                speaker_ids.sort_unstable();
 951                for (sid, participant) in &mut self.remote_participants {
 952                    if let Ok(_) = speaker_ids.binary_search(sid) {
 953                        participant.speaking = true;
 954                    } else {
 955                        participant.speaking = false;
 956                    }
 957                }
 958                if let Some(id) = self.client.user_id() {
 959                    if let Some(room) = &mut self.live_kit {
 960                        if let Ok(_) = speaker_ids.binary_search(&id) {
 961                            room.speaking = true;
 962                        } else {
 963                            room.speaking = false;
 964                        }
 965                    }
 966                }
 967                cx.notify();
 968            }
 969            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 970                let mut found = false;
 971                for participant in &mut self.remote_participants.values_mut() {
 972                    for track in participant.audio_tracks.values() {
 973                        if track.sid() == track_id {
 974                            found = true;
 975                            break;
 976                        }
 977                    }
 978                    if found {
 979                        participant.muted = muted;
 980                        break;
 981                    }
 982                }
 983
 984                cx.notify();
 985            }
 986            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 987                let user_id = track.publisher_id().parse()?;
 988                let track_id = track.sid().to_string();
 989                let participant = self
 990                    .remote_participants
 991                    .get_mut(&user_id)
 992                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 993
 994                participant.audio_tracks.insert(track_id.clone(), track);
 995                participant.muted = publication.is_muted();
 996
 997                cx.emit(Event::RemoteAudioTracksChanged {
 998                    participant_id: participant.peer_id,
 999                });
1000            }
1001            RemoteAudioTrackUpdate::Unsubscribed {
1002                publisher_id,
1003                track_id,
1004            } => {
1005                let user_id = publisher_id.parse()?;
1006                let participant = self
1007                    .remote_participants
1008                    .get_mut(&user_id)
1009                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1010                participant.audio_tracks.remove(&track_id);
1011                cx.emit(Event::RemoteAudioTracksChanged {
1012                    participant_id: participant.peer_id,
1013                });
1014            }
1015        }
1016
1017        cx.notify();
1018        Ok(())
1019    }
1020
1021    fn check_invariants(&self) {
1022        #[cfg(any(test, feature = "test-support"))]
1023        {
1024            for participant in self.remote_participants.values() {
1025                assert!(self.participant_user_ids.contains(&participant.user.id));
1026                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1027            }
1028
1029            for participant in &self.pending_participants {
1030                assert!(self.participant_user_ids.contains(&participant.id));
1031                assert_ne!(participant.id, self.client.user_id().unwrap());
1032            }
1033
1034            assert_eq!(
1035                self.participant_user_ids.len(),
1036                self.remote_participants.len() + self.pending_participants.len()
1037            );
1038        }
1039    }
1040
1041    pub(crate) fn call(
1042        &mut self,
1043        called_user_id: u64,
1044        initial_project_id: Option<u64>,
1045        cx: &mut ModelContext<Self>,
1046    ) -> Task<Result<()>> {
1047        if self.status.is_offline() {
1048            return Task::ready(Err(anyhow!("room is offline")));
1049        }
1050
1051        cx.notify();
1052        let client = self.client.clone();
1053        let room_id = self.id;
1054        self.pending_call_count += 1;
1055        cx.spawn(|this, mut cx| async move {
1056            let result = client
1057                .request(proto::Call {
1058                    room_id,
1059                    called_user_id,
1060                    initial_project_id,
1061                })
1062                .await;
1063            this.update(&mut cx, |this, cx| {
1064                this.pending_call_count -= 1;
1065                if this.should_leave() {
1066                    this.leave(cx).detach_and_log_err(cx);
1067                }
1068            });
1069            result?;
1070            Ok(())
1071        })
1072    }
1073
1074    pub fn join_project(
1075        &mut self,
1076        id: u64,
1077        language_registry: Arc<LanguageRegistry>,
1078        fs: Arc<dyn Fs>,
1079        cx: &mut ModelContext<Self>,
1080    ) -> Task<Result<ModelHandle<Project>>> {
1081        let client = self.client.clone();
1082        let user_store = self.user_store.clone();
1083        cx.emit(Event::RemoteProjectJoined { project_id: id });
1084        cx.spawn(|this, mut cx| async move {
1085            let project =
1086                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1087
1088            this.update(&mut cx, |this, cx| {
1089                this.joined_projects.retain(|project| {
1090                    if let Some(project) = project.upgrade(cx) {
1091                        !project.read(cx).is_read_only()
1092                    } else {
1093                        false
1094                    }
1095                });
1096                this.joined_projects.insert(project.downgrade());
1097            });
1098            Ok(project)
1099        })
1100    }
1101
1102    pub(crate) fn share_project(
1103        &mut self,
1104        project: ModelHandle<Project>,
1105        cx: &mut ModelContext<Self>,
1106    ) -> Task<Result<u64>> {
1107        if let Some(project_id) = project.read(cx).remote_id() {
1108            return Task::ready(Ok(project_id));
1109        }
1110
1111        let request = self.client.request(proto::ShareProject {
1112            room_id: self.id(),
1113            worktrees: project.read(cx).worktree_metadata_protos(cx),
1114        });
1115        cx.spawn(|this, mut cx| async move {
1116            let response = request.await?;
1117
1118            project.update(&mut cx, |project, cx| {
1119                project.shared(response.project_id, cx)
1120            })?;
1121
1122            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1123            this.update(&mut cx, |this, cx| {
1124                this.shared_projects.insert(project.downgrade());
1125                let active_project = this.local_participant.active_project.as_ref();
1126                if active_project.map_or(false, |location| *location == project) {
1127                    this.set_location(Some(&project), cx)
1128                } else {
1129                    Task::ready(Ok(()))
1130                }
1131            })
1132            .await?;
1133
1134            Ok(response.project_id)
1135        })
1136    }
1137
1138    pub(crate) fn unshare_project(
1139        &mut self,
1140        project: ModelHandle<Project>,
1141        cx: &mut ModelContext<Self>,
1142    ) -> Result<()> {
1143        let project_id = match project.read(cx).remote_id() {
1144            Some(project_id) => project_id,
1145            None => return Ok(()),
1146        };
1147
1148        self.client.send(proto::UnshareProject { project_id })?;
1149        project.update(cx, |this, cx| this.unshare(cx))
1150    }
1151
1152    pub(crate) fn set_location(
1153        &mut self,
1154        project: Option<&ModelHandle<Project>>,
1155        cx: &mut ModelContext<Self>,
1156    ) -> Task<Result<()>> {
1157        if self.status.is_offline() {
1158            return Task::ready(Err(anyhow!("room is offline")));
1159        }
1160
1161        let client = self.client.clone();
1162        let room_id = self.id;
1163        let location = if let Some(project) = project {
1164            self.local_participant.active_project = Some(project.downgrade());
1165            if let Some(project_id) = project.read(cx).remote_id() {
1166                proto::participant_location::Variant::SharedProject(
1167                    proto::participant_location::SharedProject { id: project_id },
1168                )
1169            } else {
1170                proto::participant_location::Variant::UnsharedProject(
1171                    proto::participant_location::UnsharedProject {},
1172                )
1173            }
1174        } else {
1175            self.local_participant.active_project = None;
1176            proto::participant_location::Variant::External(proto::participant_location::External {})
1177        };
1178
1179        cx.notify();
1180        cx.foreground().spawn(async move {
1181            client
1182                .request(proto::UpdateParticipantLocation {
1183                    room_id,
1184                    location: Some(proto::ParticipantLocation {
1185                        variant: Some(location),
1186                    }),
1187                })
1188                .await?;
1189            Ok(())
1190        })
1191    }
1192
1193    pub fn is_screen_sharing(&self) -> bool {
1194        self.live_kit.as_ref().map_or(false, |live_kit| {
1195            !matches!(live_kit.screen_track, LocalTrack::None)
1196        })
1197    }
1198
1199    pub fn is_sharing_mic(&self) -> bool {
1200        self.live_kit.as_ref().map_or(false, |live_kit| {
1201            !matches!(live_kit.microphone_track, LocalTrack::None)
1202        })
1203    }
1204
1205    pub fn is_muted(&self, cx: &AppContext) -> bool {
1206        self.live_kit
1207            .as_ref()
1208            .and_then(|live_kit| match &live_kit.microphone_track {
1209                LocalTrack::None => Some(Self::mute_on_join(cx)),
1210                LocalTrack::Pending { muted, .. } => Some(*muted),
1211                LocalTrack::Published { muted, .. } => Some(*muted),
1212            })
1213            .unwrap_or(false)
1214    }
1215
1216    pub fn is_speaking(&self) -> bool {
1217        self.live_kit
1218            .as_ref()
1219            .map_or(false, |live_kit| live_kit.speaking)
1220    }
1221
1222    pub fn is_deafened(&self) -> Option<bool> {
1223        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1224    }
1225
1226    #[track_caller]
1227    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1228        if self.status.is_offline() {
1229            return Task::ready(Err(anyhow!("room is offline")));
1230        } else if self.is_sharing_mic() {
1231            return Task::ready(Err(anyhow!("microphone was already shared")));
1232        }
1233
1234        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1235            let publish_id = post_inc(&mut live_kit.next_publish_id);
1236            live_kit.microphone_track = LocalTrack::Pending {
1237                publish_id,
1238                muted: false,
1239            };
1240            cx.notify();
1241            publish_id
1242        } else {
1243            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1244        };
1245
1246        cx.spawn_weak(|this, mut cx| async move {
1247            let publish_track = async {
1248                let track = LocalAudioTrack::create();
1249                this.upgrade(&cx)
1250                    .ok_or_else(|| anyhow!("room was dropped"))?
1251                    .read_with(&cx, |this, _| {
1252                        this.live_kit
1253                            .as_ref()
1254                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1255                    })
1256                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1257                    .await
1258            };
1259
1260            let publication = publish_track.await;
1261            this.upgrade(&cx)
1262                .ok_or_else(|| anyhow!("room was dropped"))?
1263                .update(&mut cx, |this, cx| {
1264                    let live_kit = this
1265                        .live_kit
1266                        .as_mut()
1267                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1268
1269                    let (canceled, muted) = if let LocalTrack::Pending {
1270                        publish_id: cur_publish_id,
1271                        muted,
1272                    } = &live_kit.microphone_track
1273                    {
1274                        (*cur_publish_id != publish_id, *muted)
1275                    } else {
1276                        (true, false)
1277                    };
1278
1279                    match publication {
1280                        Ok(publication) => {
1281                            if canceled {
1282                                live_kit.room.unpublish_track(publication);
1283                            } else {
1284                                if muted {
1285                                    cx.background().spawn(publication.set_mute(muted)).detach();
1286                                }
1287                                live_kit.microphone_track = LocalTrack::Published {
1288                                    track_publication: publication,
1289                                    muted,
1290                                };
1291                                cx.notify();
1292                            }
1293                            Ok(())
1294                        }
1295                        Err(error) => {
1296                            if canceled {
1297                                Ok(())
1298                            } else {
1299                                live_kit.microphone_track = LocalTrack::None;
1300                                cx.notify();
1301                                Err(error)
1302                            }
1303                        }
1304                    }
1305                })
1306        })
1307    }
1308
1309    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1310        if self.status.is_offline() {
1311            return Task::ready(Err(anyhow!("room is offline")));
1312        } else if self.is_screen_sharing() {
1313            return Task::ready(Err(anyhow!("screen was already shared")));
1314        }
1315
1316        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1317            let publish_id = post_inc(&mut live_kit.next_publish_id);
1318            live_kit.screen_track = LocalTrack::Pending {
1319                publish_id,
1320                muted: false,
1321            };
1322            cx.notify();
1323            (live_kit.room.display_sources(), publish_id)
1324        } else {
1325            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1326        };
1327
1328        cx.spawn_weak(|this, mut cx| async move {
1329            let publish_track = async {
1330                let displays = displays.await?;
1331                let display = displays
1332                    .first()
1333                    .ok_or_else(|| anyhow!("no display found"))?;
1334                let track = LocalVideoTrack::screen_share_for_display(&display);
1335                this.upgrade(&cx)
1336                    .ok_or_else(|| anyhow!("room was dropped"))?
1337                    .read_with(&cx, |this, _| {
1338                        this.live_kit
1339                            .as_ref()
1340                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1341                    })
1342                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1343                    .await
1344            };
1345
1346            let publication = publish_track.await;
1347            this.upgrade(&cx)
1348                .ok_or_else(|| anyhow!("room was dropped"))?
1349                .update(&mut cx, |this, cx| {
1350                    let live_kit = this
1351                        .live_kit
1352                        .as_mut()
1353                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1354
1355                    let (canceled, muted) = if let LocalTrack::Pending {
1356                        publish_id: cur_publish_id,
1357                        muted,
1358                    } = &live_kit.screen_track
1359                    {
1360                        (*cur_publish_id != publish_id, *muted)
1361                    } else {
1362                        (true, false)
1363                    };
1364
1365                    match publication {
1366                        Ok(publication) => {
1367                            if canceled {
1368                                live_kit.room.unpublish_track(publication);
1369                            } else {
1370                                if muted {
1371                                    cx.background().spawn(publication.set_mute(muted)).detach();
1372                                }
1373                                live_kit.screen_track = LocalTrack::Published {
1374                                    track_publication: publication,
1375                                    muted,
1376                                };
1377                                cx.notify();
1378                            }
1379
1380                            Audio::play_sound(Sound::StartScreenshare, cx);
1381
1382                            Ok(())
1383                        }
1384                        Err(error) => {
1385                            if canceled {
1386                                Ok(())
1387                            } else {
1388                                live_kit.screen_track = LocalTrack::None;
1389                                cx.notify();
1390                                Err(error)
1391                            }
1392                        }
1393                    }
1394                })
1395        })
1396    }
1397
1398    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1399        let should_mute = !self.is_muted(cx);
1400        if let Some(live_kit) = self.live_kit.as_mut() {
1401            if matches!(live_kit.microphone_track, LocalTrack::None) {
1402                return Ok(self.share_microphone(cx));
1403            }
1404
1405            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1406            live_kit.muted_by_user = should_mute;
1407
1408            if old_muted == true && live_kit.deafened == true {
1409                if let Some(task) = self.toggle_deafen(cx).ok() {
1410                    task.detach();
1411                }
1412            }
1413
1414            Ok(ret_task)
1415        } else {
1416            Err(anyhow!("LiveKit not started"))
1417        }
1418    }
1419
1420    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1421        if let Some(live_kit) = self.live_kit.as_mut() {
1422            (*live_kit).deafened = !live_kit.deafened;
1423
1424            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1425            // Context notification is sent within set_mute itself.
1426            let mut mute_task = None;
1427            // When deafening, mute user's mic as well.
1428            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1429            if live_kit.deafened || !live_kit.muted_by_user {
1430                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1431            };
1432            for participant in self.remote_participants.values() {
1433                for track in live_kit
1434                    .room
1435                    .remote_audio_track_publications(&participant.user.id.to_string())
1436                {
1437                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1438                }
1439            }
1440
1441            Ok(cx.foreground().spawn(async move {
1442                if let Some(mute_task) = mute_task {
1443                    mute_task.await?;
1444                }
1445                for task in tasks {
1446                    task.await?;
1447                }
1448                Ok(())
1449            }))
1450        } else {
1451            Err(anyhow!("LiveKit not started"))
1452        }
1453    }
1454
1455    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1456        if self.status.is_offline() {
1457            return Err(anyhow!("room is offline"));
1458        }
1459
1460        let live_kit = self
1461            .live_kit
1462            .as_mut()
1463            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1464        match mem::take(&mut live_kit.screen_track) {
1465            LocalTrack::None => Err(anyhow!("screen was not shared")),
1466            LocalTrack::Pending { .. } => {
1467                cx.notify();
1468                Ok(())
1469            }
1470            LocalTrack::Published {
1471                track_publication, ..
1472            } => {
1473                live_kit.room.unpublish_track(track_publication);
1474                cx.notify();
1475
1476                Audio::play_sound(Sound::StopScreenshare, cx);
1477                Ok(())
1478            }
1479        }
1480    }
1481
1482    #[cfg(any(test, feature = "test-support"))]
1483    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1484        self.live_kit
1485            .as_ref()
1486            .unwrap()
1487            .room
1488            .set_display_sources(sources);
1489    }
1490}
1491
/// LiveKit-side state of a [`Room`]: the LiveKit room handle plus the local
/// screen/microphone tracks and the local audio flags.
struct LiveKitRoom {
    /// Shared handle to the LiveKit client room used to publish/unpublish
    /// tracks and to look up remote tracks.
    room: Arc<live_kit_client::Room>,
    /// Publishing state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publishing state of the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the current mute was requested explicitly by the user: set by
    /// `Room::toggle_mute` and cleared whenever the microphone is unmuted.
    /// Consulted when undeafening to decide whether to unmute the microphone.
    muted_by_user: bool,
    /// Whether the local user is deafened; flipped by `Room::toggle_deafen`.
    deafened: bool,
    /// Whether the local user was in the most recent active-speakers update.
    speaking: bool,
    /// Counter handing out ids for pending publish attempts, so a completed
    /// attempt can tell whether it has been superseded or canceled.
    next_publish_id: usize,
    // NOTE(review): these tasks are created outside this view; presumably they
    // are held here so they keep running while the LiveKit room exists — confirm.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1504
1505impl LiveKitRoom {
1506    fn set_mute(
1507        self: &mut LiveKitRoom,
1508        should_mute: bool,
1509        cx: &mut ModelContext<Room>,
1510    ) -> Result<(Task<Result<()>>, bool)> {
1511        if !should_mute {
1512            // clear user muting state.
1513            self.muted_by_user = false;
1514        }
1515
1516        let (result, old_muted) = match &mut self.microphone_track {
1517            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1518            LocalTrack::Pending { muted, .. } => {
1519                let old_muted = *muted;
1520                *muted = should_mute;
1521                cx.notify();
1522                Ok((Task::Ready(Some(Ok(()))), old_muted))
1523            }
1524            LocalTrack::Published {
1525                track_publication,
1526                muted,
1527            } => {
1528                let old_muted = *muted;
1529                *muted = should_mute;
1530                cx.notify();
1531                Ok((
1532                    cx.background().spawn(track_publication.set_mute(*muted)),
1533                    old_muted,
1534                ))
1535            }
1536        }?;
1537
1538        if old_muted != should_mute {
1539            if should_mute {
1540                Audio::play_sound(Sound::Mute, cx);
1541            } else {
1542                Audio::play_sound(Sound::Unmute, cx);
1543            }
1544        }
1545
1546        Ok((result, old_muted))
1547    }
1548}
1549
/// Publishing state of a locally captured track (microphone or screen).
enum LocalTrack {
    /// No track is being shared.
    None,
    /// A publish attempt is in flight.
    Pending {
        /// Id of the in-flight attempt; a finishing attempt whose id differs
        /// from the stored one treats itself as canceled.
        publish_id: usize,
        /// Mute state requested so far, applied once the track is published.
        muted: bool,
    },
    /// The track has been published to the LiveKit room.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Mute state last requested for the track.
        muted: bool,
    },
}
1561
1562impl Default for LocalTrack {
1563    fn default() -> Self {
1564        Self::None
1565    }
1566}
1567
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}