room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio2::{Audio, Sound};
   8use client2::{
   9    proto::{self, PeerId},
  10    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs2::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui2::{
  16    AppContext, AsyncAppContext, Context, EventEmitter, Handle, ModelContext, Task, WeakHandle,
  17};
  18use language2::LanguageRegistry;
  19use live_kit_client::{
  20    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  21    RemoteVideoTrackUpdate,
  22};
  23use postage::{sink::Sink, stream::Stream, watch};
  24use project2::Project;
  25use settings2::Settings;
  26use std::{future::Future, mem, sync::Arc, time::Duration};
  27use util::{post_inc, ResultExt, TryFutureExt};
  28
  29pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  30
  31#[derive(Clone, Debug, PartialEq, Eq)]
  32pub enum Event {
  33    ParticipantLocationChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteVideoTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteAudioTracksChanged {
  40        participant_id: proto::PeerId,
  41    },
  42    RemoteProjectShared {
  43        owner: Arc<User>,
  44        project_id: u64,
  45        worktree_root_names: Vec<String>,
  46    },
  47    RemoteProjectUnshared {
  48        project_id: u64,
  49    },
  50    RemoteProjectJoined {
  51        project_id: u64,
  52    },
  53    RemoteProjectInvitationDiscarded {
  54        project_id: u64,
  55    },
  56    Left,
  57}
  58
  59pub struct Room {
  60    id: u64,
  61    channel_id: Option<u64>,
  62    live_kit: Option<LiveKitRoom>,
  63    status: RoomStatus,
  64    shared_projects: HashSet<WeakHandle<Project>>,
  65    joined_projects: HashSet<WeakHandle<Project>>,
  66    local_participant: LocalParticipant,
  67    remote_participants: BTreeMap<u64, RemoteParticipant>,
  68    pending_participants: Vec<Arc<User>>,
  69    participant_user_ids: HashSet<u64>,
  70    pending_call_count: usize,
  71    leave_when_empty: bool,
  72    client: Arc<Client>,
  73    user_store: Handle<UserStore>,
  74    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  75    client_subscriptions: Vec<client2::Subscription>,
  76    _subscriptions: Vec<gpui2::Subscription>,
  77    room_update_completed_tx: watch::Sender<Option<()>>,
  78    room_update_completed_rx: watch::Receiver<Option<()>>,
  79    pending_room_update: Option<Task<()>>,
  80    maintain_connection: Option<Task<Option<()>>>,
  81}
  82
  83impl EventEmitter for Room {
  84    type Event = Event;
  85}
  86
  87impl Room {
  88    pub fn channel_id(&self) -> Option<u64> {
  89        self.channel_id
  90    }
  91
  92    pub fn is_sharing_project(&self) -> bool {
  93        !self.shared_projects.is_empty()
  94    }
  95
  96    #[cfg(any(test, feature = "test-support"))]
  97    pub fn is_connected(&self) -> bool {
  98        if let Some(live_kit) = self.live_kit.as_ref() {
  99            matches!(
 100                *live_kit.room.status().borrow(),
 101                live_kit_client::ConnectionState::Connected { .. }
 102            )
 103        } else {
 104            false
 105        }
 106    }
 107
 108    fn new(
 109        id: u64,
 110        channel_id: Option<u64>,
 111        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 112        client: Arc<Client>,
 113        user_store: Handle<UserStore>,
 114        cx: &mut ModelContext<Self>,
 115    ) -> Self {
 116        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 117            let room = live_kit_client::Room::new();
 118            let mut status = room.status();
 119            // Consume the initial status of the room.
 120            let _ = status.try_recv();
 121            let _maintain_room = cx.spawn(|this, mut cx| async move {
 122                while let Some(status) = status.next().await {
 123                    let this = if let Some(this) = this.upgrade() {
 124                        this
 125                    } else {
 126                        break;
 127                    };
 128
 129                    if status == live_kit_client::ConnectionState::Disconnected {
 130                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 131                            .ok();
 132                        break;
 133                    }
 134                }
 135            });
 136
 137            let mut track_video_changes = room.remote_video_track_updates();
 138            let _maintain_video_tracks = cx.spawn(|this, mut cx| async move {
 139                while let Some(track_change) = track_video_changes.next().await {
 140                    let this = if let Some(this) = this.upgrade() {
 141                        this
 142                    } else {
 143                        break;
 144                    };
 145
 146                    this.update(&mut cx, |this, cx| {
 147                        this.remote_video_track_updated(track_change, cx).log_err()
 148                    })
 149                    .ok();
 150                }
 151            });
 152
 153            let mut track_audio_changes = room.remote_audio_track_updates();
 154            let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move {
 155                while let Some(track_change) = track_audio_changes.next().await {
 156                    let this = if let Some(this) = this.upgrade() {
 157                        this
 158                    } else {
 159                        break;
 160                    };
 161
 162                    this.update(&mut cx, |this, cx| {
 163                        this.remote_audio_track_updated(track_change, cx).log_err()
 164                    })
 165                    .ok();
 166                }
 167            });
 168
 169            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 170            cx.spawn(|this, mut cx| async move {
 171                connect.await?;
 172
 173                if !cx.update(|cx| Self::mute_on_join(cx))? {
 174                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 175                        .await?;
 176                }
 177
 178                anyhow::Ok(())
 179            })
 180            .detach_and_log_err(cx);
 181
 182            Some(LiveKitRoom {
 183                room,
 184                screen_track: LocalTrack::None,
 185                microphone_track: LocalTrack::None,
 186                next_publish_id: 0,
 187                muted_by_user: false,
 188                deafened: false,
 189                speaking: false,
 190                _maintain_room,
 191                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 192            })
 193        } else {
 194            None
 195        };
 196
 197        let maintain_connection = cx.spawn({
 198            let client = client.clone();
 199            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 200        });
 201
 202        Audio::play_sound(Sound::Joined, cx);
 203
 204        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 205
 206        Self {
 207            id,
 208            channel_id,
 209            live_kit: live_kit_room,
 210            status: RoomStatus::Online,
 211            shared_projects: Default::default(),
 212            joined_projects: Default::default(),
 213            participant_user_ids: Default::default(),
 214            local_participant: Default::default(),
 215            remote_participants: Default::default(),
 216            pending_participants: Default::default(),
 217            pending_call_count: 0,
 218            client_subscriptions: vec![
 219                client.add_message_handler(cx.weak_handle(), Self::handle_room_updated)
 220            ],
 221            _subscriptions: vec![
 222                cx.on_release(Self::released),
 223                cx.on_app_quit(Self::app_will_quit),
 224            ],
 225            leave_when_empty: false,
 226            pending_room_update: None,
 227            client,
 228            user_store,
 229            follows_by_leader_id_project_id: Default::default(),
 230            maintain_connection: Some(maintain_connection),
 231            room_update_completed_tx,
 232            room_update_completed_rx,
 233        }
 234    }
 235
 236    pub(crate) fn create(
 237        called_user_id: u64,
 238        initial_project: Option<Handle<Project>>,
 239        client: Arc<Client>,
 240        user_store: Handle<UserStore>,
 241        cx: &mut AppContext,
 242    ) -> Task<Result<Handle<Self>>> {
 243        cx.spawn(move |mut cx| async move {
 244            let response = client.request(proto::CreateRoom {}).await?;
 245            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 246            let room = cx.entity(|cx| {
 247                Self::new(
 248                    room_proto.id,
 249                    None,
 250                    response.live_kit_connection_info,
 251                    client,
 252                    user_store,
 253                    cx,
 254                )
 255            })?;
 256
 257            let initial_project_id = if let Some(initial_project) = initial_project {
 258                let initial_project_id = room
 259                    .update(&mut cx, |room, cx| {
 260                        room.share_project(initial_project.clone(), cx)
 261                    })?
 262                    .await?;
 263                Some(initial_project_id)
 264            } else {
 265                None
 266            };
 267
 268            match room
 269                .update(&mut cx, |room, cx| {
 270                    room.leave_when_empty = true;
 271                    room.call(called_user_id, initial_project_id, cx)
 272                })?
 273                .await
 274            {
 275                Ok(()) => Ok(room),
 276                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 277            }
 278        })
 279    }
 280
 281    pub(crate) fn join_channel(
 282        channel_id: u64,
 283        client: Arc<Client>,
 284        user_store: Handle<UserStore>,
 285        cx: &mut AppContext,
 286    ) -> Task<Result<Handle<Self>>> {
 287        cx.spawn(move |cx| async move {
 288            Self::from_join_response(
 289                client.request(proto::JoinChannel { channel_id }).await?,
 290                client,
 291                user_store,
 292                cx,
 293            )
 294        })
 295    }
 296
 297    pub(crate) fn join(
 298        call: &IncomingCall,
 299        client: Arc<Client>,
 300        user_store: Handle<UserStore>,
 301        cx: &mut AppContext,
 302    ) -> Task<Result<Handle<Self>>> {
 303        let id = call.room_id;
 304        cx.spawn(move |cx| async move {
 305            Self::from_join_response(
 306                client.request(proto::JoinRoom { id }).await?,
 307                client,
 308                user_store,
 309                cx,
 310            )
 311        })
 312    }
 313
 314    fn released(&mut self, cx: &mut AppContext) {
 315        if self.status.is_online() {
 316            self.leave_internal(cx).detach_and_log_err(cx);
 317        }
 318    }
 319
 320    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 321        let task = if self.status.is_online() {
 322            let leave = self.leave_internal(cx);
 323            Some(cx.executor().spawn(async move {
 324                leave.await.log_err();
 325            }))
 326        } else {
 327            None
 328        };
 329
 330        async move {
 331            if let Some(task) = task {
 332                task.await;
 333            }
 334        }
 335    }
 336
 337    pub fn mute_on_join(cx: &AppContext) -> bool {
 338        CallSettings::get_global(cx).mute_on_join || client2::IMPERSONATE_LOGIN.is_some()
 339    }
 340
 341    fn from_join_response(
 342        response: proto::JoinRoomResponse,
 343        client: Arc<Client>,
 344        user_store: Handle<UserStore>,
 345        mut cx: AsyncAppContext,
 346    ) -> Result<Handle<Self>> {
 347        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 348        let room = cx.entity(|cx| {
 349            Self::new(
 350                room_proto.id,
 351                response.channel_id,
 352                response.live_kit_connection_info,
 353                client,
 354                user_store,
 355                cx,
 356            )
 357        })?;
 358        room.update(&mut cx, |room, cx| {
 359            room.leave_when_empty = room.channel_id.is_none();
 360            room.apply_room_update(room_proto, cx)?;
 361            anyhow::Ok(())
 362        })??;
 363        Ok(room)
 364    }
 365
 366    fn should_leave(&self) -> bool {
 367        self.leave_when_empty
 368            && self.pending_room_update.is_none()
 369            && self.pending_participants.is_empty()
 370            && self.remote_participants.is_empty()
 371            && self.pending_call_count == 0
 372    }
 373
 374    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 375        cx.notify();
 376        cx.emit(Event::Left);
 377        self.leave_internal(cx)
 378    }
 379
 380    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 381        if self.status.is_offline() {
 382            return Task::ready(Err(anyhow!("room is offline")));
 383        }
 384
 385        log::info!("leaving room");
 386        Audio::play_sound(Sound::Leave, cx);
 387
 388        self.clear_state(cx);
 389
 390        let leave_room = self.client.request(proto::LeaveRoom {});
 391        cx.executor().spawn(async move {
 392            leave_room.await?;
 393            anyhow::Ok(())
 394        })
 395    }
 396
 397    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 398        for project in self.shared_projects.drain() {
 399            if let Some(project) = project.upgrade() {
 400                project.update(cx, |project, cx| {
 401                    project.unshare(cx).log_err();
 402                });
 403            }
 404        }
 405        for project in self.joined_projects.drain() {
 406            if let Some(project) = project.upgrade() {
 407                project.update(cx, |project, cx| {
 408                    project.disconnected_from_host(cx);
 409                    project.close(cx);
 410                });
 411            }
 412        }
 413
 414        self.status = RoomStatus::Offline;
 415        self.remote_participants.clear();
 416        self.pending_participants.clear();
 417        self.participant_user_ids.clear();
 418        self.client_subscriptions.clear();
 419        self.live_kit.take();
 420        self.pending_room_update.take();
 421        self.maintain_connection.take();
 422    }
 423
 424    async fn maintain_connection(
 425        this: WeakHandle<Self>,
 426        client: Arc<Client>,
 427        mut cx: AsyncAppContext,
 428    ) -> Result<()> {
 429        let mut client_status = client.status();
 430        loop {
 431            let _ = client_status.try_recv();
 432            let is_connected = client_status.borrow().is_connected();
 433            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 434            if !is_connected || client_status.next().await.is_some() {
 435                log::info!("detected client disconnection");
 436
 437                this.upgrade()
 438                    .ok_or_else(|| anyhow!("room was dropped"))?
 439                    .update(&mut cx, |this, cx| {
 440                        this.status = RoomStatus::Rejoining;
 441                        cx.notify();
 442                    })?;
 443
 444                // Wait for client to re-establish a connection to the server.
 445                {
 446                    let mut reconnection_timeout = cx.executor().timer(RECONNECT_TIMEOUT).fuse();
 447                    let client_reconnection = async {
 448                        let mut remaining_attempts = 3;
 449                        while remaining_attempts > 0 {
 450                            if client_status.borrow().is_connected() {
 451                                log::info!("client reconnected, attempting to rejoin room");
 452
 453                                let Some(this) = this.upgrade() else { break };
 454                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 455                                    Ok(task) => {
 456                                        if task.await.log_err().is_some() {
 457                                            return true;
 458                                        } else {
 459                                            remaining_attempts -= 1;
 460                                        }
 461                                    }
 462                                    Err(_app_dropped) => return false,
 463                                }
 464                            } else if client_status.borrow().is_signed_out() {
 465                                return false;
 466                            }
 467
 468                            log::info!(
 469                                "waiting for client status change, remaining attempts {}",
 470                                remaining_attempts
 471                            );
 472                            client_status.next().await;
 473                        }
 474                        false
 475                    }
 476                    .fuse();
 477                    futures::pin_mut!(client_reconnection);
 478
 479                    futures::select_biased! {
 480                        reconnected = client_reconnection => {
 481                            if reconnected {
 482                                log::info!("successfully reconnected to room");
 483                                // If we successfully joined the room, go back around the loop
 484                                // waiting for future connection status changes.
 485                                continue;
 486                            }
 487                        }
 488                        _ = reconnection_timeout => {
 489                            log::info!("room reconnection timeout expired");
 490                        }
 491                    }
 492                }
 493
 494                break;
 495            }
 496        }
 497
 498        // The client failed to re-establish a connection to the server
 499        // or an error occurred while trying to re-join the room. Either way
 500        // we leave the room and return an error.
 501        if let Some(this) = this.upgrade() {
 502            log::info!("reconnection failed, leaving room");
 503            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 504        }
 505        Err(anyhow!(
 506            "can't reconnect to room: client failed to re-establish connection"
 507        ))
 508    }
 509
 510    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 511        let mut projects = HashMap::default();
 512        let mut reshared_projects = Vec::new();
 513        let mut rejoined_projects = Vec::new();
 514        self.shared_projects.retain(|project| {
 515            if let Some(handle) = project.upgrade() {
 516                let project = handle.read(cx);
 517                if let Some(project_id) = project.remote_id() {
 518                    projects.insert(project_id, handle.clone());
 519                    reshared_projects.push(proto::UpdateProject {
 520                        project_id,
 521                        worktrees: project.worktree_metadata_protos(cx),
 522                    });
 523                    return true;
 524                }
 525            }
 526            false
 527        });
 528        self.joined_projects.retain(|project| {
 529            if let Some(handle) = project.upgrade() {
 530                let project = handle.read(cx);
 531                if let Some(project_id) = project.remote_id() {
 532                    projects.insert(project_id, handle.clone());
 533                    rejoined_projects.push(proto::RejoinProject {
 534                        id: project_id,
 535                        worktrees: project
 536                            .worktrees()
 537                            .map(|worktree| {
 538                                let worktree = worktree.read(cx);
 539                                proto::RejoinWorktree {
 540                                    id: worktree.id().to_proto(),
 541                                    scan_id: worktree.completed_scan_id() as u64,
 542                                }
 543                            })
 544                            .collect(),
 545                    });
 546                }
 547                return true;
 548            }
 549            false
 550        });
 551
 552        let response = self.client.request_envelope(proto::RejoinRoom {
 553            id: self.id,
 554            reshared_projects,
 555            rejoined_projects,
 556        });
 557
 558        cx.spawn(|this, mut cx| async move {
 559            let response = response.await?;
 560            let message_id = response.message_id;
 561            let response = response.payload;
 562            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 563            this.update(&mut cx, |this, cx| {
 564                this.status = RoomStatus::Online;
 565                this.apply_room_update(room_proto, cx)?;
 566
 567                for reshared_project in response.reshared_projects {
 568                    if let Some(project) = projects.get(&reshared_project.id) {
 569                        project.update(cx, |project, cx| {
 570                            project.reshared(reshared_project, cx).log_err();
 571                        });
 572                    }
 573                }
 574
 575                for rejoined_project in response.rejoined_projects {
 576                    if let Some(project) = projects.get(&rejoined_project.id) {
 577                        project.update(cx, |project, cx| {
 578                            project.rejoined(rejoined_project, message_id, cx).log_err();
 579                        });
 580                    }
 581                }
 582
 583                anyhow::Ok(())
 584            })?
 585        })
 586    }
 587
 588    pub fn id(&self) -> u64 {
 589        self.id
 590    }
 591
 592    pub fn status(&self) -> RoomStatus {
 593        self.status
 594    }
 595
 596    pub fn local_participant(&self) -> &LocalParticipant {
 597        &self.local_participant
 598    }
 599
 600    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 601        &self.remote_participants
 602    }
 603
 604    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 605        self.remote_participants
 606            .values()
 607            .find(|p| p.peer_id == peer_id)
 608    }
 609
 610    pub fn pending_participants(&self) -> &[Arc<User>] {
 611        &self.pending_participants
 612    }
 613
 614    pub fn contains_participant(&self, user_id: u64) -> bool {
 615        self.participant_user_ids.contains(&user_id)
 616    }
 617
 618    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 619        self.follows_by_leader_id_project_id
 620            .get(&(leader_id, project_id))
 621            .map_or(&[], |v| v.as_slice())
 622    }
 623
 624    /// Returns the most 'active' projects, defined as most people in the project
 625    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 626        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 627        for participant in self.remote_participants.values() {
 628            match participant.location {
 629                ParticipantLocation::SharedProject { project_id } => {
 630                    project_hosts_and_guest_counts
 631                        .entry(project_id)
 632                        .or_default()
 633                        .1 += 1;
 634                }
 635                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 636            }
 637            for project in &participant.projects {
 638                project_hosts_and_guest_counts
 639                    .entry(project.id)
 640                    .or_default()
 641                    .0 = Some(participant.user.id);
 642            }
 643        }
 644
 645        if let Some(user) = self.user_store.read(cx).current_user() {
 646            for project in &self.local_participant.projects {
 647                project_hosts_and_guest_counts
 648                    .entry(project.id)
 649                    .or_default()
 650                    .0 = Some(user.id);
 651            }
 652        }
 653
 654        project_hosts_and_guest_counts
 655            .into_iter()
 656            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 657            .max_by_key(|(_, _, guest_count)| *guest_count)
 658            .map(|(id, host, _)| (id, host))
 659    }
 660
 661    async fn handle_room_updated(
 662        this: Handle<Self>,
 663        envelope: TypedEnvelope<proto::RoomUpdated>,
 664        _: Arc<Client>,
 665        mut cx: AsyncAppContext,
 666    ) -> Result<()> {
 667        let room = envelope
 668            .payload
 669            .room
 670            .ok_or_else(|| anyhow!("invalid room"))?;
 671        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 672    }
 673
 674    fn apply_room_update(
 675        &mut self,
 676        mut room: proto::Room,
 677        cx: &mut ModelContext<Self>,
 678    ) -> Result<()> {
 679        // Filter ourselves out from the room's participants.
 680        let local_participant_ix = room
 681            .participants
 682            .iter()
 683            .position(|participant| Some(participant.user_id) == self.client.user_id());
 684        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 685
 686        let pending_participant_user_ids = room
 687            .pending_participants
 688            .iter()
 689            .map(|p| p.user_id)
 690            .collect::<Vec<_>>();
 691
 692        let remote_participant_user_ids = room
 693            .participants
 694            .iter()
 695            .map(|p| p.user_id)
 696            .collect::<Vec<_>>();
 697
 698        let (remote_participants, pending_participants) =
 699            self.user_store.update(cx, move |user_store, cx| {
 700                (
 701                    user_store.get_users(remote_participant_user_ids, cx),
 702                    user_store.get_users(pending_participant_user_ids, cx),
 703                )
 704            });
 705
 706        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 707            let (remote_participants, pending_participants) =
 708                futures::join!(remote_participants, pending_participants);
 709
 710            this.update(&mut cx, |this, cx| {
 711                this.participant_user_ids.clear();
 712
 713                if let Some(participant) = local_participant {
 714                    this.local_participant.projects = participant.projects;
 715                } else {
 716                    this.local_participant.projects.clear();
 717                }
 718
 719                if let Some(participants) = remote_participants.log_err() {
 720                    for (participant, user) in room.participants.into_iter().zip(participants) {
 721                        let Some(peer_id) = participant.peer_id else {
 722                            continue;
 723                        };
 724                        let participant_index = ParticipantIndex(participant.participant_index);
 725                        this.participant_user_ids.insert(participant.user_id);
 726
 727                        let old_projects = this
 728                            .remote_participants
 729                            .get(&participant.user_id)
 730                            .into_iter()
 731                            .flat_map(|existing| &existing.projects)
 732                            .map(|project| project.id)
 733                            .collect::<HashSet<_>>();
 734                        let new_projects = participant
 735                            .projects
 736                            .iter()
 737                            .map(|project| project.id)
 738                            .collect::<HashSet<_>>();
 739
 740                        for project in &participant.projects {
 741                            if !old_projects.contains(&project.id) {
 742                                cx.emit(Event::RemoteProjectShared {
 743                                    owner: user.clone(),
 744                                    project_id: project.id,
 745                                    worktree_root_names: project.worktree_root_names.clone(),
 746                                });
 747                            }
 748                        }
 749
 750                        for unshared_project_id in old_projects.difference(&new_projects) {
 751                            this.joined_projects.retain(|project| {
 752                                if let Some(project) = project.upgrade() {
 753                                    project.update(cx, |project, cx| {
 754                                        if project.remote_id() == Some(*unshared_project_id) {
 755                                            project.disconnected_from_host(cx);
 756                                            false
 757                                        } else {
 758                                            true
 759                                        }
 760                                    })
 761                                } else {
 762                                    false
 763                                }
 764                            });
 765                            cx.emit(Event::RemoteProjectUnshared {
 766                                project_id: *unshared_project_id,
 767                            });
 768                        }
 769
 770                        let location = ParticipantLocation::from_proto(participant.location)
 771                            .unwrap_or(ParticipantLocation::External);
 772                        if let Some(remote_participant) =
 773                            this.remote_participants.get_mut(&participant.user_id)
 774                        {
 775                            remote_participant.peer_id = peer_id;
 776                            remote_participant.projects = participant.projects;
 777                            remote_participant.participant_index = participant_index;
 778                            if location != remote_participant.location {
 779                                remote_participant.location = location;
 780                                cx.emit(Event::ParticipantLocationChanged {
 781                                    participant_id: peer_id,
 782                                });
 783                            }
 784                        } else {
 785                            this.remote_participants.insert(
 786                                participant.user_id,
 787                                RemoteParticipant {
 788                                    user: user.clone(),
 789                                    participant_index,
 790                                    peer_id,
 791                                    projects: participant.projects,
 792                                    location,
 793                                    muted: true,
 794                                    speaking: false,
 795                                    video_tracks: Default::default(),
 796                                    audio_tracks: Default::default(),
 797                                },
 798                            );
 799
 800                            Audio::play_sound(Sound::Joined, cx);
 801
 802                            if let Some(live_kit) = this.live_kit.as_ref() {
 803                                let video_tracks =
 804                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 805                                let audio_tracks =
 806                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 807                                let publications = live_kit
 808                                    .room
 809                                    .remote_audio_track_publications(&user.id.to_string());
 810
 811                                for track in video_tracks {
 812                                    this.remote_video_track_updated(
 813                                        RemoteVideoTrackUpdate::Subscribed(track),
 814                                        cx,
 815                                    )
 816                                    .log_err();
 817                                }
 818
 819                                for (track, publication) in
 820                                    audio_tracks.iter().zip(publications.iter())
 821                                {
 822                                    this.remote_audio_track_updated(
 823                                        RemoteAudioTrackUpdate::Subscribed(
 824                                            track.clone(),
 825                                            publication.clone(),
 826                                        ),
 827                                        cx,
 828                                    )
 829                                    .log_err();
 830                                }
 831                            }
 832                        }
 833                    }
 834
 835                    this.remote_participants.retain(|user_id, participant| {
 836                        if this.participant_user_ids.contains(user_id) {
 837                            true
 838                        } else {
 839                            for project in &participant.projects {
 840                                cx.emit(Event::RemoteProjectUnshared {
 841                                    project_id: project.id,
 842                                });
 843                            }
 844                            false
 845                        }
 846                    });
 847                }
 848
 849                if let Some(pending_participants) = pending_participants.log_err() {
 850                    this.pending_participants = pending_participants;
 851                    for participant in &this.pending_participants {
 852                        this.participant_user_ids.insert(participant.id);
 853                    }
 854                }
 855
 856                this.follows_by_leader_id_project_id.clear();
 857                for follower in room.followers {
 858                    let project_id = follower.project_id;
 859                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 860                        (Some(leader), Some(follower)) => (leader, follower),
 861
 862                        _ => {
 863                            log::error!("Follower message {follower:?} missing some state");
 864                            continue;
 865                        }
 866                    };
 867
 868                    let list = this
 869                        .follows_by_leader_id_project_id
 870                        .entry((leader, project_id))
 871                        .or_insert(Vec::new());
 872                    if !list.contains(&follower) {
 873                        list.push(follower);
 874                    }
 875                }
 876
 877                this.pending_room_update.take();
 878                if this.should_leave() {
 879                    log::info!("room is empty, leaving");
 880                    let _ = this.leave(cx);
 881                }
 882
 883                this.user_store.update(cx, |user_store, cx| {
 884                    let participant_indices_by_user_id = this
 885                        .remote_participants
 886                        .iter()
 887                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 888                        .collect();
 889                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 890                });
 891
 892                this.check_invariants();
 893                this.room_update_completed_tx.try_send(Some(())).ok();
 894                cx.notify();
 895            })
 896            .ok();
 897        }));
 898
 899        cx.notify();
 900        Ok(())
 901    }
 902
 903    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 904        let mut done_rx = self.room_update_completed_rx.clone();
 905        async move {
 906            while let Some(result) = done_rx.next().await {
 907                if result.is_some() {
 908                    break;
 909                }
 910            }
 911        }
 912    }
 913
 914    fn remote_video_track_updated(
 915        &mut self,
 916        change: RemoteVideoTrackUpdate,
 917        cx: &mut ModelContext<Self>,
 918    ) -> Result<()> {
 919        match change {
 920            RemoteVideoTrackUpdate::Subscribed(track) => {
 921                let user_id = track.publisher_id().parse()?;
 922                let track_id = track.sid().to_string();
 923                let participant = self
 924                    .remote_participants
 925                    .get_mut(&user_id)
 926                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 927                participant.video_tracks.insert(
 928                    track_id.clone(),
 929                    Arc::new(RemoteVideoTrack {
 930                        live_kit_track: track,
 931                    }),
 932                );
 933                cx.emit(Event::RemoteVideoTracksChanged {
 934                    participant_id: participant.peer_id,
 935                });
 936            }
 937            RemoteVideoTrackUpdate::Unsubscribed {
 938                publisher_id,
 939                track_id,
 940            } => {
 941                let user_id = publisher_id.parse()?;
 942                let participant = self
 943                    .remote_participants
 944                    .get_mut(&user_id)
 945                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 946                participant.video_tracks.remove(&track_id);
 947                cx.emit(Event::RemoteVideoTracksChanged {
 948                    participant_id: participant.peer_id,
 949                });
 950            }
 951        }
 952
 953        cx.notify();
 954        Ok(())
 955    }
 956
 957    fn remote_audio_track_updated(
 958        &mut self,
 959        change: RemoteAudioTrackUpdate,
 960        cx: &mut ModelContext<Self>,
 961    ) -> Result<()> {
 962        match change {
 963            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 964                let mut speaker_ids = speakers
 965                    .into_iter()
 966                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 967                    .collect::<Vec<u64>>();
 968                speaker_ids.sort_unstable();
 969                for (sid, participant) in &mut self.remote_participants {
 970                    if let Ok(_) = speaker_ids.binary_search(sid) {
 971                        participant.speaking = true;
 972                    } else {
 973                        participant.speaking = false;
 974                    }
 975                }
 976                if let Some(id) = self.client.user_id() {
 977                    if let Some(room) = &mut self.live_kit {
 978                        if let Ok(_) = speaker_ids.binary_search(&id) {
 979                            room.speaking = true;
 980                        } else {
 981                            room.speaking = false;
 982                        }
 983                    }
 984                }
 985                cx.notify();
 986            }
 987            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 988                let mut found = false;
 989                for participant in &mut self.remote_participants.values_mut() {
 990                    for track in participant.audio_tracks.values() {
 991                        if track.sid() == track_id {
 992                            found = true;
 993                            break;
 994                        }
 995                    }
 996                    if found {
 997                        participant.muted = muted;
 998                        break;
 999                    }
1000                }
1001
1002                cx.notify();
1003            }
1004            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1005                let user_id = track.publisher_id().parse()?;
1006                let track_id = track.sid().to_string();
1007                let participant = self
1008                    .remote_participants
1009                    .get_mut(&user_id)
1010                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1011
1012                participant.audio_tracks.insert(track_id.clone(), track);
1013                participant.muted = publication.is_muted();
1014
1015                cx.emit(Event::RemoteAudioTracksChanged {
1016                    participant_id: participant.peer_id,
1017                });
1018            }
1019            RemoteAudioTrackUpdate::Unsubscribed {
1020                publisher_id,
1021                track_id,
1022            } => {
1023                let user_id = publisher_id.parse()?;
1024                let participant = self
1025                    .remote_participants
1026                    .get_mut(&user_id)
1027                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1028                participant.audio_tracks.remove(&track_id);
1029                cx.emit(Event::RemoteAudioTracksChanged {
1030                    participant_id: participant.peer_id,
1031                });
1032            }
1033        }
1034
1035        cx.notify();
1036        Ok(())
1037    }
1038
1039    fn check_invariants(&self) {
1040        #[cfg(any(test, feature = "test-support"))]
1041        {
1042            for participant in self.remote_participants.values() {
1043                assert!(self.participant_user_ids.contains(&participant.user.id));
1044                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1045            }
1046
1047            for participant in &self.pending_participants {
1048                assert!(self.participant_user_ids.contains(&participant.id));
1049                assert_ne!(participant.id, self.client.user_id().unwrap());
1050            }
1051
1052            assert_eq!(
1053                self.participant_user_ids.len(),
1054                self.remote_participants.len() + self.pending_participants.len()
1055            );
1056        }
1057    }
1058
1059    pub(crate) fn call(
1060        &mut self,
1061        called_user_id: u64,
1062        initial_project_id: Option<u64>,
1063        cx: &mut ModelContext<Self>,
1064    ) -> Task<Result<()>> {
1065        if self.status.is_offline() {
1066            return Task::ready(Err(anyhow!("room is offline")));
1067        }
1068
1069        cx.notify();
1070        let client = self.client.clone();
1071        let room_id = self.id;
1072        self.pending_call_count += 1;
1073        cx.spawn(move |this, mut cx| async move {
1074            let result = client
1075                .request(proto::Call {
1076                    room_id,
1077                    called_user_id,
1078                    initial_project_id,
1079                })
1080                .await;
1081            this.update(&mut cx, |this, cx| {
1082                this.pending_call_count -= 1;
1083                if this.should_leave() {
1084                    this.leave(cx).detach_and_log_err(cx);
1085                }
1086            })?;
1087            result?;
1088            Ok(())
1089        })
1090    }
1091
1092    pub fn join_project(
1093        &mut self,
1094        id: u64,
1095        language_registry: Arc<LanguageRegistry>,
1096        fs: Arc<dyn Fs>,
1097        cx: &mut ModelContext<Self>,
1098    ) -> Task<Result<Handle<Project>>> {
1099        let client = self.client.clone();
1100        let user_store = self.user_store.clone();
1101        cx.emit(Event::RemoteProjectJoined { project_id: id });
1102        cx.spawn(move |this, mut cx| async move {
1103            let project =
1104                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1105
1106            this.update(&mut cx, |this, cx| {
1107                this.joined_projects.retain(|project| {
1108                    if let Some(project) = project.upgrade() {
1109                        !project.read(cx).is_read_only()
1110                    } else {
1111                        false
1112                    }
1113                });
1114                this.joined_projects.insert(project.downgrade());
1115            })?;
1116            Ok(project)
1117        })
1118    }
1119
1120    pub(crate) fn share_project(
1121        &mut self,
1122        project: Handle<Project>,
1123        cx: &mut ModelContext<Self>,
1124    ) -> Task<Result<u64>> {
1125        if let Some(project_id) = project.read(cx).remote_id() {
1126            return Task::ready(Ok(project_id));
1127        }
1128
1129        let request = self.client.request(proto::ShareProject {
1130            room_id: self.id(),
1131            worktrees: project.read(cx).worktree_metadata_protos(cx),
1132        });
1133        cx.spawn(|this, mut cx| async move {
1134            let response = request.await?;
1135
1136            project.update(&mut cx, |project, cx| {
1137                project.shared(response.project_id, cx)
1138            })??;
1139
1140            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1141            this.update(&mut cx, |this, cx| {
1142                this.shared_projects.insert(project.downgrade());
1143                let active_project = this.local_participant.active_project.as_ref();
1144                if active_project.map_or(false, |location| *location == project) {
1145                    this.set_location(Some(&project), cx)
1146                } else {
1147                    Task::ready(Ok(()))
1148                }
1149            })?
1150            .await?;
1151
1152            Ok(response.project_id)
1153        })
1154    }
1155
1156    pub(crate) fn unshare_project(
1157        &mut self,
1158        project: Handle<Project>,
1159        cx: &mut ModelContext<Self>,
1160    ) -> Result<()> {
1161        let project_id = match project.read(cx).remote_id() {
1162            Some(project_id) => project_id,
1163            None => return Ok(()),
1164        };
1165
1166        self.client.send(proto::UnshareProject { project_id })?;
1167        project.update(cx, |this, cx| this.unshare(cx))
1168    }
1169
1170    pub(crate) fn set_location(
1171        &mut self,
1172        project: Option<&Handle<Project>>,
1173        cx: &mut ModelContext<Self>,
1174    ) -> Task<Result<()>> {
1175        if self.status.is_offline() {
1176            return Task::ready(Err(anyhow!("room is offline")));
1177        }
1178
1179        let client = self.client.clone();
1180        let room_id = self.id;
1181        let location = if let Some(project) = project {
1182            self.local_participant.active_project = Some(project.downgrade());
1183            if let Some(project_id) = project.read(cx).remote_id() {
1184                proto::participant_location::Variant::SharedProject(
1185                    proto::participant_location::SharedProject { id: project_id },
1186                )
1187            } else {
1188                proto::participant_location::Variant::UnsharedProject(
1189                    proto::participant_location::UnsharedProject {},
1190                )
1191            }
1192        } else {
1193            self.local_participant.active_project = None;
1194            proto::participant_location::Variant::External(proto::participant_location::External {})
1195        };
1196
1197        cx.notify();
1198        cx.executor().spawn_on_main(move || async move {
1199            client
1200                .request(proto::UpdateParticipantLocation {
1201                    room_id,
1202                    location: Some(proto::ParticipantLocation {
1203                        variant: Some(location),
1204                    }),
1205                })
1206                .await?;
1207            Ok(())
1208        })
1209    }
1210
1211    pub fn is_screen_sharing(&self) -> bool {
1212        self.live_kit.as_ref().map_or(false, |live_kit| {
1213            !matches!(live_kit.screen_track, LocalTrack::None)
1214        })
1215    }
1216
1217    pub fn is_sharing_mic(&self) -> bool {
1218        self.live_kit.as_ref().map_or(false, |live_kit| {
1219            !matches!(live_kit.microphone_track, LocalTrack::None)
1220        })
1221    }
1222
1223    pub fn is_muted(&self, cx: &AppContext) -> bool {
1224        self.live_kit
1225            .as_ref()
1226            .and_then(|live_kit| match &live_kit.microphone_track {
1227                LocalTrack::None => Some(Self::mute_on_join(cx)),
1228                LocalTrack::Pending { muted, .. } => Some(*muted),
1229                LocalTrack::Published { muted, .. } => Some(*muted),
1230            })
1231            .unwrap_or(false)
1232    }
1233
1234    pub fn is_speaking(&self) -> bool {
1235        self.live_kit
1236            .as_ref()
1237            .map_or(false, |live_kit| live_kit.speaking)
1238    }
1239
1240    pub fn is_deafened(&self) -> Option<bool> {
1241        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1242    }
1243
1244    #[track_caller]
1245    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1246        if self.status.is_offline() {
1247            return Task::ready(Err(anyhow!("room is offline")));
1248        } else if self.is_sharing_mic() {
1249            return Task::ready(Err(anyhow!("microphone was already shared")));
1250        }
1251
1252        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1253            let publish_id = post_inc(&mut live_kit.next_publish_id);
1254            live_kit.microphone_track = LocalTrack::Pending {
1255                publish_id,
1256                muted: false,
1257            };
1258            cx.notify();
1259            publish_id
1260        } else {
1261            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1262        };
1263
1264        cx.spawn(move |this, mut cx| async move {
1265            let publish_track = async {
1266                let track = LocalAudioTrack::create();
1267                this.upgrade()
1268                    .ok_or_else(|| anyhow!("room was dropped"))?
1269                    .update(&mut cx, |this, _| {
1270                        this.live_kit
1271                            .as_ref()
1272                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1273                    })?
1274                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1275                    .await
1276            };
1277
1278            let publication = publish_track.await;
1279            this.upgrade()
1280                .ok_or_else(|| anyhow!("room was dropped"))?
1281                .update(&mut cx, |this, cx| {
1282                    let live_kit = this
1283                        .live_kit
1284                        .as_mut()
1285                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1286
1287                    let (canceled, muted) = if let LocalTrack::Pending {
1288                        publish_id: cur_publish_id,
1289                        muted,
1290                    } = &live_kit.microphone_track
1291                    {
1292                        (*cur_publish_id != publish_id, *muted)
1293                    } else {
1294                        (true, false)
1295                    };
1296
1297                    match publication {
1298                        Ok(publication) => {
1299                            if canceled {
1300                                live_kit.room.unpublish_track(publication);
1301                            } else {
1302                                if muted {
1303                                    cx.executor().spawn(publication.set_mute(muted)).detach();
1304                                }
1305                                live_kit.microphone_track = LocalTrack::Published {
1306                                    track_publication: publication,
1307                                    muted,
1308                                };
1309                                cx.notify();
1310                            }
1311                            Ok(())
1312                        }
1313                        Err(error) => {
1314                            if canceled {
1315                                Ok(())
1316                            } else {
1317                                live_kit.microphone_track = LocalTrack::None;
1318                                cx.notify();
1319                                Err(error)
1320                            }
1321                        }
1322                    }
1323                })?
1324        })
1325    }
1326
1327    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1328        if self.status.is_offline() {
1329            return Task::ready(Err(anyhow!("room is offline")));
1330        } else if self.is_screen_sharing() {
1331            return Task::ready(Err(anyhow!("screen was already shared")));
1332        }
1333
1334        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1335            let publish_id = post_inc(&mut live_kit.next_publish_id);
1336            live_kit.screen_track = LocalTrack::Pending {
1337                publish_id,
1338                muted: false,
1339            };
1340            cx.notify();
1341            (live_kit.room.display_sources(), publish_id)
1342        } else {
1343            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1344        };
1345
1346        cx.spawn(move |this, mut cx| async move {
1347            let publish_track = async {
1348                let displays = displays.await?;
1349                let display = displays
1350                    .first()
1351                    .ok_or_else(|| anyhow!("no display found"))?;
1352                let track = LocalVideoTrack::screen_share_for_display(&display);
1353                this.upgrade()
1354                    .ok_or_else(|| anyhow!("room was dropped"))?
1355                    .update(&mut cx, |this, _| {
1356                        this.live_kit
1357                            .as_ref()
1358                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1359                    })?
1360                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1361                    .await
1362            };
1363
1364            let publication = publish_track.await;
1365            this.upgrade()
1366                .ok_or_else(|| anyhow!("room was dropped"))?
1367                .update(&mut cx, |this, cx| {
1368                    let live_kit = this
1369                        .live_kit
1370                        .as_mut()
1371                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1372
1373                    let (canceled, muted) = if let LocalTrack::Pending {
1374                        publish_id: cur_publish_id,
1375                        muted,
1376                    } = &live_kit.screen_track
1377                    {
1378                        (*cur_publish_id != publish_id, *muted)
1379                    } else {
1380                        (true, false)
1381                    };
1382
1383                    match publication {
1384                        Ok(publication) => {
1385                            if canceled {
1386                                live_kit.room.unpublish_track(publication);
1387                            } else {
1388                                if muted {
1389                                    cx.executor().spawn(publication.set_mute(muted)).detach();
1390                                }
1391                                live_kit.screen_track = LocalTrack::Published {
1392                                    track_publication: publication,
1393                                    muted,
1394                                };
1395                                cx.notify();
1396                            }
1397
1398                            Audio::play_sound(Sound::StartScreenshare, cx);
1399
1400                            Ok(())
1401                        }
1402                        Err(error) => {
1403                            if canceled {
1404                                Ok(())
1405                            } else {
1406                                live_kit.screen_track = LocalTrack::None;
1407                                cx.notify();
1408                                Err(error)
1409                            }
1410                        }
1411                    }
1412                })?
1413        })
1414    }
1415
1416    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1417        let should_mute = !self.is_muted(cx);
1418        if let Some(live_kit) = self.live_kit.as_mut() {
1419            if matches!(live_kit.microphone_track, LocalTrack::None) {
1420                return Ok(self.share_microphone(cx));
1421            }
1422
1423            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1424            live_kit.muted_by_user = should_mute;
1425
1426            if old_muted == true && live_kit.deafened == true {
1427                if let Some(task) = self.toggle_deafen(cx).ok() {
1428                    task.detach();
1429                }
1430            }
1431
1432            Ok(ret_task)
1433        } else {
1434            Err(anyhow!("LiveKit not started"))
1435        }
1436    }
1437
1438    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1439        if let Some(live_kit) = self.live_kit.as_mut() {
1440            (*live_kit).deafened = !live_kit.deafened;
1441
1442            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1443            // Context notification is sent within set_mute itself.
1444            let mut mute_task = None;
1445            // When deafening, mute user's mic as well.
1446            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1447            if live_kit.deafened || !live_kit.muted_by_user {
1448                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1449            };
1450            for participant in self.remote_participants.values() {
1451                for track in live_kit
1452                    .room
1453                    .remote_audio_track_publications(&participant.user.id.to_string())
1454                {
1455                    let deafened = live_kit.deafened;
1456                    tasks.push(
1457                        cx.executor()
1458                            .spawn_on_main(move || track.set_enabled(!deafened)),
1459                    );
1460                }
1461            }
1462
1463            Ok(cx.executor().spawn_on_main(|| async {
1464                if let Some(mute_task) = mute_task {
1465                    mute_task.await?;
1466                }
1467                for task in tasks {
1468                    task.await?;
1469                }
1470                Ok(())
1471            }))
1472        } else {
1473            Err(anyhow!("LiveKit not started"))
1474        }
1475    }
1476
1477    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1478        if self.status.is_offline() {
1479            return Err(anyhow!("room is offline"));
1480        }
1481
1482        let live_kit = self
1483            .live_kit
1484            .as_mut()
1485            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1486        match mem::take(&mut live_kit.screen_track) {
1487            LocalTrack::None => Err(anyhow!("screen was not shared")),
1488            LocalTrack::Pending { .. } => {
1489                cx.notify();
1490                Ok(())
1491            }
1492            LocalTrack::Published {
1493                track_publication, ..
1494            } => {
1495                live_kit.room.unpublish_track(track_publication);
1496                cx.notify();
1497
1498                Audio::play_sound(Sound::StopScreenshare, cx);
1499                Ok(())
1500            }
1501        }
1502    }
1503
1504    #[cfg(any(test, feature = "test-support"))]
1505    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1506        self.live_kit
1507            .as_ref()
1508            .unwrap()
1509            .room
1510            .set_display_sources(sources);
1511    }
1512}
1513
1514struct LiveKitRoom {
1515    room: Arc<live_kit_client::Room>,
1516    screen_track: LocalTrack,
1517    microphone_track: LocalTrack,
1518    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1519    muted_by_user: bool,
1520    deafened: bool,
1521    speaking: bool,
1522    next_publish_id: usize,
1523    _maintain_room: Task<()>,
1524    _maintain_tracks: [Task<()>; 2],
1525}
1526
1527impl LiveKitRoom {
1528    fn set_mute(
1529        self: &mut LiveKitRoom,
1530        should_mute: bool,
1531        cx: &mut ModelContext<Room>,
1532    ) -> Result<(Task<Result<()>>, bool)> {
1533        if !should_mute {
1534            // clear user muting state.
1535            self.muted_by_user = false;
1536        }
1537
1538        let (result, old_muted) = match &mut self.microphone_track {
1539            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1540            LocalTrack::Pending { muted, .. } => {
1541                let old_muted = *muted;
1542                *muted = should_mute;
1543                cx.notify();
1544                Ok((Task::Ready(Some(Ok(()))), old_muted))
1545            }
1546            LocalTrack::Published {
1547                track_publication,
1548                muted,
1549            } => {
1550                let old_muted = *muted;
1551                *muted = should_mute;
1552                cx.notify();
1553                Ok((
1554                    cx.executor().spawn(track_publication.set_mute(*muted)),
1555                    old_muted,
1556                ))
1557            }
1558        }?;
1559
1560        if old_muted != should_mute {
1561            if should_mute {
1562                Audio::play_sound(Sound::Mute, cx);
1563            } else {
1564                Audio::play_sound(Sound::Unmute, cx);
1565            }
1566        }
1567
1568        Ok((result, old_muted))
1569    }
1570}
1571
1572enum LocalTrack {
1573    None,
1574    Pending {
1575        publish_id: usize,
1576        muted: bool,
1577    },
1578    Published {
1579        track_publication: LocalTrackPublication,
1580        muted: bool,
1581    },
1582}
1583
1584impl Default for LocalTrack {
1585    fn default() -> Self {
1586        Self::None
1587    }
1588}
1589
1590#[derive(Copy, Clone, PartialEq, Eq)]
1591pub enum RoomStatus {
1592    Online,
1593    Rejoining,
1594    Offline,
1595}
1596
1597impl RoomStatus {
1598    pub fn is_offline(&self) -> bool {
1599        matches!(self, RoomStatus::Offline)
1600    }
1601
1602    pub fn is_online(&self) -> bool {
1603        matches!(self, RoomStatus::Online)
1604    }
1605}