room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  15use language::LanguageRegistry;
  16use live_kit_client::{
  17    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  18    RemoteVideoTrackUpdate,
  19};
  20use postage::{sink::Sink, stream::Stream, watch};
  21use project::Project;
  22use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  23use util::{post_inc, ResultExt, TryFutureExt};
  24
  25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
  27#[derive(Clone, Debug, PartialEq, Eq)]
  28pub enum Event {
  29    ParticipantLocationChanged {
  30        participant_id: proto::PeerId,
  31    },
  32    RemoteVideoTracksChanged {
  33        participant_id: proto::PeerId,
  34    },
  35    RemoteAudioTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
  38    RemoteProjectShared {
  39        owner: Arc<User>,
  40        project_id: u64,
  41        worktree_root_names: Vec<String>,
  42    },
  43    RemoteProjectUnshared {
  44        project_id: u64,
  45    },
  46    RemoteProjectJoined {
  47        project_id: u64,
  48    },
  49    RemoteProjectInvitationDiscarded {
  50        project_id: u64,
  51    },
  52    Left,
  53}
  54
  55pub struct Room {
  56    id: u64,
  57    pub channel_id: Option<u64>,
  58    live_kit: Option<LiveKitRoom>,
  59    status: RoomStatus,
  60    shared_projects: HashSet<WeakModelHandle<Project>>,
  61    joined_projects: HashSet<WeakModelHandle<Project>>,
  62    local_participant: LocalParticipant,
  63    remote_participants: BTreeMap<u64, RemoteParticipant>,
  64    pending_participants: Vec<Arc<User>>,
  65    participant_user_ids: HashSet<u64>,
  66    pending_call_count: usize,
  67    leave_when_empty: bool,
  68    client: Arc<Client>,
  69    user_store: ModelHandle<UserStore>,
  70    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  71    subscriptions: Vec<client::Subscription>,
  72    room_update_completed_tx: watch::Sender<Option<()>>,
  73    room_update_completed_rx: watch::Receiver<Option<()>>,
  74    pending_room_update: Option<Task<()>>,
  75    maintain_connection: Option<Task<Option<()>>>,
  76}
  77
  78impl Entity for Room {
  79    type Event = Event;
  80
  81    fn release(&mut self, cx: &mut AppContext) {
  82        if self.status.is_online() {
  83            self.leave_internal(cx).detach_and_log_err(cx);
  84        }
  85    }
  86
  87    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  88        if self.status.is_online() {
  89            let leave = self.leave_internal(cx);
  90            Some(
  91                cx.background()
  92                    .spawn(async move {
  93                        leave.await.log_err();
  94                    })
  95                    .boxed(),
  96            )
  97        } else {
  98            None
  99        }
 100    }
 101}
 102
 103impl Room {
 104    pub fn channel_id(&self) -> Option<u64> {
 105        self.channel_id
 106    }
 107
 108    pub fn is_sharing_project(&self) -> bool {
 109        !self.shared_projects.is_empty()
 110    }
 111
 112    #[cfg(any(test, feature = "test-support"))]
 113    pub fn is_connected(&self) -> bool {
 114        if let Some(live_kit) = self.live_kit.as_ref() {
 115            matches!(
 116                *live_kit.room.status().borrow(),
 117                live_kit_client::ConnectionState::Connected { .. }
 118            )
 119        } else {
 120            false
 121        }
 122    }
 123
 124    pub fn can_publish(&self) -> bool {
 125        self.live_kit.as_ref().is_some_and(|room| room.can_publish)
 126    }
 127
 128    fn new(
 129        id: u64,
 130        channel_id: Option<u64>,
 131        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 132        client: Arc<Client>,
 133        user_store: ModelHandle<UserStore>,
 134        cx: &mut ModelContext<Self>,
 135    ) -> Self {
 136        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 137            let room = live_kit_client::Room::new();
 138            let mut status = room.status();
 139            // Consume the initial status of the room.
 140            let _ = status.try_recv();
 141            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 142                while let Some(status) = status.next().await {
 143                    let this = if let Some(this) = this.upgrade(&cx) {
 144                        this
 145                    } else {
 146                        break;
 147                    };
 148
 149                    if status == live_kit_client::ConnectionState::Disconnected {
 150                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 151                        break;
 152                    }
 153                }
 154            });
 155
 156            let mut track_video_changes = room.remote_video_track_updates();
 157            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 158                while let Some(track_change) = track_video_changes.next().await {
 159                    let this = if let Some(this) = this.upgrade(&cx) {
 160                        this
 161                    } else {
 162                        break;
 163                    };
 164
 165                    this.update(&mut cx, |this, cx| {
 166                        this.remote_video_track_updated(track_change, cx).log_err()
 167                    });
 168                }
 169            });
 170
 171            let mut track_audio_changes = room.remote_audio_track_updates();
 172            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 173                while let Some(track_change) = track_audio_changes.next().await {
 174                    let this = if let Some(this) = this.upgrade(&cx) {
 175                        this
 176                    } else {
 177                        break;
 178                    };
 179
 180                    this.update(&mut cx, |this, cx| {
 181                        this.remote_audio_track_updated(track_change, cx).log_err()
 182                    });
 183                }
 184            });
 185
 186            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 187            if connection_info.can_publish {
 188                cx.spawn(|this, mut cx| async move {
 189                    connect.await?;
 190
 191                    if !cx.read(Self::mute_on_join) {
 192                        this.update(&mut cx, |this, cx| this.share_microphone(cx))
 193                            .await?;
 194                    }
 195
 196                    anyhow::Ok(())
 197                })
 198                .detach_and_log_err(cx);
 199            }
 200
 201            Some(LiveKitRoom {
 202                room,
 203                can_publish: connection_info.can_publish,
 204                screen_track: LocalTrack::None,
 205                microphone_track: LocalTrack::None,
 206                next_publish_id: 0,
 207                muted_by_user: false,
 208                deafened: false,
 209                speaking: false,
 210                _maintain_room,
 211                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 212            })
 213        } else {
 214            None
 215        };
 216
 217        let maintain_connection =
 218            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 219
 220        Audio::play_sound(Sound::Joined, cx);
 221
 222        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 223
 224        Self {
 225            id,
 226            channel_id,
 227            live_kit: live_kit_room,
 228            status: RoomStatus::Online,
 229            shared_projects: Default::default(),
 230            joined_projects: Default::default(),
 231            participant_user_ids: Default::default(),
 232            local_participant: Default::default(),
 233            remote_participants: Default::default(),
 234            pending_participants: Default::default(),
 235            pending_call_count: 0,
 236            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 237            leave_when_empty: false,
 238            pending_room_update: None,
 239            client,
 240            user_store,
 241            follows_by_leader_id_project_id: Default::default(),
 242            maintain_connection: Some(maintain_connection),
 243            room_update_completed_tx,
 244            room_update_completed_rx,
 245        }
 246    }
 247
 248    pub(crate) fn create(
 249        called_user_id: u64,
 250        initial_project: Option<ModelHandle<Project>>,
 251        client: Arc<Client>,
 252        user_store: ModelHandle<UserStore>,
 253        cx: &mut AppContext,
 254    ) -> Task<Result<ModelHandle<Self>>> {
 255        cx.spawn(|mut cx| async move {
 256            let response = client.request(proto::CreateRoom {}).await?;
 257            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 258            let room = cx.add_model(|cx| {
 259                Self::new(
 260                    room_proto.id,
 261                    None,
 262                    response.live_kit_connection_info,
 263                    client,
 264                    user_store,
 265                    cx,
 266                )
 267            });
 268
 269            let initial_project_id = if let Some(initial_project) = initial_project {
 270                let initial_project_id = room
 271                    .update(&mut cx, |room, cx| {
 272                        room.share_project(initial_project.clone(), cx)
 273                    })
 274                    .await?;
 275                Some(initial_project_id)
 276            } else {
 277                None
 278            };
 279
 280            match room
 281                .update(&mut cx, |room, cx| {
 282                    room.leave_when_empty = true;
 283                    room.call(called_user_id, initial_project_id, cx)
 284                })
 285                .await
 286            {
 287                Ok(()) => Ok(room),
 288                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 289            }
 290        })
 291    }
 292
 293    pub(crate) async fn join_channel(
 294        channel_id: u64,
 295        client: Arc<Client>,
 296        user_store: ModelHandle<UserStore>,
 297        cx: AsyncAppContext,
 298    ) -> Result<ModelHandle<Self>> {
 299        Self::from_join_response(
 300            client.request(proto::JoinChannel { channel_id }).await?,
 301            client,
 302            user_store,
 303            cx,
 304        )
 305    }
 306
 307    pub(crate) async fn join(
 308        room_id: u64,
 309        client: Arc<Client>,
 310        user_store: ModelHandle<UserStore>,
 311        cx: AsyncAppContext,
 312    ) -> Result<ModelHandle<Self>> {
 313        Self::from_join_response(
 314            client.request(proto::JoinRoom { id: room_id }).await?,
 315            client,
 316            user_store,
 317            cx,
 318        )
 319    }
 320
 321    pub fn mute_on_join(cx: &AppContext) -> bool {
 322        settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 323    }
 324
 325    fn from_join_response(
 326        response: proto::JoinRoomResponse,
 327        client: Arc<Client>,
 328        user_store: ModelHandle<UserStore>,
 329        mut cx: AsyncAppContext,
 330    ) -> Result<ModelHandle<Self>> {
 331        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 332        let room = cx.add_model(|cx| {
 333            Self::new(
 334                room_proto.id,
 335                response.channel_id,
 336                response.live_kit_connection_info,
 337                client,
 338                user_store,
 339                cx,
 340            )
 341        });
 342        room.update(&mut cx, |room, cx| {
 343            room.leave_when_empty = room.channel_id.is_none();
 344            room.apply_room_update(room_proto, cx)?;
 345            anyhow::Ok(())
 346        })?;
 347        Ok(room)
 348    }
 349
 350    fn should_leave(&self) -> bool {
 351        self.leave_when_empty
 352            && self.pending_room_update.is_none()
 353            && self.pending_participants.is_empty()
 354            && self.remote_participants.is_empty()
 355            && self.pending_call_count == 0
 356    }
 357
 358    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 359        cx.notify();
 360        cx.emit(Event::Left);
 361        self.leave_internal(cx)
 362    }
 363
 364    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 365        if self.status.is_offline() {
 366            return Task::ready(Err(anyhow!("room is offline")));
 367        }
 368
 369        log::info!("leaving room");
 370        Audio::play_sound(Sound::Leave, cx);
 371
 372        self.clear_state(cx);
 373
 374        let leave_room = self.client.request(proto::LeaveRoom {});
 375        cx.background().spawn(async move {
 376            leave_room.await?;
 377            anyhow::Ok(())
 378        })
 379    }
 380
 381    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 382        for project in self.shared_projects.drain() {
 383            if let Some(project) = project.upgrade(cx) {
 384                project.update(cx, |project, cx| {
 385                    project.unshare(cx).log_err();
 386                });
 387            }
 388        }
 389        for project in self.joined_projects.drain() {
 390            if let Some(project) = project.upgrade(cx) {
 391                project.update(cx, |project, cx| {
 392                    project.disconnected_from_host(cx);
 393                    project.close(cx);
 394                });
 395            }
 396        }
 397
 398        self.status = RoomStatus::Offline;
 399        self.remote_participants.clear();
 400        self.pending_participants.clear();
 401        self.participant_user_ids.clear();
 402        self.subscriptions.clear();
 403        self.live_kit.take();
 404        self.pending_room_update.take();
 405        self.maintain_connection.take();
 406    }
 407
 408    async fn maintain_connection(
 409        this: WeakModelHandle<Self>,
 410        client: Arc<Client>,
 411        mut cx: AsyncAppContext,
 412    ) -> Result<()> {
 413        let mut client_status = client.status();
 414        loop {
 415            let _ = client_status.try_recv();
 416            let is_connected = client_status.borrow().is_connected();
 417            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 418            if !is_connected || client_status.next().await.is_some() {
 419                log::info!("detected client disconnection");
 420
 421                this.upgrade(&cx)
 422                    .ok_or_else(|| anyhow!("room was dropped"))?
 423                    .update(&mut cx, |this, cx| {
 424                        this.status = RoomStatus::Rejoining;
 425                        cx.notify();
 426                    });
 427
 428                // Wait for client to re-establish a connection to the server.
 429                {
 430                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 431                    let client_reconnection = async {
 432                        let mut remaining_attempts = 3;
 433                        while remaining_attempts > 0 {
 434                            if client_status.borrow().is_connected() {
 435                                log::info!("client reconnected, attempting to rejoin room");
 436
 437                                let Some(this) = this.upgrade(&cx) else { break };
 438                                if this
 439                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 440                                    .await
 441                                    .log_err()
 442                                    .is_some()
 443                                {
 444                                    return true;
 445                                } else {
 446                                    remaining_attempts -= 1;
 447                                }
 448                            } else if client_status.borrow().is_signed_out() {
 449                                return false;
 450                            }
 451
 452                            log::info!(
 453                                "waiting for client status change, remaining attempts {}",
 454                                remaining_attempts
 455                            );
 456                            client_status.next().await;
 457                        }
 458                        false
 459                    }
 460                    .fuse();
 461                    futures::pin_mut!(client_reconnection);
 462
 463                    futures::select_biased! {
 464                        reconnected = client_reconnection => {
 465                            if reconnected {
 466                                log::info!("successfully reconnected to room");
 467                                // If we successfully joined the room, go back around the loop
 468                                // waiting for future connection status changes.
 469                                continue;
 470                            }
 471                        }
 472                        _ = reconnection_timeout => {
 473                            log::info!("room reconnection timeout expired");
 474                        }
 475                    }
 476                }
 477
 478                break;
 479            }
 480        }
 481
 482        // The client failed to re-establish a connection to the server
 483        // or an error occurred while trying to re-join the room. Either way
 484        // we leave the room and return an error.
 485        if let Some(this) = this.upgrade(&cx) {
 486            log::info!("reconnection failed, leaving room");
 487            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 488        }
 489        Err(anyhow!(
 490            "can't reconnect to room: client failed to re-establish connection"
 491        ))
 492    }
 493
 494    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 495        let mut projects = HashMap::default();
 496        let mut reshared_projects = Vec::new();
 497        let mut rejoined_projects = Vec::new();
 498        self.shared_projects.retain(|project| {
 499            if let Some(handle) = project.upgrade(cx) {
 500                let project = handle.read(cx);
 501                if let Some(project_id) = project.remote_id() {
 502                    projects.insert(project_id, handle.clone());
 503                    reshared_projects.push(proto::UpdateProject {
 504                        project_id,
 505                        worktrees: project.worktree_metadata_protos(cx),
 506                    });
 507                    return true;
 508                }
 509            }
 510            false
 511        });
 512        self.joined_projects.retain(|project| {
 513            if let Some(handle) = project.upgrade(cx) {
 514                let project = handle.read(cx);
 515                if let Some(project_id) = project.remote_id() {
 516                    projects.insert(project_id, handle.clone());
 517                    rejoined_projects.push(proto::RejoinProject {
 518                        id: project_id,
 519                        worktrees: project
 520                            .worktrees(cx)
 521                            .map(|worktree| {
 522                                let worktree = worktree.read(cx);
 523                                proto::RejoinWorktree {
 524                                    id: worktree.id().to_proto(),
 525                                    scan_id: worktree.completed_scan_id() as u64,
 526                                }
 527                            })
 528                            .collect(),
 529                    });
 530                }
 531                return true;
 532            }
 533            false
 534        });
 535
 536        let response = self.client.request_envelope(proto::RejoinRoom {
 537            id: self.id,
 538            reshared_projects,
 539            rejoined_projects,
 540        });
 541
 542        cx.spawn(|this, mut cx| async move {
 543            let response = response.await?;
 544            let message_id = response.message_id;
 545            let response = response.payload;
 546            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 547            this.update(&mut cx, |this, cx| {
 548                this.status = RoomStatus::Online;
 549                this.apply_room_update(room_proto, cx)?;
 550
 551                for reshared_project in response.reshared_projects {
 552                    if let Some(project) = projects.get(&reshared_project.id) {
 553                        project.update(cx, |project, cx| {
 554                            project.reshared(reshared_project, cx).log_err();
 555                        });
 556                    }
 557                }
 558
 559                for rejoined_project in response.rejoined_projects {
 560                    if let Some(project) = projects.get(&rejoined_project.id) {
 561                        project.update(cx, |project, cx| {
 562                            project.rejoined(rejoined_project, message_id, cx).log_err();
 563                        });
 564                    }
 565                }
 566
 567                anyhow::Ok(())
 568            })
 569        })
 570    }
 571
 572    pub fn id(&self) -> u64 {
 573        self.id
 574    }
 575
 576    pub fn status(&self) -> RoomStatus {
 577        self.status
 578    }
 579
 580    pub fn local_participant(&self) -> &LocalParticipant {
 581        &self.local_participant
 582    }
 583
 584    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 585        &self.remote_participants
 586    }
 587
 588    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 589        self.remote_participants
 590            .values()
 591            .find(|p| p.peer_id == peer_id)
 592    }
 593
 594    pub fn pending_participants(&self) -> &[Arc<User>] {
 595        &self.pending_participants
 596    }
 597
 598    pub fn contains_participant(&self, user_id: u64) -> bool {
 599        self.participant_user_ids.contains(&user_id)
 600    }
 601
 602    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 603        self.follows_by_leader_id_project_id
 604            .get(&(leader_id, project_id))
 605            .map_or(&[], |v| v.as_slice())
 606    }
 607
 608    /// Returns the most 'active' projects, defined as most people in the project
 609    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 610        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 611        for participant in self.remote_participants.values() {
 612            match participant.location {
 613                ParticipantLocation::SharedProject { project_id } => {
 614                    project_hosts_and_guest_counts
 615                        .entry(project_id)
 616                        .or_default()
 617                        .1 += 1;
 618                }
 619                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 620            }
 621            for project in &participant.projects {
 622                project_hosts_and_guest_counts
 623                    .entry(project.id)
 624                    .or_default()
 625                    .0 = Some(participant.user.id);
 626            }
 627        }
 628
 629        if let Some(user) = self.user_store.read(cx).current_user() {
 630            for project in &self.local_participant.projects {
 631                project_hosts_and_guest_counts
 632                    .entry(project.id)
 633                    .or_default()
 634                    .0 = Some(user.id);
 635            }
 636        }
 637
 638        project_hosts_and_guest_counts
 639            .into_iter()
 640            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 641            .max_by_key(|(_, _, guest_count)| *guest_count)
 642            .map(|(id, host, _)| (id, host))
 643    }
 644
 645    async fn handle_room_updated(
 646        this: ModelHandle<Self>,
 647        envelope: TypedEnvelope<proto::RoomUpdated>,
 648        _: Arc<Client>,
 649        mut cx: AsyncAppContext,
 650    ) -> Result<()> {
 651        let room = envelope
 652            .payload
 653            .room
 654            .ok_or_else(|| anyhow!("invalid room"))?;
 655        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 656    }
 657
 658    fn apply_room_update(
 659        &mut self,
 660        mut room: proto::Room,
 661        cx: &mut ModelContext<Self>,
 662    ) -> Result<()> {
 663        // Filter ourselves out from the room's participants.
 664        let local_participant_ix = room
 665            .participants
 666            .iter()
 667            .position(|participant| Some(participant.user_id) == self.client.user_id());
 668        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 669
 670        let pending_participant_user_ids = room
 671            .pending_participants
 672            .iter()
 673            .map(|p| p.user_id)
 674            .collect::<Vec<_>>();
 675
 676        let remote_participant_user_ids = room
 677            .participants
 678            .iter()
 679            .map(|p| p.user_id)
 680            .collect::<Vec<_>>();
 681
 682        let (remote_participants, pending_participants) =
 683            self.user_store.update(cx, move |user_store, cx| {
 684                (
 685                    user_store.get_users(remote_participant_user_ids, cx),
 686                    user_store.get_users(pending_participant_user_ids, cx),
 687                )
 688            });
 689
 690        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 691            let (remote_participants, pending_participants) =
 692                futures::join!(remote_participants, pending_participants);
 693
 694            this.update(&mut cx, |this, cx| {
 695                this.participant_user_ids.clear();
 696
 697                if let Some(participant) = local_participant {
 698                    this.local_participant.projects = participant.projects;
 699                } else {
 700                    this.local_participant.projects.clear();
 701                }
 702
 703                if let Some(participants) = remote_participants.log_err() {
 704                    for (participant, user) in room.participants.into_iter().zip(participants) {
 705                        let Some(peer_id) = participant.peer_id else {
 706                            continue;
 707                        };
 708                        let participant_index = ParticipantIndex(participant.participant_index);
 709                        this.participant_user_ids.insert(participant.user_id);
 710
 711                        let old_projects = this
 712                            .remote_participants
 713                            .get(&participant.user_id)
 714                            .into_iter()
 715                            .flat_map(|existing| &existing.projects)
 716                            .map(|project| project.id)
 717                            .collect::<HashSet<_>>();
 718                        let new_projects = participant
 719                            .projects
 720                            .iter()
 721                            .map(|project| project.id)
 722                            .collect::<HashSet<_>>();
 723
 724                        for project in &participant.projects {
 725                            if !old_projects.contains(&project.id) {
 726                                cx.emit(Event::RemoteProjectShared {
 727                                    owner: user.clone(),
 728                                    project_id: project.id,
 729                                    worktree_root_names: project.worktree_root_names.clone(),
 730                                });
 731                            }
 732                        }
 733
 734                        for unshared_project_id in old_projects.difference(&new_projects) {
 735                            this.joined_projects.retain(|project| {
 736                                if let Some(project) = project.upgrade(cx) {
 737                                    project.update(cx, |project, cx| {
 738                                        if project.remote_id() == Some(*unshared_project_id) {
 739                                            project.disconnected_from_host(cx);
 740                                            false
 741                                        } else {
 742                                            true
 743                                        }
 744                                    })
 745                                } else {
 746                                    false
 747                                }
 748                            });
 749                            cx.emit(Event::RemoteProjectUnshared {
 750                                project_id: *unshared_project_id,
 751                            });
 752                        }
 753
 754                        let location = ParticipantLocation::from_proto(participant.location)
 755                            .unwrap_or(ParticipantLocation::External);
 756                        if let Some(remote_participant) =
 757                            this.remote_participants.get_mut(&participant.user_id)
 758                        {
 759                            remote_participant.peer_id = peer_id;
 760                            remote_participant.projects = participant.projects;
 761                            remote_participant.participant_index = participant_index;
 762                            if location != remote_participant.location {
 763                                remote_participant.location = location;
 764                                cx.emit(Event::ParticipantLocationChanged {
 765                                    participant_id: peer_id,
 766                                });
 767                            }
 768                        } else {
 769                            this.remote_participants.insert(
 770                                participant.user_id,
 771                                RemoteParticipant {
 772                                    user: user.clone(),
 773                                    participant_index,
 774                                    peer_id,
 775                                    projects: participant.projects,
 776                                    location,
 777                                    muted: true,
 778                                    speaking: false,
 779                                    video_tracks: Default::default(),
 780                                    audio_tracks: Default::default(),
 781                                },
 782                            );
 783
 784                            Audio::play_sound(Sound::Joined, cx);
 785
 786                            if let Some(live_kit) = this.live_kit.as_ref() {
 787                                let video_tracks =
 788                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 789                                let audio_tracks =
 790                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 791                                let publications = live_kit
 792                                    .room
 793                                    .remote_audio_track_publications(&user.id.to_string());
 794
 795                                for track in video_tracks {
 796                                    this.remote_video_track_updated(
 797                                        RemoteVideoTrackUpdate::Subscribed(track),
 798                                        cx,
 799                                    )
 800                                    .log_err();
 801                                }
 802
 803                                for (track, publication) in
 804                                    audio_tracks.iter().zip(publications.iter())
 805                                {
 806                                    this.remote_audio_track_updated(
 807                                        RemoteAudioTrackUpdate::Subscribed(
 808                                            track.clone(),
 809                                            publication.clone(),
 810                                        ),
 811                                        cx,
 812                                    )
 813                                    .log_err();
 814                                }
 815                            }
 816                        }
 817                    }
 818
 819                    this.remote_participants.retain(|user_id, participant| {
 820                        if this.participant_user_ids.contains(user_id) {
 821                            true
 822                        } else {
 823                            for project in &participant.projects {
 824                                cx.emit(Event::RemoteProjectUnshared {
 825                                    project_id: project.id,
 826                                });
 827                            }
 828                            false
 829                        }
 830                    });
 831                }
 832
 833                if let Some(pending_participants) = pending_participants.log_err() {
 834                    this.pending_participants = pending_participants;
 835                    for participant in &this.pending_participants {
 836                        this.participant_user_ids.insert(participant.id);
 837                    }
 838                }
 839
 840                this.follows_by_leader_id_project_id.clear();
 841                for follower in room.followers {
 842                    let project_id = follower.project_id;
 843                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 844                        (Some(leader), Some(follower)) => (leader, follower),
 845
 846                        _ => {
 847                            log::error!("Follower message {follower:?} missing some state");
 848                            continue;
 849                        }
 850                    };
 851
 852                    let list = this
 853                        .follows_by_leader_id_project_id
 854                        .entry((leader, project_id))
 855                        .or_insert(Vec::new());
 856                    if !list.contains(&follower) {
 857                        list.push(follower);
 858                    }
 859                }
 860
 861                this.pending_room_update.take();
 862                if this.should_leave() {
 863                    log::info!("room is empty, leaving");
 864                    let _ = this.leave(cx);
 865                }
 866
 867                this.user_store.update(cx, |user_store, cx| {
 868                    let participant_indices_by_user_id = this
 869                        .remote_participants
 870                        .iter()
 871                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 872                        .collect();
 873                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 874                });
 875
 876                this.check_invariants();
 877                this.room_update_completed_tx.try_send(Some(())).ok();
 878                cx.notify();
 879            });
 880        }));
 881
 882        cx.notify();
 883        Ok(())
 884    }
 885
 886    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 887        let mut done_rx = self.room_update_completed_rx.clone();
 888        async move {
 889            while let Some(result) = done_rx.next().await {
 890                if result.is_some() {
 891                    break;
 892                }
 893            }
 894        }
 895    }
 896
 897    fn remote_video_track_updated(
 898        &mut self,
 899        change: RemoteVideoTrackUpdate,
 900        cx: &mut ModelContext<Self>,
 901    ) -> Result<()> {
 902        match change {
 903            RemoteVideoTrackUpdate::Subscribed(track) => {
 904                let user_id = track.publisher_id().parse()?;
 905                let track_id = track.sid().to_string();
 906                let participant = self
 907                    .remote_participants
 908                    .get_mut(&user_id)
 909                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 910                participant.video_tracks.insert(
 911                    track_id.clone(),
 912                    Arc::new(RemoteVideoTrack {
 913                        live_kit_track: track,
 914                    }),
 915                );
 916                cx.emit(Event::RemoteVideoTracksChanged {
 917                    participant_id: participant.peer_id,
 918                });
 919            }
 920            RemoteVideoTrackUpdate::Unsubscribed {
 921                publisher_id,
 922                track_id,
 923            } => {
 924                let user_id = publisher_id.parse()?;
 925                let participant = self
 926                    .remote_participants
 927                    .get_mut(&user_id)
 928                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 929                participant.video_tracks.remove(&track_id);
 930                cx.emit(Event::RemoteVideoTracksChanged {
 931                    participant_id: participant.peer_id,
 932                });
 933            }
 934        }
 935
 936        cx.notify();
 937        Ok(())
 938    }
 939
 940    fn remote_audio_track_updated(
 941        &mut self,
 942        change: RemoteAudioTrackUpdate,
 943        cx: &mut ModelContext<Self>,
 944    ) -> Result<()> {
 945        match change {
 946            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 947                let mut speaker_ids = speakers
 948                    .into_iter()
 949                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 950                    .collect::<Vec<u64>>();
 951                speaker_ids.sort_unstable();
 952                for (sid, participant) in &mut self.remote_participants {
 953                    if let Ok(_) = speaker_ids.binary_search(sid) {
 954                        participant.speaking = true;
 955                    } else {
 956                        participant.speaking = false;
 957                    }
 958                }
 959                if let Some(id) = self.client.user_id() {
 960                    if let Some(room) = &mut self.live_kit {
 961                        if let Ok(_) = speaker_ids.binary_search(&id) {
 962                            room.speaking = true;
 963                        } else {
 964                            room.speaking = false;
 965                        }
 966                    }
 967                }
 968                cx.notify();
 969            }
 970            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 971                let mut found = false;
 972                for participant in &mut self.remote_participants.values_mut() {
 973                    for track in participant.audio_tracks.values() {
 974                        if track.sid() == track_id {
 975                            found = true;
 976                            break;
 977                        }
 978                    }
 979                    if found {
 980                        participant.muted = muted;
 981                        break;
 982                    }
 983                }
 984
 985                cx.notify();
 986            }
 987            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 988                let user_id = track.publisher_id().parse()?;
 989                let track_id = track.sid().to_string();
 990                let participant = self
 991                    .remote_participants
 992                    .get_mut(&user_id)
 993                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 994
 995                participant.audio_tracks.insert(track_id.clone(), track);
 996                participant.muted = publication.is_muted();
 997
 998                cx.emit(Event::RemoteAudioTracksChanged {
 999                    participant_id: participant.peer_id,
1000                });
1001            }
1002            RemoteAudioTrackUpdate::Unsubscribed {
1003                publisher_id,
1004                track_id,
1005            } => {
1006                let user_id = publisher_id.parse()?;
1007                let participant = self
1008                    .remote_participants
1009                    .get_mut(&user_id)
1010                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1011                participant.audio_tracks.remove(&track_id);
1012                cx.emit(Event::RemoteAudioTracksChanged {
1013                    participant_id: participant.peer_id,
1014                });
1015            }
1016        }
1017
1018        cx.notify();
1019        Ok(())
1020    }
1021
1022    fn check_invariants(&self) {
1023        #[cfg(any(test, feature = "test-support"))]
1024        {
1025            for participant in self.remote_participants.values() {
1026                assert!(self.participant_user_ids.contains(&participant.user.id));
1027                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1028            }
1029
1030            for participant in &self.pending_participants {
1031                assert!(self.participant_user_ids.contains(&participant.id));
1032                assert_ne!(participant.id, self.client.user_id().unwrap());
1033            }
1034
1035            assert_eq!(
1036                self.participant_user_ids.len(),
1037                self.remote_participants.len() + self.pending_participants.len()
1038            );
1039        }
1040    }
1041
1042    pub(crate) fn call(
1043        &mut self,
1044        called_user_id: u64,
1045        initial_project_id: Option<u64>,
1046        cx: &mut ModelContext<Self>,
1047    ) -> Task<Result<()>> {
1048        if self.status.is_offline() {
1049            return Task::ready(Err(anyhow!("room is offline")));
1050        }
1051
1052        cx.notify();
1053        let client = self.client.clone();
1054        let room_id = self.id;
1055        self.pending_call_count += 1;
1056        cx.spawn(|this, mut cx| async move {
1057            let result = client
1058                .request(proto::Call {
1059                    room_id,
1060                    called_user_id,
1061                    initial_project_id,
1062                })
1063                .await;
1064            this.update(&mut cx, |this, cx| {
1065                this.pending_call_count -= 1;
1066                if this.should_leave() {
1067                    this.leave(cx).detach_and_log_err(cx);
1068                }
1069            });
1070            result?;
1071            Ok(())
1072        })
1073    }
1074
1075    pub fn join_project(
1076        &mut self,
1077        id: u64,
1078        language_registry: Arc<LanguageRegistry>,
1079        fs: Arc<dyn Fs>,
1080        cx: &mut ModelContext<Self>,
1081    ) -> Task<Result<ModelHandle<Project>>> {
1082        let client = self.client.clone();
1083        let user_store = self.user_store.clone();
1084        cx.emit(Event::RemoteProjectJoined { project_id: id });
1085        cx.spawn(|this, mut cx| async move {
1086            let project =
1087                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1088
1089            this.update(&mut cx, |this, cx| {
1090                this.joined_projects.retain(|project| {
1091                    if let Some(project) = project.upgrade(cx) {
1092                        !project.read(cx).is_read_only()
1093                    } else {
1094                        false
1095                    }
1096                });
1097                this.joined_projects.insert(project.downgrade());
1098            });
1099            Ok(project)
1100        })
1101    }
1102
1103    pub(crate) fn share_project(
1104        &mut self,
1105        project: ModelHandle<Project>,
1106        cx: &mut ModelContext<Self>,
1107    ) -> Task<Result<u64>> {
1108        if let Some(project_id) = project.read(cx).remote_id() {
1109            return Task::ready(Ok(project_id));
1110        }
1111
1112        let request = self.client.request(proto::ShareProject {
1113            room_id: self.id(),
1114            worktrees: project.read(cx).worktree_metadata_protos(cx),
1115        });
1116        cx.spawn(|this, mut cx| async move {
1117            let response = request.await?;
1118
1119            project.update(&mut cx, |project, cx| {
1120                project.shared(response.project_id, cx)
1121            })?;
1122
1123            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1124            this.update(&mut cx, |this, cx| {
1125                this.shared_projects.insert(project.downgrade());
1126                let active_project = this.local_participant.active_project.as_ref();
1127                if active_project.map_or(false, |location| *location == project) {
1128                    this.set_location(Some(&project), cx)
1129                } else {
1130                    Task::ready(Ok(()))
1131                }
1132            })
1133            .await?;
1134
1135            Ok(response.project_id)
1136        })
1137    }
1138
1139    pub(crate) fn unshare_project(
1140        &mut self,
1141        project: ModelHandle<Project>,
1142        cx: &mut ModelContext<Self>,
1143    ) -> Result<()> {
1144        let project_id = match project.read(cx).remote_id() {
1145            Some(project_id) => project_id,
1146            None => return Ok(()),
1147        };
1148
1149        self.client.send(proto::UnshareProject { project_id })?;
1150        project.update(cx, |this, cx| this.unshare(cx))
1151    }
1152
1153    pub(crate) fn set_location(
1154        &mut self,
1155        project: Option<&ModelHandle<Project>>,
1156        cx: &mut ModelContext<Self>,
1157    ) -> Task<Result<()>> {
1158        if self.status.is_offline() {
1159            return Task::ready(Err(anyhow!("room is offline")));
1160        }
1161
1162        let client = self.client.clone();
1163        let room_id = self.id;
1164        let location = if let Some(project) = project {
1165            self.local_participant.active_project = Some(project.downgrade());
1166            if let Some(project_id) = project.read(cx).remote_id() {
1167                proto::participant_location::Variant::SharedProject(
1168                    proto::participant_location::SharedProject { id: project_id },
1169                )
1170            } else {
1171                proto::participant_location::Variant::UnsharedProject(
1172                    proto::participant_location::UnsharedProject {},
1173                )
1174            }
1175        } else {
1176            self.local_participant.active_project = None;
1177            proto::participant_location::Variant::External(proto::participant_location::External {})
1178        };
1179
1180        cx.notify();
1181        cx.foreground().spawn(async move {
1182            client
1183                .request(proto::UpdateParticipantLocation {
1184                    room_id,
1185                    location: Some(proto::ParticipantLocation {
1186                        variant: Some(location),
1187                    }),
1188                })
1189                .await?;
1190            Ok(())
1191        })
1192    }
1193
1194    pub fn is_screen_sharing(&self) -> bool {
1195        self.live_kit.as_ref().map_or(false, |live_kit| {
1196            !matches!(live_kit.screen_track, LocalTrack::None)
1197        })
1198    }
1199
1200    pub fn is_sharing_mic(&self) -> bool {
1201        self.live_kit.as_ref().map_or(false, |live_kit| {
1202            !matches!(live_kit.microphone_track, LocalTrack::None)
1203        })
1204    }
1205
1206    pub fn is_muted(&self, cx: &AppContext) -> bool {
1207        self.live_kit
1208            .as_ref()
1209            .and_then(|live_kit| match &live_kit.microphone_track {
1210                LocalTrack::None => Some(Self::mute_on_join(cx)),
1211                LocalTrack::Pending { muted, .. } => Some(*muted),
1212                LocalTrack::Published { muted, .. } => Some(*muted),
1213            })
1214            .unwrap_or(false)
1215    }
1216
1217    pub fn is_speaking(&self) -> bool {
1218        self.live_kit
1219            .as_ref()
1220            .map_or(false, |live_kit| live_kit.speaking)
1221    }
1222
1223    pub fn is_deafened(&self) -> Option<bool> {
1224        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1225    }
1226
1227    #[track_caller]
1228    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1229        if self.status.is_offline() {
1230            return Task::ready(Err(anyhow!("room is offline")));
1231        } else if self.is_sharing_mic() {
1232            return Task::ready(Err(anyhow!("microphone was already shared")));
1233        }
1234
1235        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1236            let publish_id = post_inc(&mut live_kit.next_publish_id);
1237            live_kit.microphone_track = LocalTrack::Pending {
1238                publish_id,
1239                muted: false,
1240            };
1241            cx.notify();
1242            publish_id
1243        } else {
1244            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1245        };
1246
1247        cx.spawn_weak(|this, mut cx| async move {
1248            let publish_track = async {
1249                let track = LocalAudioTrack::create();
1250                this.upgrade(&cx)
1251                    .ok_or_else(|| anyhow!("room was dropped"))?
1252                    .read_with(&cx, |this, _| {
1253                        this.live_kit
1254                            .as_ref()
1255                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1256                    })
1257                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1258                    .await
1259            };
1260
1261            let publication = publish_track.await;
1262            this.upgrade(&cx)
1263                .ok_or_else(|| anyhow!("room was dropped"))?
1264                .update(&mut cx, |this, cx| {
1265                    let live_kit = this
1266                        .live_kit
1267                        .as_mut()
1268                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1269
1270                    let (canceled, muted) = if let LocalTrack::Pending {
1271                        publish_id: cur_publish_id,
1272                        muted,
1273                    } = &live_kit.microphone_track
1274                    {
1275                        (*cur_publish_id != publish_id, *muted)
1276                    } else {
1277                        (true, false)
1278                    };
1279
1280                    match publication {
1281                        Ok(publication) => {
1282                            if canceled {
1283                                live_kit.room.unpublish_track(publication);
1284                            } else {
1285                                if muted {
1286                                    cx.background().spawn(publication.set_mute(muted)).detach();
1287                                }
1288                                live_kit.microphone_track = LocalTrack::Published {
1289                                    track_publication: publication,
1290                                    muted,
1291                                };
1292                                cx.notify();
1293                            }
1294                            Ok(())
1295                        }
1296                        Err(error) => {
1297                            if canceled {
1298                                Ok(())
1299                            } else {
1300                                live_kit.microphone_track = LocalTrack::None;
1301                                cx.notify();
1302                                Err(error)
1303                            }
1304                        }
1305                    }
1306                })
1307        })
1308    }
1309
1310    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1311        if self.status.is_offline() {
1312            return Task::ready(Err(anyhow!("room is offline")));
1313        } else if self.is_screen_sharing() {
1314            return Task::ready(Err(anyhow!("screen was already shared")));
1315        }
1316
1317        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1318            let publish_id = post_inc(&mut live_kit.next_publish_id);
1319            live_kit.screen_track = LocalTrack::Pending {
1320                publish_id,
1321                muted: false,
1322            };
1323            cx.notify();
1324            (live_kit.room.display_sources(), publish_id)
1325        } else {
1326            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1327        };
1328
1329        cx.spawn_weak(|this, mut cx| async move {
1330            let publish_track = async {
1331                let displays = displays.await?;
1332                let display = displays
1333                    .first()
1334                    .ok_or_else(|| anyhow!("no display found"))?;
1335                let track = LocalVideoTrack::screen_share_for_display(&display);
1336                this.upgrade(&cx)
1337                    .ok_or_else(|| anyhow!("room was dropped"))?
1338                    .read_with(&cx, |this, _| {
1339                        this.live_kit
1340                            .as_ref()
1341                            .map(|live_kit| live_kit.room.publish_video_track(track))
1342                    })
1343                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1344                    .await
1345            };
1346
1347            let publication = publish_track.await;
1348            this.upgrade(&cx)
1349                .ok_or_else(|| anyhow!("room was dropped"))?
1350                .update(&mut cx, |this, cx| {
1351                    let live_kit = this
1352                        .live_kit
1353                        .as_mut()
1354                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1355
1356                    let (canceled, muted) = if let LocalTrack::Pending {
1357                        publish_id: cur_publish_id,
1358                        muted,
1359                    } = &live_kit.screen_track
1360                    {
1361                        (*cur_publish_id != publish_id, *muted)
1362                    } else {
1363                        (true, false)
1364                    };
1365
1366                    match publication {
1367                        Ok(publication) => {
1368                            if canceled {
1369                                live_kit.room.unpublish_track(publication);
1370                            } else {
1371                                if muted {
1372                                    cx.background().spawn(publication.set_mute(muted)).detach();
1373                                }
1374                                live_kit.screen_track = LocalTrack::Published {
1375                                    track_publication: publication,
1376                                    muted,
1377                                };
1378                                cx.notify();
1379                            }
1380
1381                            Audio::play_sound(Sound::StartScreenshare, cx);
1382
1383                            Ok(())
1384                        }
1385                        Err(error) => {
1386                            if canceled {
1387                                Ok(())
1388                            } else {
1389                                live_kit.screen_track = LocalTrack::None;
1390                                cx.notify();
1391                                Err(error)
1392                            }
1393                        }
1394                    }
1395                })
1396        })
1397    }
1398
1399    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1400        let should_mute = !self.is_muted(cx);
1401        if let Some(live_kit) = self.live_kit.as_mut() {
1402            if matches!(live_kit.microphone_track, LocalTrack::None) {
1403                return Ok(self.share_microphone(cx));
1404            }
1405
1406            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1407            live_kit.muted_by_user = should_mute;
1408
1409            if old_muted == true && live_kit.deafened == true {
1410                if let Some(task) = self.toggle_deafen(cx).ok() {
1411                    task.detach();
1412                }
1413            }
1414
1415            Ok(ret_task)
1416        } else {
1417            Err(anyhow!("LiveKit not started"))
1418        }
1419    }
1420
1421    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1422        if let Some(live_kit) = self.live_kit.as_mut() {
1423            (*live_kit).deafened = !live_kit.deafened;
1424
1425            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1426            // Context notification is sent within set_mute itself.
1427            let mut mute_task = None;
1428            // When deafening, mute user's mic as well.
1429            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1430            if live_kit.deafened || !live_kit.muted_by_user {
1431                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1432            };
1433            for participant in self.remote_participants.values() {
1434                for track in live_kit
1435                    .room
1436                    .remote_audio_track_publications(&participant.user.id.to_string())
1437                {
1438                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1439                }
1440            }
1441
1442            Ok(cx.foreground().spawn(async move {
1443                if let Some(mute_task) = mute_task {
1444                    mute_task.await?;
1445                }
1446                for task in tasks {
1447                    task.await?;
1448                }
1449                Ok(())
1450            }))
1451        } else {
1452            Err(anyhow!("LiveKit not started"))
1453        }
1454    }
1455
1456    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1457        if self.status.is_offline() {
1458            return Err(anyhow!("room is offline"));
1459        }
1460
1461        let live_kit = self
1462            .live_kit
1463            .as_mut()
1464            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1465        match mem::take(&mut live_kit.screen_track) {
1466            LocalTrack::None => Err(anyhow!("screen was not shared")),
1467            LocalTrack::Pending { .. } => {
1468                cx.notify();
1469                Ok(())
1470            }
1471            LocalTrack::Published {
1472                track_publication, ..
1473            } => {
1474                live_kit.room.unpublish_track(track_publication);
1475                cx.notify();
1476
1477                Audio::play_sound(Sound::StopScreenshare, cx);
1478                Ok(())
1479            }
1480        }
1481    }
1482
1483    #[cfg(any(test, feature = "test-support"))]
1484    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1485        self.live_kit
1486            .as_ref()
1487            .unwrap()
1488            .room
1489            .set_display_sources(sources);
1490    }
1491}
1492
1493struct LiveKitRoom {
1494    room: Arc<live_kit_client::Room>,
1495    screen_track: LocalTrack,
1496    microphone_track: LocalTrack,
1497    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1498    muted_by_user: bool,
1499    deafened: bool,
1500    speaking: bool,
1501    next_publish_id: usize,
1502    can_publish: bool,
1503    _maintain_room: Task<()>,
1504    _maintain_tracks: [Task<()>; 2],
1505}
1506
1507impl LiveKitRoom {
1508    fn set_mute(
1509        self: &mut LiveKitRoom,
1510        should_mute: bool,
1511        cx: &mut ModelContext<Room>,
1512    ) -> Result<(Task<Result<()>>, bool)> {
1513        if !should_mute {
1514            // clear user muting state.
1515            self.muted_by_user = false;
1516        }
1517
1518        let (result, old_muted) = match &mut self.microphone_track {
1519            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1520            LocalTrack::Pending { muted, .. } => {
1521                let old_muted = *muted;
1522                *muted = should_mute;
1523                cx.notify();
1524                Ok((Task::Ready(Some(Ok(()))), old_muted))
1525            }
1526            LocalTrack::Published {
1527                track_publication,
1528                muted,
1529            } => {
1530                let old_muted = *muted;
1531                *muted = should_mute;
1532                cx.notify();
1533                Ok((
1534                    cx.background().spawn(track_publication.set_mute(*muted)),
1535                    old_muted,
1536                ))
1537            }
1538        }?;
1539
1540        if old_muted != should_mute {
1541            if should_mute {
1542                Audio::play_sound(Sound::Mute, cx);
1543            } else {
1544                Audio::play_sound(Sound::Unmute, cx);
1545            }
1546        }
1547
1548        Ok((result, old_muted))
1549    }
1550}
1551
1552enum LocalTrack {
1553    None,
1554    Pending {
1555        publish_id: usize,
1556        muted: bool,
1557    },
1558    Published {
1559        track_publication: LocalTrackPublication,
1560        muted: bool,
1561    },
1562}
1563
1564impl Default for LocalTrack {
1565    fn default() -> Self {
1566        Self::None
1567    }
1568}
1569
1570#[derive(Copy, Clone, PartialEq, Eq)]
1571pub enum RoomStatus {
1572    Online,
1573    Rejoining,
1574    Offline,
1575}
1576
1577impl RoomStatus {
1578    pub fn is_offline(&self) -> bool {
1579        matches!(self, RoomStatus::Offline)
1580    }
1581
1582    pub fn is_online(&self) -> bool {
1583        matches!(self, RoomStatus::Online)
1584    }
1585}