room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{
  19    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  20    RemoteVideoTrackUpdate,
  21};
  22use postage::{sink::Sink, stream::Stream, watch};
  23use project::Project;
  24use settings::Settings;
  25use std::{future::Future, mem, sync::Arc, time::Duration};
  26use util::{post_inc, ResultExt, TryFutureExt};
  27
    /// Upper bound on how long `Room::maintain_connection` waits for the client to
    /// reconnect and rejoin the room before it gives up and leaves the room.
  28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  29
    /// Events a `Room` emits to its observers.
  30#[derive(Clone, Debug, PartialEq, Eq)]
  31pub enum Event {
        /// A remote participant's location differs from the one previously recorded for them.
  32    ParticipantLocationChanged {
  33        participant_id: proto::PeerId,
  34    },
        /// Presumably emitted when a remote participant's video tracks change; the emitting
        /// code is not visible in this part of the file — TODO confirm.
  35    RemoteVideoTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
        /// Presumably emitted when a remote participant's audio tracks change; the emitting
        /// code is not visible in this part of the file — TODO confirm.
  38    RemoteAudioTracksChanged {
  39        participant_id: proto::PeerId,
  40    },
        /// A room update listed a project for a remote participant that was not among the
        /// projects previously recorded for them.
  41    RemoteProjectShared {
  42        owner: Arc<User>,
  43        project_id: u64,
  44        worktree_root_names: Vec<String>,
  45    },
        /// A room update no longer lists a project that was previously recorded for a remote
        /// participant.
  46    RemoteProjectUnshared {
  47        project_id: u64,
  48    },
        /// NOTE(review): emitted outside this part of the file; by its name, a remote project
        /// was joined — confirm.
  49    RemoteProjectJoined {
  50        project_id: u64,
  51    },
        /// NOTE(review): emitted outside this part of the file; by its name, an invitation to a
        /// remote project was discarded — confirm.
  52    RemoteProjectInvitationDiscarded {
  53        project_id: u64,
  54    },
        /// Emitted by `Room::leave`.
  55    Left,
  56}
  57
    /// Client-side state of a call room: its participants, the projects shared into or joined
    /// from it, the optional LiveKit media room, and the tasks that keep it in sync.
  58pub struct Room {
        /// Server-assigned room id (sent back in `RejoinRoom` requests).
  59    id: u64,
        /// Channel id reported by the join response; `None` for rooms built by `create`.
  60    channel_id: Option<u64>,
        /// LiveKit state; `None` when no connection info was supplied or after `clear_state`.
  61    live_kit: Option<LiveKitRoom>,
  62    status: RoomStatus,
        /// Local projects shared into this room; unshared by `clear_state`, reshared by `rejoin`.
  63    shared_projects: HashSet<WeakModel<Project>>,
        /// Remote projects joined through this room; disconnected and closed by `clear_state`.
  64    joined_projects: HashSet<WeakModel<Project>>,
  65    local_participant: LocalParticipant,
        /// Remote participants keyed by user id.
  66    remote_participants: BTreeMap<u64, RemoteParticipant>,
  67    pending_participants: Vec<Arc<User>>,
        /// User ids looked up by `contains_participant`; rebuilt on every room update.
  68    participant_user_ids: HashSet<u64>,
        /// Must be zero for `should_leave` to return `true`.
  69    pending_call_count: usize,
        /// Gates `should_leave`: when `false` the room is never considered ready to leave.
  70    leave_when_empty: bool,
  71    client: Arc<Client>,
  72    user_store: Model<UserStore>,
        /// Followers, keyed by (leader peer id, project id); read by `followers_for`.
  73    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
        /// Message-handler registrations with `client` (the `RoomUpdated` handler); cleared by
        /// `clear_state`.
  74    client_subscriptions: Vec<client::Subscription>,
        /// Release and app-quit hooks registered in `new`; held only to keep them alive.
  75    _subscriptions: Vec<gpui::Subscription>,
        // NOTE(review): the two ends of this watch channel are not used in the visible part of
        // the file; presumably they signal completion of a room update — confirm.
  76    room_update_completed_tx: watch::Sender<Option<()>>,
  77    room_update_completed_rx: watch::Receiver<Option<()>>,
        /// Task spawned by `apply_room_update`; `should_leave` requires it to be `None`.
  78    pending_room_update: Option<Task<()>>,
        /// Task running `maintain_connection`; taken (dropped) by `clear_state`.
  79    maintain_connection: Option<Task<Option<()>>>,
  80}
  81
    // Lets `Room` models emit `Event`s through `cx.emit`.
  82impl EventEmitter<Event> for Room {}
  83
  84impl Room {
  85    pub fn channel_id(&self) -> Option<u64> {
  86        self.channel_id
  87    }
  88
  89    pub fn is_sharing_project(&self) -> bool {
  90        !self.shared_projects.is_empty()
  91    }
  92
  93    #[cfg(any(test, feature = "test-support"))]
  94    pub fn is_connected(&self) -> bool {
  95        if let Some(live_kit) = self.live_kit.as_ref() {
  96            matches!(
  97                *live_kit.room.status().borrow(),
  98                live_kit_client::ConnectionState::Connected { .. }
  99            )
 100        } else {
 101            false
 102        }
 103    }
 104
 105    fn new(
 106        id: u64,
 107        channel_id: Option<u64>,
 108        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 109        client: Arc<Client>,
 110        user_store: Model<UserStore>,
 111        cx: &mut ModelContext<Self>,
 112    ) -> Self {
 113        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 114            let room = live_kit_client::Room::new();
 115            let mut status = room.status();
 116            // Consume the initial status of the room.
 117            let _ = status.try_recv();
 118            let _maintain_room = cx.spawn(|this, mut cx| async move {
 119                while let Some(status) = status.next().await {
 120                    let this = if let Some(this) = this.upgrade() {
 121                        this
 122                    } else {
 123                        break;
 124                    };
 125
 126                    if status == live_kit_client::ConnectionState::Disconnected {
 127                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 128                            .ok();
 129                        break;
 130                    }
 131                }
 132            });
 133
 134            let _maintain_video_tracks = cx.spawn({
 135                let room = room.clone();
 136                move |this, mut cx| async move {
 137                    let mut track_video_changes = room.remote_video_track_updates();
 138                    while let Some(track_change) = track_video_changes.next().await {
 139                        let this = if let Some(this) = this.upgrade() {
 140                            this
 141                        } else {
 142                            break;
 143                        };
 144
 145                        this.update(&mut cx, |this, cx| {
 146                            this.remote_video_track_updated(track_change, cx).log_err()
 147                        })
 148                        .ok();
 149                    }
 150                }
 151            });
 152
 153            let _maintain_audio_tracks = cx.spawn({
 154                let room = room.clone();
 155                |this, mut cx| async move {
 156                    let mut track_audio_changes = room.remote_audio_track_updates();
 157                    while let Some(track_change) = track_audio_changes.next().await {
 158                        let this = if let Some(this) = this.upgrade() {
 159                            this
 160                        } else {
 161                            break;
 162                        };
 163
 164                        this.update(&mut cx, |this, cx| {
 165                            this.remote_audio_track_updated(track_change, cx).log_err()
 166                        })
 167                        .ok();
 168                    }
 169                }
 170            });
 171
 172            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 173            cx.spawn(|this, mut cx| async move {
 174                connect.await?;
 175
 176                if !cx.update(|cx| Self::mute_on_join(cx))? {
 177                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 178                        .await?;
 179                }
 180
 181                anyhow::Ok(())
 182            })
 183            .detach_and_log_err(cx);
 184
 185            Some(LiveKitRoom {
 186                room,
 187                screen_track: LocalTrack::None,
 188                microphone_track: LocalTrack::None,
 189                next_publish_id: 0,
 190                muted_by_user: false,
 191                deafened: false,
 192                speaking: false,
 193                _maintain_room,
 194                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 195            })
 196        } else {
 197            None
 198        };
 199
 200        let maintain_connection = cx.spawn({
 201            let client = client.clone();
 202            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 203        });
 204
 205        Audio::play_sound(Sound::Joined, cx);
 206
 207        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 208
 209        Self {
 210            id,
 211            channel_id,
 212            live_kit: live_kit_room,
 213            status: RoomStatus::Online,
 214            shared_projects: Default::default(),
 215            joined_projects: Default::default(),
 216            participant_user_ids: Default::default(),
 217            local_participant: Default::default(),
 218            remote_participants: Default::default(),
 219            pending_participants: Default::default(),
 220            pending_call_count: 0,
 221            client_subscriptions: vec![
 222                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 223            ],
 224            _subscriptions: vec![
 225                cx.on_release(Self::released),
 226                cx.on_app_quit(Self::app_will_quit),
 227            ],
 228            leave_when_empty: false,
 229            pending_room_update: None,
 230            client,
 231            user_store,
 232            follows_by_leader_id_project_id: Default::default(),
 233            maintain_connection: Some(maintain_connection),
 234            room_update_completed_tx,
 235            room_update_completed_rx,
 236        }
 237    }
 238
 239    pub(crate) fn create(
 240        called_user_id: u64,
 241        initial_project: Option<Model<Project>>,
 242        client: Arc<Client>,
 243        user_store: Model<UserStore>,
 244        cx: &mut AppContext,
 245    ) -> Task<Result<Model<Self>>> {
 246        cx.spawn(move |mut cx| async move {
 247            let response = client.request(proto::CreateRoom {}).await?;
 248            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 249            let room = cx.build_model(|cx| {
 250                Self::new(
 251                    room_proto.id,
 252                    None,
 253                    response.live_kit_connection_info,
 254                    client,
 255                    user_store,
 256                    cx,
 257                )
 258            })?;
 259
 260            let initial_project_id = if let Some(initial_project) = initial_project {
 261                let initial_project_id = room
 262                    .update(&mut cx, |room, cx| {
 263                        room.share_project(initial_project.clone(), cx)
 264                    })?
 265                    .await?;
 266                Some(initial_project_id)
 267            } else {
 268                None
 269            };
 270
 271            match room
 272                .update(&mut cx, |room, cx| {
 273                    room.leave_when_empty = true;
 274                    room.call(called_user_id, initial_project_id, cx)
 275                })?
 276                .await
 277            {
 278                Ok(()) => Ok(room),
 279                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 280            }
 281        })
 282    }
 283
 284    pub(crate) async fn join_channel(
 285        channel_id: u64,
 286        client: Arc<Client>,
 287        user_store: Model<UserStore>,
 288        cx: AsyncAppContext,
 289    ) -> Result<Model<Self>> {
 290        Self::from_join_response(
 291            client.request(proto::JoinChannel { channel_id }).await?,
 292            client,
 293            user_store,
 294            cx,
 295        )
 296    }
 297
 298    pub(crate) async fn join(
 299        room_id: u64,
 300        client: Arc<Client>,
 301        user_store: Model<UserStore>,
 302        cx: AsyncAppContext,
 303    ) -> Result<Model<Self>> {
 304        Self::from_join_response(
 305            client.request(proto::JoinRoom { id: room_id }).await?,
 306            client,
 307            user_store,
 308            cx,
 309        )
 310    }
 311
 312    fn released(&mut self, cx: &mut AppContext) {
 313        if self.status.is_online() {
 314            self.leave_internal(cx).detach_and_log_err(cx);
 315        }
 316    }
 317
 318    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 319        let task = if self.status.is_online() {
 320            let leave = self.leave_internal(cx);
 321            Some(cx.background_executor().spawn(async move {
 322                leave.await.log_err();
 323            }))
 324        } else {
 325            None
 326        };
 327
 328        async move {
 329            if let Some(task) = task {
 330                task.await;
 331            }
 332        }
 333    }
 334
        /// Reports whether a freshly joined room should skip sharing the microphone.
        ///
        /// `Room::new` shares the microphone after connecting only when this returns `false`.
        /// It currently always returns `false`: the settings-based implementation below is
        /// commented out, so `cx` is unused.
 335    pub fn mute_on_join(cx: &AppContext) -> bool {
 336        false
            // TODO(review): restore the settings/impersonation check once it is usable here.
 337        //CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 338    }
 339
 340    fn from_join_response(
 341        response: proto::JoinRoomResponse,
 342        client: Arc<Client>,
 343        user_store: Model<UserStore>,
 344        mut cx: AsyncAppContext,
 345    ) -> Result<Model<Self>> {
 346        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 347        let room = cx.build_model(|cx| {
 348            Self::new(
 349                room_proto.id,
 350                response.channel_id,
 351                response.live_kit_connection_info,
 352                client,
 353                user_store,
 354                cx,
 355            )
 356        })?;
 357        room.update(&mut cx, |room, cx| {
 358            room.leave_when_empty = room.channel_id.is_none();
 359            room.apply_room_update(room_proto, cx)?;
 360            anyhow::Ok(())
 361        })??;
 362        Ok(room)
 363    }
 364
 365    fn should_leave(&self) -> bool {
 366        self.leave_when_empty
 367            && self.pending_room_update.is_none()
 368            && self.pending_participants.is_empty()
 369            && self.remote_participants.is_empty()
 370            && self.pending_call_count == 0
 371    }
 372
 373    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 374        cx.notify();
 375        cx.emit(Event::Left);
 376        self.leave_internal(cx)
 377    }
 378
 379    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 380        if self.status.is_offline() {
 381            return Task::ready(Err(anyhow!("room is offline")));
 382        }
 383
 384        log::info!("leaving room");
 385        Audio::play_sound(Sound::Leave, cx);
 386
 387        self.clear_state(cx);
 388
 389        let leave_room = self.client.request(proto::LeaveRoom {});
 390        cx.background_executor().spawn(async move {
 391            leave_room.await?;
 392            anyhow::Ok(())
 393        })
 394    }
 395
 396    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 397        for project in self.shared_projects.drain() {
 398            if let Some(project) = project.upgrade() {
 399                project.update(cx, |project, cx| {
 400                    project.unshare(cx).log_err();
 401                });
 402            }
 403        }
 404        for project in self.joined_projects.drain() {
 405            if let Some(project) = project.upgrade() {
 406                project.update(cx, |project, cx| {
 407                    project.disconnected_from_host(cx);
 408                    project.close(cx);
 409                });
 410            }
 411        }
 412
 413        self.status = RoomStatus::Offline;
 414        self.remote_participants.clear();
 415        self.pending_participants.clear();
 416        self.participant_user_ids.clear();
 417        self.client_subscriptions.clear();
 418        self.live_kit.take();
 419        self.pending_room_update.take();
 420        self.maintain_connection.take();
 421    }
 422
        /// Watches the client's connection status and tries to rejoin the room after a
        /// disconnection.
        ///
        /// An initially disconnected client, or any later status change, marks the room as
        /// `Rejoining`. Whenever the client then reports being connected, `rejoin` is attempted;
        /// up to three failed attempts are tolerated, the whole recovery is bounded by
        /// `RECONNECT_TIMEOUT`, and it is abandoned at once if the client is signed out.
        /// After a successful rejoin the function goes back to watching the status.
        ///
        /// This function never returns `Ok`: it returns early with an error if the room or app
        /// has been dropped, and otherwise, once recovery has failed, it leaves the room (if it
        /// still exists) and returns an error.
 423    async fn maintain_connection(
 424        this: WeakModel<Self>,
 425        client: Arc<Client>,
 426        mut cx: AsyncAppContext,
 427    ) -> Result<()> {
 428        let mut client_status = client.status();
 429        loop {
                // Discard a status value that is already available, so the `next()` below only
                // resolves on a later change.
 430            let _ = client_status.try_recv();
 431            let is_connected = client_status.borrow().is_connected();
 432            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 433            if !is_connected || client_status.next().await.is_some() {
 434                log::info!("detected client disconnection");
 435
 436                this.upgrade()
 437                    .ok_or_else(|| anyhow!("room was dropped"))?
 438                    .update(&mut cx, |this, cx| {
 439                        this.status = RoomStatus::Rejoining;
 440                        cx.notify();
 441                    })?;
 442
 443                // Wait for client to re-establish a connection to the server.
 444                {
 445                    let mut reconnection_timeout =
 446                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                        // Resolves to `true` once a rejoin succeeded, and to `false` when the
                        // attempts are used up, the client signed out, or the room/app is gone.
 447                    let client_reconnection = async {
 448                        let mut remaining_attempts = 3;
 449                        while remaining_attempts > 0 {
 450                            if client_status.borrow().is_connected() {
 451                                log::info!("client reconnected, attempting to rejoin room");
 452
 453                                let Some(this) = this.upgrade() else { break };
 454                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 455                                    Ok(task) => {
 456                                        if task.await.log_err().is_some() {
 457                                            return true;
 458                                        } else {
 459                                            remaining_attempts -= 1;
 460                                        }
 461                                    }
 462                                    Err(_app_dropped) => return false,
 463                                }
 464                            } else if client_status.borrow().is_signed_out() {
 465                                return false;
 466                            }
 467
 468                            log::info!(
 469                                "waiting for client status change, remaining attempts {}",
 470                                remaining_attempts
 471                            );
 472                            client_status.next().await;
 473                        }
 474                        false
 475                    }
 476                    .fuse();
 477                    futures::pin_mut!(client_reconnection);
 478
                        // Biased select: the reconnection future is polled before the timeout.
 479                    futures::select_biased! {
 480                        reconnected = client_reconnection => {
 481                            if reconnected {
 482                                log::info!("successfully reconnected to room");
 483                                // If we successfully joined the room, go back around the loop
 484                                // waiting for future connection status changes.
 485                                continue;
 486                            }
 487                        }
 488                        _ = reconnection_timeout => {
 489                            log::info!("room reconnection timeout expired");
 490                        }
 491                    }
 492                }
 493
                    // Reached only when rejoining failed or timed out.
 494                break;
 495            }
 496        }
 497
 498        // The client failed to re-establish a connection to the server
 499        // or an error occurred while trying to re-join the room. Either way
 500        // we leave the room and return an error.
 501        if let Some(this) = this.upgrade() {
 502            log::info!("reconnection failed, leaving room");
                // NOTE(review): the task returned by `leave` is discarded here without being
                // awaited or detached — confirm that is intended.
 503            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 504        }
 505        Err(anyhow!(
 506            "can't reconnect to room: client failed to re-establish connection"
 507        ))
 508    }
 509
 510    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 511        let mut projects = HashMap::default();
 512        let mut reshared_projects = Vec::new();
 513        let mut rejoined_projects = Vec::new();
 514        self.shared_projects.retain(|project| {
 515            if let Some(handle) = project.upgrade() {
 516                let project = handle.read(cx);
 517                if let Some(project_id) = project.remote_id() {
 518                    projects.insert(project_id, handle.clone());
 519                    reshared_projects.push(proto::UpdateProject {
 520                        project_id,
 521                        worktrees: project.worktree_metadata_protos(cx),
 522                    });
 523                    return true;
 524                }
 525            }
 526            false
 527        });
 528        self.joined_projects.retain(|project| {
 529            if let Some(handle) = project.upgrade() {
 530                let project = handle.read(cx);
 531                if let Some(project_id) = project.remote_id() {
 532                    projects.insert(project_id, handle.clone());
 533                    rejoined_projects.push(proto::RejoinProject {
 534                        id: project_id,
 535                        worktrees: project
 536                            .worktrees()
 537                            .map(|worktree| {
 538                                let worktree = worktree.read(cx);
 539                                proto::RejoinWorktree {
 540                                    id: worktree.id().to_proto(),
 541                                    scan_id: worktree.completed_scan_id() as u64,
 542                                }
 543                            })
 544                            .collect(),
 545                    });
 546                }
 547                return true;
 548            }
 549            false
 550        });
 551
 552        let response = self.client.request_envelope(proto::RejoinRoom {
 553            id: self.id,
 554            reshared_projects,
 555            rejoined_projects,
 556        });
 557
 558        cx.spawn(|this, mut cx| async move {
 559            let response = response.await?;
 560            let message_id = response.message_id;
 561            let response = response.payload;
 562            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 563            this.update(&mut cx, |this, cx| {
 564                this.status = RoomStatus::Online;
 565                this.apply_room_update(room_proto, cx)?;
 566
 567                for reshared_project in response.reshared_projects {
 568                    if let Some(project) = projects.get(&reshared_project.id) {
 569                        project.update(cx, |project, cx| {
 570                            project.reshared(reshared_project, cx).log_err();
 571                        });
 572                    }
 573                }
 574
 575                for rejoined_project in response.rejoined_projects {
 576                    if let Some(project) = projects.get(&rejoined_project.id) {
 577                        project.update(cx, |project, cx| {
 578                            project.rejoined(rejoined_project, message_id, cx).log_err();
 579                        });
 580                    }
 581                }
 582
 583                anyhow::Ok(())
 584            })?
 585        })
 586    }
 587
 588    pub fn id(&self) -> u64 {
 589        self.id
 590    }
 591
 592    pub fn status(&self) -> RoomStatus {
 593        self.status
 594    }
 595
 596    pub fn local_participant(&self) -> &LocalParticipant {
 597        &self.local_participant
 598    }
 599
 600    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 601        &self.remote_participants
 602    }
 603
 604    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 605        self.remote_participants
 606            .values()
 607            .find(|p| p.peer_id == peer_id)
 608    }
 609
 610    pub fn pending_participants(&self) -> &[Arc<User>] {
 611        &self.pending_participants
 612    }
 613
 614    pub fn contains_participant(&self, user_id: u64) -> bool {
 615        self.participant_user_ids.contains(&user_id)
 616    }
 617
 618    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 619        self.follows_by_leader_id_project_id
 620            .get(&(leader_id, project_id))
 621            .map_or(&[], |v| v.as_slice())
 622    }
 623
 624    /// Returns the most 'active' projects, defined as most people in the project
 625    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 626        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 627        for participant in self.remote_participants.values() {
 628            match participant.location {
 629                ParticipantLocation::SharedProject { project_id } => {
 630                    project_hosts_and_guest_counts
 631                        .entry(project_id)
 632                        .or_default()
 633                        .1 += 1;
 634                }
 635                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 636            }
 637            for project in &participant.projects {
 638                project_hosts_and_guest_counts
 639                    .entry(project.id)
 640                    .or_default()
 641                    .0 = Some(participant.user.id);
 642            }
 643        }
 644
 645        if let Some(user) = self.user_store.read(cx).current_user() {
 646            for project in &self.local_participant.projects {
 647                project_hosts_and_guest_counts
 648                    .entry(project.id)
 649                    .or_default()
 650                    .0 = Some(user.id);
 651            }
 652        }
 653
 654        project_hosts_and_guest_counts
 655            .into_iter()
 656            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 657            .max_by_key(|(_, _, guest_count)| *guest_count)
 658            .map(|(id, host, _)| (id, host))
 659    }
 660
 661    async fn handle_room_updated(
 662        this: Model<Self>,
 663        envelope: TypedEnvelope<proto::RoomUpdated>,
 664        _: Arc<Client>,
 665        mut cx: AsyncAppContext,
 666    ) -> Result<()> {
 667        let room = envelope
 668            .payload
 669            .room
 670            .ok_or_else(|| anyhow!("invalid room"))?;
 671        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 672    }
 673
 674    fn apply_room_update(
 675        &mut self,
 676        mut room: proto::Room,
 677        cx: &mut ModelContext<Self>,
 678    ) -> Result<()> {
 679        // Filter ourselves out from the room's participants.
 680        let local_participant_ix = room
 681            .participants
 682            .iter()
 683            .position(|participant| Some(participant.user_id) == self.client.user_id());
 684        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 685
 686        let pending_participant_user_ids = room
 687            .pending_participants
 688            .iter()
 689            .map(|p| p.user_id)
 690            .collect::<Vec<_>>();
 691
 692        let remote_participant_user_ids = room
 693            .participants
 694            .iter()
 695            .map(|p| p.user_id)
 696            .collect::<Vec<_>>();
 697
 698        let (remote_participants, pending_participants) =
 699            self.user_store.update(cx, move |user_store, cx| {
 700                (
 701                    user_store.get_users(remote_participant_user_ids, cx),
 702                    user_store.get_users(pending_participant_user_ids, cx),
 703                )
 704            });
 705
 706        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 707            let (remote_participants, pending_participants) =
 708                futures::join!(remote_participants, pending_participants);
 709
 710            this.update(&mut cx, |this, cx| {
 711                this.participant_user_ids.clear();
 712
 713                if let Some(participant) = local_participant {
 714                    this.local_participant.projects = participant.projects;
 715                } else {
 716                    this.local_participant.projects.clear();
 717                }
 718
 719                if let Some(participants) = remote_participants.log_err() {
 720                    for (participant, user) in room.participants.into_iter().zip(participants) {
 721                        let Some(peer_id) = participant.peer_id else {
 722                            continue;
 723                        };
 724                        let participant_index = ParticipantIndex(participant.participant_index);
 725                        this.participant_user_ids.insert(participant.user_id);
 726
 727                        let old_projects = this
 728                            .remote_participants
 729                            .get(&participant.user_id)
 730                            .into_iter()
 731                            .flat_map(|existing| &existing.projects)
 732                            .map(|project| project.id)
 733                            .collect::<HashSet<_>>();
 734                        let new_projects = participant
 735                            .projects
 736                            .iter()
 737                            .map(|project| project.id)
 738                            .collect::<HashSet<_>>();
 739
 740                        for project in &participant.projects {
 741                            if !old_projects.contains(&project.id) {
 742                                cx.emit(Event::RemoteProjectShared {
 743                                    owner: user.clone(),
 744                                    project_id: project.id,
 745                                    worktree_root_names: project.worktree_root_names.clone(),
 746                                });
 747                            }
 748                        }
 749
 750                        for unshared_project_id in old_projects.difference(&new_projects) {
 751                            this.joined_projects.retain(|project| {
 752                                if let Some(project) = project.upgrade() {
 753                                    project.update(cx, |project, cx| {
 754                                        if project.remote_id() == Some(*unshared_project_id) {
 755                                            project.disconnected_from_host(cx);
 756                                            false
 757                                        } else {
 758                                            true
 759                                        }
 760                                    })
 761                                } else {
 762                                    false
 763                                }
 764                            });
 765                            cx.emit(Event::RemoteProjectUnshared {
 766                                project_id: *unshared_project_id,
 767                            });
 768                        }
 769
 770                        let location = ParticipantLocation::from_proto(participant.location)
 771                            .unwrap_or(ParticipantLocation::External);
 772                        if let Some(remote_participant) =
 773                            this.remote_participants.get_mut(&participant.user_id)
 774                        {
 775                            remote_participant.peer_id = peer_id;
 776                            remote_participant.projects = participant.projects;
 777                            remote_participant.participant_index = participant_index;
 778                            if location != remote_participant.location {
 779                                remote_participant.location = location;
 780                                cx.emit(Event::ParticipantLocationChanged {
 781                                    participant_id: peer_id,
 782                                });
 783                            }
 784                        } else {
 785                            this.remote_participants.insert(
 786                                participant.user_id,
 787                                RemoteParticipant {
 788                                    user: user.clone(),
 789                                    participant_index,
 790                                    peer_id,
 791                                    projects: participant.projects,
 792                                    location,
 793                                    muted: true,
 794                                    speaking: false,
 795                                    video_tracks: Default::default(),
 796                                    audio_tracks: Default::default(),
 797                                },
 798                            );
 799
 800                            Audio::play_sound(Sound::Joined, cx);
 801
 802                            if let Some(live_kit) = this.live_kit.as_ref() {
 803                                let video_tracks =
 804                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 805                                let audio_tracks =
 806                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 807                                let publications = live_kit
 808                                    .room
 809                                    .remote_audio_track_publications(&user.id.to_string());
 810
 811                                for track in video_tracks {
 812                                    this.remote_video_track_updated(
 813                                        RemoteVideoTrackUpdate::Subscribed(track),
 814                                        cx,
 815                                    )
 816                                    .log_err();
 817                                }
 818
 819                                for (track, publication) in
 820                                    audio_tracks.iter().zip(publications.iter())
 821                                {
 822                                    this.remote_audio_track_updated(
 823                                        RemoteAudioTrackUpdate::Subscribed(
 824                                            track.clone(),
 825                                            publication.clone(),
 826                                        ),
 827                                        cx,
 828                                    )
 829                                    .log_err();
 830                                }
 831                            }
 832                        }
 833                    }
 834
 835                    this.remote_participants.retain(|user_id, participant| {
 836                        if this.participant_user_ids.contains(user_id) {
 837                            true
 838                        } else {
 839                            for project in &participant.projects {
 840                                cx.emit(Event::RemoteProjectUnshared {
 841                                    project_id: project.id,
 842                                });
 843                            }
 844                            false
 845                        }
 846                    });
 847                }
 848
 849                if let Some(pending_participants) = pending_participants.log_err() {
 850                    this.pending_participants = pending_participants;
 851                    for participant in &this.pending_participants {
 852                        this.participant_user_ids.insert(participant.id);
 853                    }
 854                }
 855
 856                this.follows_by_leader_id_project_id.clear();
 857                for follower in room.followers {
 858                    let project_id = follower.project_id;
 859                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 860                        (Some(leader), Some(follower)) => (leader, follower),
 861
 862                        _ => {
 863                            log::error!("Follower message {follower:?} missing some state");
 864                            continue;
 865                        }
 866                    };
 867
 868                    let list = this
 869                        .follows_by_leader_id_project_id
 870                        .entry((leader, project_id))
 871                        .or_insert(Vec::new());
 872                    if !list.contains(&follower) {
 873                        list.push(follower);
 874                    }
 875                }
 876
 877                this.pending_room_update.take();
 878                if this.should_leave() {
 879                    log::info!("room is empty, leaving");
 880                    let _ = this.leave(cx);
 881                }
 882
 883                this.user_store.update(cx, |user_store, cx| {
 884                    let participant_indices_by_user_id = this
 885                        .remote_participants
 886                        .iter()
 887                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 888                        .collect();
 889                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 890                });
 891
 892                this.check_invariants();
 893                this.room_update_completed_tx.try_send(Some(())).ok();
 894                cx.notify();
 895            })
 896            .ok();
 897        }));
 898
 899        cx.notify();
 900        Ok(())
 901    }
 902
 903    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 904        let mut done_rx = self.room_update_completed_rx.clone();
 905        async move {
 906            while let Some(result) = done_rx.next().await {
 907                if result.is_some() {
 908                    break;
 909                }
 910            }
 911        }
 912    }
 913
 914    fn remote_video_track_updated(
 915        &mut self,
 916        change: RemoteVideoTrackUpdate,
 917        cx: &mut ModelContext<Self>,
 918    ) -> Result<()> {
 919        match change {
 920            RemoteVideoTrackUpdate::Subscribed(track) => {
 921                let user_id = track.publisher_id().parse()?;
 922                let track_id = track.sid().to_string();
 923                let participant = self
 924                    .remote_participants
 925                    .get_mut(&user_id)
 926                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 927                participant.video_tracks.insert(track_id.clone(), track);
 928                cx.emit(Event::RemoteVideoTracksChanged {
 929                    participant_id: participant.peer_id,
 930                });
 931            }
 932            RemoteVideoTrackUpdate::Unsubscribed {
 933                publisher_id,
 934                track_id,
 935            } => {
 936                let user_id = publisher_id.parse()?;
 937                let participant = self
 938                    .remote_participants
 939                    .get_mut(&user_id)
 940                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 941                participant.video_tracks.remove(&track_id);
 942                cx.emit(Event::RemoteVideoTracksChanged {
 943                    participant_id: participant.peer_id,
 944                });
 945            }
 946        }
 947
 948        cx.notify();
 949        Ok(())
 950    }
 951
 952    fn remote_audio_track_updated(
 953        &mut self,
 954        change: RemoteAudioTrackUpdate,
 955        cx: &mut ModelContext<Self>,
 956    ) -> Result<()> {
 957        match change {
 958            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 959                let mut speaker_ids = speakers
 960                    .into_iter()
 961                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 962                    .collect::<Vec<u64>>();
 963                speaker_ids.sort_unstable();
 964                for (sid, participant) in &mut self.remote_participants {
 965                    if let Ok(_) = speaker_ids.binary_search(sid) {
 966                        participant.speaking = true;
 967                    } else {
 968                        participant.speaking = false;
 969                    }
 970                }
 971                if let Some(id) = self.client.user_id() {
 972                    if let Some(room) = &mut self.live_kit {
 973                        if let Ok(_) = speaker_ids.binary_search(&id) {
 974                            room.speaking = true;
 975                        } else {
 976                            room.speaking = false;
 977                        }
 978                    }
 979                }
 980                cx.notify();
 981            }
 982            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 983                let mut found = false;
 984                for participant in &mut self.remote_participants.values_mut() {
 985                    for track in participant.audio_tracks.values() {
 986                        if track.sid() == track_id {
 987                            found = true;
 988                            break;
 989                        }
 990                    }
 991                    if found {
 992                        participant.muted = muted;
 993                        break;
 994                    }
 995                }
 996
 997                cx.notify();
 998            }
 999            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1000                let user_id = track.publisher_id().parse()?;
1001                let track_id = track.sid().to_string();
1002                let participant = self
1003                    .remote_participants
1004                    .get_mut(&user_id)
1005                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1006                participant.audio_tracks.insert(track_id.clone(), track);
1007                participant.muted = publication.is_muted();
1008
1009                cx.emit(Event::RemoteAudioTracksChanged {
1010                    participant_id: participant.peer_id,
1011                });
1012            }
1013            RemoteAudioTrackUpdate::Unsubscribed {
1014                publisher_id,
1015                track_id,
1016            } => {
1017                let user_id = publisher_id.parse()?;
1018                let participant = self
1019                    .remote_participants
1020                    .get_mut(&user_id)
1021                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1022                participant.audio_tracks.remove(&track_id);
1023                cx.emit(Event::RemoteAudioTracksChanged {
1024                    participant_id: participant.peer_id,
1025                });
1026            }
1027        }
1028
1029        cx.notify();
1030        Ok(())
1031    }
1032
1033    fn check_invariants(&self) {
1034        #[cfg(any(test, feature = "test-support"))]
1035        {
1036            for participant in self.remote_participants.values() {
1037                assert!(self.participant_user_ids.contains(&participant.user.id));
1038                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1039            }
1040
1041            for participant in &self.pending_participants {
1042                assert!(self.participant_user_ids.contains(&participant.id));
1043                assert_ne!(participant.id, self.client.user_id().unwrap());
1044            }
1045
1046            assert_eq!(
1047                self.participant_user_ids.len(),
1048                self.remote_participants.len() + self.pending_participants.len()
1049            );
1050        }
1051    }
1052
1053    pub(crate) fn call(
1054        &mut self,
1055        called_user_id: u64,
1056        initial_project_id: Option<u64>,
1057        cx: &mut ModelContext<Self>,
1058    ) -> Task<Result<()>> {
1059        if self.status.is_offline() {
1060            return Task::ready(Err(anyhow!("room is offline")));
1061        }
1062
1063        cx.notify();
1064        let client = self.client.clone();
1065        let room_id = self.id;
1066        self.pending_call_count += 1;
1067        cx.spawn(move |this, mut cx| async move {
1068            let result = client
1069                .request(proto::Call {
1070                    room_id,
1071                    called_user_id,
1072                    initial_project_id,
1073                })
1074                .await;
1075            this.update(&mut cx, |this, cx| {
1076                this.pending_call_count -= 1;
1077                if this.should_leave() {
1078                    this.leave(cx).detach_and_log_err(cx);
1079                }
1080            })?;
1081            result?;
1082            Ok(())
1083        })
1084    }
1085
1086    pub fn join_project(
1087        &mut self,
1088        id: u64,
1089        language_registry: Arc<LanguageRegistry>,
1090        fs: Arc<dyn Fs>,
1091        cx: &mut ModelContext<Self>,
1092    ) -> Task<Result<Model<Project>>> {
1093        let client = self.client.clone();
1094        let user_store = self.user_store.clone();
1095        cx.emit(Event::RemoteProjectJoined { project_id: id });
1096        cx.spawn(move |this, mut cx| async move {
1097            let project =
1098                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1099
1100            this.update(&mut cx, |this, cx| {
1101                this.joined_projects.retain(|project| {
1102                    if let Some(project) = project.upgrade() {
1103                        !project.read(cx).is_read_only()
1104                    } else {
1105                        false
1106                    }
1107                });
1108                this.joined_projects.insert(project.downgrade());
1109            })?;
1110            Ok(project)
1111        })
1112    }
1113
1114    pub(crate) fn share_project(
1115        &mut self,
1116        project: Model<Project>,
1117        cx: &mut ModelContext<Self>,
1118    ) -> Task<Result<u64>> {
1119        if let Some(project_id) = project.read(cx).remote_id() {
1120            return Task::ready(Ok(project_id));
1121        }
1122
1123        let request = self.client.request(proto::ShareProject {
1124            room_id: self.id(),
1125            worktrees: project.read(cx).worktree_metadata_protos(cx),
1126        });
1127        cx.spawn(|this, mut cx| async move {
1128            let response = request.await?;
1129
1130            project.update(&mut cx, |project, cx| {
1131                project.shared(response.project_id, cx)
1132            })??;
1133
1134            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1135            this.update(&mut cx, |this, cx| {
1136                this.shared_projects.insert(project.downgrade());
1137                let active_project = this.local_participant.active_project.as_ref();
1138                if active_project.map_or(false, |location| *location == project) {
1139                    this.set_location(Some(&project), cx)
1140                } else {
1141                    Task::ready(Ok(()))
1142                }
1143            })?
1144            .await?;
1145
1146            Ok(response.project_id)
1147        })
1148    }
1149
1150    pub(crate) fn unshare_project(
1151        &mut self,
1152        project: Model<Project>,
1153        cx: &mut ModelContext<Self>,
1154    ) -> Result<()> {
1155        let project_id = match project.read(cx).remote_id() {
1156            Some(project_id) => project_id,
1157            None => return Ok(()),
1158        };
1159
1160        self.client.send(proto::UnshareProject { project_id })?;
1161        project.update(cx, |this, cx| this.unshare(cx))
1162    }
1163
1164    pub(crate) fn set_location(
1165        &mut self,
1166        project: Option<&Model<Project>>,
1167        cx: &mut ModelContext<Self>,
1168    ) -> Task<Result<()>> {
1169        if self.status.is_offline() {
1170            return Task::ready(Err(anyhow!("room is offline")));
1171        }
1172
1173        let client = self.client.clone();
1174        let room_id = self.id;
1175        let location = if let Some(project) = project {
1176            self.local_participant.active_project = Some(project.downgrade());
1177            if let Some(project_id) = project.read(cx).remote_id() {
1178                proto::participant_location::Variant::SharedProject(
1179                    proto::participant_location::SharedProject { id: project_id },
1180                )
1181            } else {
1182                proto::participant_location::Variant::UnsharedProject(
1183                    proto::participant_location::UnsharedProject {},
1184                )
1185            }
1186        } else {
1187            self.local_participant.active_project = None;
1188            proto::participant_location::Variant::External(proto::participant_location::External {})
1189        };
1190
1191        cx.notify();
1192        cx.background_executor().spawn(async move {
1193            client
1194                .request(proto::UpdateParticipantLocation {
1195                    room_id,
1196                    location: Some(proto::ParticipantLocation {
1197                        variant: Some(location),
1198                    }),
1199                })
1200                .await?;
1201            Ok(())
1202        })
1203    }
1204
1205    pub fn is_screen_sharing(&self) -> bool {
1206        self.live_kit.as_ref().map_or(false, |live_kit| {
1207            !matches!(live_kit.screen_track, LocalTrack::None)
1208        })
1209    }
1210
1211    pub fn is_sharing_mic(&self) -> bool {
1212        self.live_kit.as_ref().map_or(false, |live_kit| {
1213            !matches!(live_kit.microphone_track, LocalTrack::None)
1214        })
1215    }
1216
1217    pub fn is_muted(&self, cx: &AppContext) -> bool {
1218        self.live_kit
1219            .as_ref()
1220            .and_then(|live_kit| match &live_kit.microphone_track {
1221                LocalTrack::None => Some(Self::mute_on_join(cx)),
1222                LocalTrack::Pending { muted, .. } => Some(*muted),
1223                LocalTrack::Published { muted, .. } => Some(*muted),
1224            })
1225            .unwrap_or(false)
1226    }
1227
1228    pub fn is_speaking(&self) -> bool {
1229        self.live_kit
1230            .as_ref()
1231            .map_or(false, |live_kit| live_kit.speaking)
1232    }
1233
1234    pub fn is_deafened(&self) -> Option<bool> {
1235        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1236    }
1237
    /// Starts publishing a local microphone track.
    ///
    /// The microphone track is first set to `LocalTrack::Pending` with a fresh
    /// publish id; the spawned task then creates and publishes the audio
    /// track and, if this publish is still the current one, stores it as
    /// `LocalTrack::Published`.
    ///
    /// # Errors
    ///
    /// Resolves to an error immediately if the room is offline, the
    /// microphone is already shared (pending or published), or `live_kit` is
    /// not set. The task fails if the room is dropped, `live_kit` disappears,
    /// or publishing fails while this publish is still current.
    #[track_caller]
    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        if self.status.is_offline() {
            return Task::ready(Err(anyhow!("room is offline")));
        } else if self.is_sharing_mic() {
            return Task::ready(Err(anyhow!("microphone was already shared")));
        }

        // Mark the track as pending and remember the id of this attempt so the
        // task below can tell whether it has been superseded or canceled.
        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
            let publish_id = post_inc(&mut live_kit.next_publish_id);
            live_kit.microphone_track = LocalTrack::Pending {
                publish_id,
                muted: false,
            };
            cx.notify();
            publish_id
        } else {
            return Task::ready(Err(anyhow!("live-kit was not initialized")));
        };

        cx.spawn(move |this, mut cx| async move {
            // Kept as a separate future so that any failure is captured in
            // `publication` and handled below instead of returning early.
            let publish_track = async {
                let track = LocalAudioTrack::create();
                this.upgrade()
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, _| {
                        this.live_kit
                            .as_ref()
                            .map(|live_kit| live_kit.room.publish_audio_track(track))
                    })?
                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
                    .await
            };

            let publication = publish_track.await;
            this.upgrade()
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    let live_kit = this
                        .live_kit
                        .as_mut()
                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

                    // The attempt counts as canceled when the track is no
                    // longer pending or is pending under a different id.
                    // `muted` carries any mute change made while pending.
                    let (canceled, muted) = if let LocalTrack::Pending {
                        publish_id: cur_publish_id,
                        muted,
                    } = &live_kit.microphone_track
                    {
                        (*cur_publish_id != publish_id, *muted)
                    } else {
                        (true, false)
                    };

                    match publication {
                        Ok(publication) => {
                            if canceled {
                                // Nobody is waiting for this track anymore.
                                live_kit.room.unpublish_track(publication);
                            } else {
                                if muted {
                                    // Apply the mute requested while pending.
                                    cx.background_executor()
                                        .spawn(publication.set_mute(muted))
                                        .detach();
                                }
                                live_kit.microphone_track = LocalTrack::Published {
                                    track_publication: publication,
                                    muted,
                                };
                                cx.notify();
                            }
                            Ok(())
                        }
                        Err(error) => {
                            if canceled {
                                // A failure of a canceled attempt is not reported.
                                Ok(())
                            } else {
                                live_kit.microphone_track = LocalTrack::None;
                                cx.notify();
                                Err(error)
                            }
                        }
                    }
                })?
        })
    }
1322
1323    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1324        if self.status.is_offline() {
1325            return Task::ready(Err(anyhow!("room is offline")));
1326        } else if self.is_screen_sharing() {
1327            return Task::ready(Err(anyhow!("screen was already shared")));
1328        }
1329
1330        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1331            let publish_id = post_inc(&mut live_kit.next_publish_id);
1332            live_kit.screen_track = LocalTrack::Pending {
1333                publish_id,
1334                muted: false,
1335            };
1336            cx.notify();
1337            (live_kit.room.display_sources(), publish_id)
1338        } else {
1339            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1340        };
1341
1342        cx.spawn(move |this, mut cx| async move {
1343            let publish_track = async {
1344                let displays = displays.await?;
1345                let display = displays
1346                    .first()
1347                    .ok_or_else(|| anyhow!("no display found"))?;
1348                let track = LocalVideoTrack::screen_share_for_display(&display);
1349                this.upgrade()
1350                    .ok_or_else(|| anyhow!("room was dropped"))?
1351                    .update(&mut cx, |this, _| {
1352                        this.live_kit
1353                            .as_ref()
1354                            .map(|live_kit| live_kit.room.publish_video_track(track))
1355                    })?
1356                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1357                    .await
1358            };
1359
1360            let publication = publish_track.await;
1361            this.upgrade()
1362                .ok_or_else(|| anyhow!("room was dropped"))?
1363                .update(&mut cx, |this, cx| {
1364                    let live_kit = this
1365                        .live_kit
1366                        .as_mut()
1367                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1368
1369                    let (canceled, muted) = if let LocalTrack::Pending {
1370                        publish_id: cur_publish_id,
1371                        muted,
1372                    } = &live_kit.screen_track
1373                    {
1374                        (*cur_publish_id != publish_id, *muted)
1375                    } else {
1376                        (true, false)
1377                    };
1378
1379                    match publication {
1380                        Ok(publication) => {
1381                            if canceled {
1382                                live_kit.room.unpublish_track(publication);
1383                            } else {
1384                                if muted {
1385                                    cx.background_executor()
1386                                        .spawn(publication.set_mute(muted))
1387                                        .detach();
1388                                }
1389                                live_kit.screen_track = LocalTrack::Published {
1390                                    track_publication: publication,
1391                                    muted,
1392                                };
1393                                cx.notify();
1394                            }
1395
1396                            Audio::play_sound(Sound::StartScreenshare, cx);
1397
1398                            Ok(())
1399                        }
1400                        Err(error) => {
1401                            if canceled {
1402                                Ok(())
1403                            } else {
1404                                live_kit.screen_track = LocalTrack::None;
1405                                cx.notify();
1406                                Err(error)
1407                            }
1408                        }
1409                    }
1410                })?
1411        })
1412    }
1413
1414    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1415        let should_mute = !self.is_muted(cx);
1416        if let Some(live_kit) = self.live_kit.as_mut() {
1417            if matches!(live_kit.microphone_track, LocalTrack::None) {
1418                return Ok(self.share_microphone(cx));
1419            }
1420
1421            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1422            live_kit.muted_by_user = should_mute;
1423
1424            if old_muted == true && live_kit.deafened == true {
1425                if let Some(task) = self.toggle_deafen(cx).ok() {
1426                    task.detach();
1427                }
1428            }
1429
1430            Ok(ret_task)
1431        } else {
1432            Err(anyhow!("LiveKit not started"))
1433        }
1434    }
1435
1436    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1437        if let Some(live_kit) = self.live_kit.as_mut() {
1438            (*live_kit).deafened = !live_kit.deafened;
1439
1440            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1441            // Context notification is sent within set_mute itself.
1442            let mut mute_task = None;
1443            // When deafening, mute user's mic as well.
1444            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1445            if live_kit.deafened || !live_kit.muted_by_user {
1446                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1447            };
1448            for participant in self.remote_participants.values() {
1449                for track in live_kit
1450                    .room
1451                    .remote_audio_track_publications(&participant.user.id.to_string())
1452                {
1453                    let deafened = live_kit.deafened;
1454                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1455                }
1456            }
1457
1458            Ok(cx.foreground_executor().spawn(async move {
1459                if let Some(mute_task) = mute_task {
1460                    mute_task.await?;
1461                }
1462                for task in tasks {
1463                    task.await?;
1464                }
1465                Ok(())
1466            }))
1467        } else {
1468            Err(anyhow!("LiveKit not started"))
1469        }
1470    }
1471
1472    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1473        if self.status.is_offline() {
1474            return Err(anyhow!("room is offline"));
1475        }
1476
1477        let live_kit = self
1478            .live_kit
1479            .as_mut()
1480            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1481        match mem::take(&mut live_kit.screen_track) {
1482            LocalTrack::None => Err(anyhow!("screen was not shared")),
1483            LocalTrack::Pending { .. } => {
1484                cx.notify();
1485                Ok(())
1486            }
1487            LocalTrack::Published {
1488                track_publication, ..
1489            } => {
1490                live_kit.room.unpublish_track(track_publication);
1491                cx.notify();
1492
1493                Audio::play_sound(Sound::StopScreenshare, cx);
1494                Ok(())
1495            }
1496        }
1497    }
1498
1499    #[cfg(any(test, feature = "test-support"))]
1500    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1501        self.live_kit
1502            .as_ref()
1503            .unwrap()
1504            .room
1505            .set_display_sources(sources);
1506    }
1507}
1508
/// LiveKit-related state of a [`Room`]: the client room handle, the local
/// screen and microphone tracks, and the mute/deafen/speaking flags.
struct LiveKitRoom {
    /// Handle to the `live_kit_client` room used to publish and unpublish
    /// tracks and to query remote track publications.
    room: Arc<live_kit_client::Room>,
    /// State of the local screen-share track.
    screen_track: LocalTrack,
    /// State of the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the microphone was muted manually by the user, as opposed to
    /// the automatic mute applied when deafening. Set by `Room::toggle_mute`
    /// and cleared by `set_mute` whenever the microphone is unmuted.
    muted_by_user: bool,
    /// Flipped by `Room::toggle_deafen`; while `true`, remote audio track
    /// publications are disabled.
    deafened: bool,
    /// Whether the local user was among the most recently reported active
    /// speakers.
    speaking: bool,
    /// Counter handed out (post-incremented) to each new pending publish so a
    /// superseded or canceled publish can be detected when it completes.
    next_publish_id: usize,
    // NOTE(review): presumably background tasks kept alive for as long as this
    // struct lives; what they do is not visible in this part of the file.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1521
1522impl LiveKitRoom {
1523    fn set_mute(
1524        self: &mut LiveKitRoom,
1525        should_mute: bool,
1526        cx: &mut ModelContext<Room>,
1527    ) -> Result<(Task<Result<()>>, bool)> {
1528        if !should_mute {
1529            // clear user muting state.
1530            self.muted_by_user = false;
1531        }
1532
1533        let (result, old_muted) = match &mut self.microphone_track {
1534            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1535            LocalTrack::Pending { muted, .. } => {
1536                let old_muted = *muted;
1537                *muted = should_mute;
1538                cx.notify();
1539                Ok((Task::Ready(Some(Ok(()))), old_muted))
1540            }
1541            LocalTrack::Published {
1542                track_publication,
1543                muted,
1544            } => {
1545                let old_muted = *muted;
1546                *muted = should_mute;
1547                cx.notify();
1548                Ok((
1549                    cx.background_executor()
1550                        .spawn(track_publication.set_mute(*muted)),
1551                    old_muted,
1552                ))
1553            }
1554        }?;
1555
1556        if old_muted != should_mute {
1557            if should_mute {
1558                Audio::play_sound(Sound::Mute, cx);
1559            } else {
1560                Audio::play_sound(Sound::Unmute, cx);
1561            }
1562        }
1563
1564        Ok((result, old_muted))
1565    }
1566}
1567
/// Publication state of a local (microphone or screen) track.
enum LocalTrack {
    /// No track is being shared.
    None,
    /// A publish has been started but has not completed yet.
    Pending {
        /// Id of the publish attempt, compared on completion to detect that
        /// the attempt was canceled or superseded.
        publish_id: usize,
        /// Mute state to apply once the track is published.
        muted: bool,
    },
    /// The track has been published.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Current mute state of the track.
        muted: bool,
    },
}
1579
1580impl Default for LocalTrack {
1581    fn default() -> Self {
1582        Self::None
1583    }
1584}
1585
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is online.
    Online,
    /// The room is in the process of rejoining.
    Rejoining,
    /// The room is offline.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` does not
    /// count as online.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}