room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{
  19    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  20    RemoteVideoTrackUpdate,
  21};
  22use postage::{sink::Sink, stream::Stream, watch};
  23use project::Project;
  24use settings::Settings as _;
  25use std::{future::Future, mem, sync::Arc, time::Duration};
  26use util::{post_inc, ResultExt, TryFutureExt};
  27
  28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  29
  30#[derive(Clone, Debug, PartialEq, Eq)]
  31pub enum Event {
  32    ParticipantLocationChanged {
  33        participant_id: proto::PeerId,
  34    },
  35    RemoteVideoTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
  38    RemoteAudioTracksChanged {
  39        participant_id: proto::PeerId,
  40    },
  41    RemoteProjectShared {
  42        owner: Arc<User>,
  43        project_id: u64,
  44        worktree_root_names: Vec<String>,
  45    },
  46    RemoteProjectUnshared {
  47        project_id: u64,
  48    },
  49    RemoteProjectJoined {
  50        project_id: u64,
  51    },
  52    RemoteProjectInvitationDiscarded {
  53        project_id: u64,
  54    },
  55    Left,
  56}
  57
/// Client-side state of a call room: who is in it, which projects are shared
/// or joined through it, and the tasks that keep it in sync with the server.
pub struct Room {
    /// Server-assigned room id; sent back when rejoining after a disconnect.
    id: u64,
    /// Channel this room belongs to, if it was joined through a channel.
    channel_id: Option<u64>,
    /// LiveKit state; `None` when no connection info was supplied or after
    /// the room state has been cleared.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Weak handles to local projects shared into this room.
    shared_projects: HashSet<WeakModel<Project>>,
    /// Weak handles to remote projects joined through this room.
    joined_projects: HashSet<WeakModel<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote participants currently in the room.
    participant_user_ids: HashSet<u64>,
    /// Must be zero for the room to be considered for automatic leaving.
    pending_call_count: usize,
    /// Whether the room may be left automatically once nobody else is in it.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Followers, keyed by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler registrations on the client; dropped when the room
    /// state is cleared.
    client_subscriptions: Vec<client::Subscription>,
    /// Release and app-quit hooks registered when the room is constructed.
    _subscriptions: Vec<gpui::Subscription>,
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// Task applying the most recently received room update, if any.
    pending_room_update: Option<Task<()>>,
    /// Task watching the client connection and rejoining after disconnects.
    maintain_connection: Option<Task<Option<()>>>,
}

// `Room` reports changes to its observers as `Event` values.
impl EventEmitter<Event> for Room {}
  83
  84impl Room {
  85    pub fn channel_id(&self) -> Option<u64> {
  86        self.channel_id
  87    }
  88
  89    pub fn is_sharing_project(&self) -> bool {
  90        !self.shared_projects.is_empty()
  91    }
  92
  93    #[cfg(any(test, feature = "test-support"))]
  94    pub fn is_connected(&self) -> bool {
  95        if let Some(live_kit) = self.live_kit.as_ref() {
  96            matches!(
  97                *live_kit.room.status().borrow(),
  98                live_kit_client::ConnectionState::Connected { .. }
  99            )
 100        } else {
 101            false
 102        }
 103    }
 104
 105    fn new(
 106        id: u64,
 107        channel_id: Option<u64>,
 108        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 109        client: Arc<Client>,
 110        user_store: Model<UserStore>,
 111        cx: &mut ModelContext<Self>,
 112    ) -> Self {
 113        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 114            let room = live_kit_client::Room::new();
 115            let mut status = room.status();
 116            // Consume the initial status of the room.
 117            let _ = status.try_recv();
 118            let _maintain_room = cx.spawn(|this, mut cx| async move {
 119                while let Some(status) = status.next().await {
 120                    let this = if let Some(this) = this.upgrade() {
 121                        this
 122                    } else {
 123                        break;
 124                    };
 125
 126                    if status == live_kit_client::ConnectionState::Disconnected {
 127                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 128                            .ok();
 129                        break;
 130                    }
 131                }
 132            });
 133
 134            let _maintain_video_tracks = cx.spawn({
 135                let room = room.clone();
 136                move |this, mut cx| async move {
 137                    let mut track_video_changes = room.remote_video_track_updates();
 138                    while let Some(track_change) = track_video_changes.next().await {
 139                        let this = if let Some(this) = this.upgrade() {
 140                            this
 141                        } else {
 142                            break;
 143                        };
 144
 145                        this.update(&mut cx, |this, cx| {
 146                            this.remote_video_track_updated(track_change, cx).log_err()
 147                        })
 148                        .ok();
 149                    }
 150                }
 151            });
 152
 153            let _maintain_audio_tracks = cx.spawn({
 154                let room = room.clone();
 155                |this, mut cx| async move {
 156                    let mut track_audio_changes = room.remote_audio_track_updates();
 157                    while let Some(track_change) = track_audio_changes.next().await {
 158                        let this = if let Some(this) = this.upgrade() {
 159                            this
 160                        } else {
 161                            break;
 162                        };
 163
 164                        this.update(&mut cx, |this, cx| {
 165                            this.remote_audio_track_updated(track_change, cx).log_err()
 166                        })
 167                        .ok();
 168                    }
 169                }
 170            });
 171
 172            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 173            cx.spawn(|this, mut cx| async move {
 174                connect.await?;
 175
 176                if !cx.update(|cx| Self::mute_on_join(cx))? {
 177                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 178                        .await?;
 179                }
 180
 181                anyhow::Ok(())
 182            })
 183            .detach_and_log_err(cx);
 184
 185            Some(LiveKitRoom {
 186                room,
 187                screen_track: LocalTrack::None,
 188                microphone_track: LocalTrack::None,
 189                next_publish_id: 0,
 190                muted_by_user: false,
 191                deafened: false,
 192                speaking: false,
 193                _maintain_room,
 194                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 195            })
 196        } else {
 197            None
 198        };
 199
 200        let maintain_connection = cx.spawn({
 201            let client = client.clone();
 202            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 203        });
 204
 205        Audio::play_sound(Sound::Joined, cx);
 206
 207        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 208
 209        Self {
 210            id,
 211            channel_id,
 212            live_kit: live_kit_room,
 213            status: RoomStatus::Online,
 214            shared_projects: Default::default(),
 215            joined_projects: Default::default(),
 216            participant_user_ids: Default::default(),
 217            local_participant: Default::default(),
 218            remote_participants: Default::default(),
 219            pending_participants: Default::default(),
 220            pending_call_count: 0,
 221            client_subscriptions: vec![
 222                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 223            ],
 224            _subscriptions: vec![
 225                cx.on_release(Self::released),
 226                cx.on_app_quit(Self::app_will_quit),
 227            ],
 228            leave_when_empty: false,
 229            pending_room_update: None,
 230            client,
 231            user_store,
 232            follows_by_leader_id_project_id: Default::default(),
 233            maintain_connection: Some(maintain_connection),
 234            room_update_completed_tx,
 235            room_update_completed_rx,
 236        }
 237    }
 238
 239    pub(crate) fn create(
 240        called_user_id: u64,
 241        initial_project: Option<Model<Project>>,
 242        client: Arc<Client>,
 243        user_store: Model<UserStore>,
 244        cx: &mut AppContext,
 245    ) -> Task<Result<Model<Self>>> {
 246        cx.spawn(move |mut cx| async move {
 247            let response = client.request(proto::CreateRoom {}).await?;
 248            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 249            let room = cx.build_model(|cx| {
 250                Self::new(
 251                    room_proto.id,
 252                    None,
 253                    response.live_kit_connection_info,
 254                    client,
 255                    user_store,
 256                    cx,
 257                )
 258            })?;
 259
 260            let initial_project_id = if let Some(initial_project) = initial_project {
 261                let initial_project_id = room
 262                    .update(&mut cx, |room, cx| {
 263                        room.share_project(initial_project.clone(), cx)
 264                    })?
 265                    .await?;
 266                Some(initial_project_id)
 267            } else {
 268                None
 269            };
 270
 271            match room
 272                .update(&mut cx, |room, cx| {
 273                    room.leave_when_empty = true;
 274                    room.call(called_user_id, initial_project_id, cx)
 275                })?
 276                .await
 277            {
 278                Ok(()) => Ok(room),
 279                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 280            }
 281        })
 282    }
 283
 284    pub(crate) async fn join_channel(
 285        channel_id: u64,
 286        client: Arc<Client>,
 287        user_store: Model<UserStore>,
 288        cx: AsyncAppContext,
 289    ) -> Result<Model<Self>> {
 290        Self::from_join_response(
 291            client.request(proto::JoinChannel { channel_id }).await?,
 292            client,
 293            user_store,
 294            cx,
 295        )
 296    }
 297
 298    pub(crate) async fn join(
 299        room_id: u64,
 300        client: Arc<Client>,
 301        user_store: Model<UserStore>,
 302        cx: AsyncAppContext,
 303    ) -> Result<Model<Self>> {
 304        Self::from_join_response(
 305            client.request(proto::JoinRoom { id: room_id }).await?,
 306            client,
 307            user_store,
 308            cx,
 309        )
 310    }
 311
 312    fn released(&mut self, cx: &mut AppContext) {
 313        if self.status.is_online() {
 314            self.leave_internal(cx).detach_and_log_err(cx);
 315        }
 316    }
 317
 318    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 319        let task = if self.status.is_online() {
 320            let leave = self.leave_internal(cx);
 321            Some(cx.background_executor().spawn(async move {
 322                leave.await.log_err();
 323            }))
 324        } else {
 325            None
 326        };
 327
 328        async move {
 329            if let Some(task) = task {
 330                task.await;
 331            }
 332        }
 333    }
 334
 335    pub fn mute_on_join(cx: &AppContext) -> bool {
 336        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 337    }
 338
 339    fn from_join_response(
 340        response: proto::JoinRoomResponse,
 341        client: Arc<Client>,
 342        user_store: Model<UserStore>,
 343        mut cx: AsyncAppContext,
 344    ) -> Result<Model<Self>> {
 345        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 346        let room = cx.build_model(|cx| {
 347            Self::new(
 348                room_proto.id,
 349                response.channel_id,
 350                response.live_kit_connection_info,
 351                client,
 352                user_store,
 353                cx,
 354            )
 355        })?;
 356        room.update(&mut cx, |room, cx| {
 357            room.leave_when_empty = room.channel_id.is_none();
 358            room.apply_room_update(room_proto, cx)?;
 359            anyhow::Ok(())
 360        })??;
 361        Ok(room)
 362    }
 363
 364    fn should_leave(&self) -> bool {
 365        self.leave_when_empty
 366            && self.pending_room_update.is_none()
 367            && self.pending_participants.is_empty()
 368            && self.remote_participants.is_empty()
 369            && self.pending_call_count == 0
 370    }
 371
 372    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 373        cx.notify();
 374        cx.emit(Event::Left);
 375        self.leave_internal(cx)
 376    }
 377
 378    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 379        if self.status.is_offline() {
 380            return Task::ready(Err(anyhow!("room is offline")));
 381        }
 382
 383        log::info!("leaving room");
 384        Audio::play_sound(Sound::Leave, cx);
 385
 386        self.clear_state(cx);
 387
 388        let leave_room = self.client.request(proto::LeaveRoom {});
 389        cx.background_executor().spawn(async move {
 390            leave_room.await?;
 391            anyhow::Ok(())
 392        })
 393    }
 394
 395    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 396        for project in self.shared_projects.drain() {
 397            if let Some(project) = project.upgrade() {
 398                project.update(cx, |project, cx| {
 399                    project.unshare(cx).log_err();
 400                });
 401            }
 402        }
 403        for project in self.joined_projects.drain() {
 404            if let Some(project) = project.upgrade() {
 405                project.update(cx, |project, cx| {
 406                    project.disconnected_from_host(cx);
 407                    project.close(cx);
 408                });
 409            }
 410        }
 411
 412        self.status = RoomStatus::Offline;
 413        self.remote_participants.clear();
 414        self.pending_participants.clear();
 415        self.participant_user_ids.clear();
 416        self.client_subscriptions.clear();
 417        self.live_kit.take();
 418        self.pending_room_update.take();
 419        self.maintain_connection.take();
 420    }
 421
 422    async fn maintain_connection(
 423        this: WeakModel<Self>,
 424        client: Arc<Client>,
 425        mut cx: AsyncAppContext,
 426    ) -> Result<()> {
 427        let mut client_status = client.status();
 428        loop {
 429            let _ = client_status.try_recv();
 430            let is_connected = client_status.borrow().is_connected();
 431            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 432            if !is_connected || client_status.next().await.is_some() {
 433                log::info!("detected client disconnection");
 434
 435                this.upgrade()
 436                    .ok_or_else(|| anyhow!("room was dropped"))?
 437                    .update(&mut cx, |this, cx| {
 438                        this.status = RoomStatus::Rejoining;
 439                        cx.notify();
 440                    })?;
 441
 442                // Wait for client to re-establish a connection to the server.
 443                {
 444                    let mut reconnection_timeout =
 445                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 446                    let client_reconnection = async {
 447                        let mut remaining_attempts = 3;
 448                        while remaining_attempts > 0 {
 449                            if client_status.borrow().is_connected() {
 450                                log::info!("client reconnected, attempting to rejoin room");
 451
 452                                let Some(this) = this.upgrade() else { break };
 453                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 454                                    Ok(task) => {
 455                                        if task.await.log_err().is_some() {
 456                                            return true;
 457                                        } else {
 458                                            remaining_attempts -= 1;
 459                                        }
 460                                    }
 461                                    Err(_app_dropped) => return false,
 462                                }
 463                            } else if client_status.borrow().is_signed_out() {
 464                                return false;
 465                            }
 466
 467                            log::info!(
 468                                "waiting for client status change, remaining attempts {}",
 469                                remaining_attempts
 470                            );
 471                            client_status.next().await;
 472                        }
 473                        false
 474                    }
 475                    .fuse();
 476                    futures::pin_mut!(client_reconnection);
 477
 478                    futures::select_biased! {
 479                        reconnected = client_reconnection => {
 480                            if reconnected {
 481                                log::info!("successfully reconnected to room");
 482                                // If we successfully joined the room, go back around the loop
 483                                // waiting for future connection status changes.
 484                                continue;
 485                            }
 486                        }
 487                        _ = reconnection_timeout => {
 488                            log::info!("room reconnection timeout expired");
 489                        }
 490                    }
 491                }
 492
 493                break;
 494            }
 495        }
 496
 497        // The client failed to re-establish a connection to the server
 498        // or an error occurred while trying to re-join the room. Either way
 499        // we leave the room and return an error.
 500        if let Some(this) = this.upgrade() {
 501            log::info!("reconnection failed, leaving room");
 502            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 503        }
 504        Err(anyhow!(
 505            "can't reconnect to room: client failed to re-establish connection"
 506        ))
 507    }
 508
 509    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 510        let mut projects = HashMap::default();
 511        let mut reshared_projects = Vec::new();
 512        let mut rejoined_projects = Vec::new();
 513        self.shared_projects.retain(|project| {
 514            if let Some(handle) = project.upgrade() {
 515                let project = handle.read(cx);
 516                if let Some(project_id) = project.remote_id() {
 517                    projects.insert(project_id, handle.clone());
 518                    reshared_projects.push(proto::UpdateProject {
 519                        project_id,
 520                        worktrees: project.worktree_metadata_protos(cx),
 521                    });
 522                    return true;
 523                }
 524            }
 525            false
 526        });
 527        self.joined_projects.retain(|project| {
 528            if let Some(handle) = project.upgrade() {
 529                let project = handle.read(cx);
 530                if let Some(project_id) = project.remote_id() {
 531                    projects.insert(project_id, handle.clone());
 532                    rejoined_projects.push(proto::RejoinProject {
 533                        id: project_id,
 534                        worktrees: project
 535                            .worktrees()
 536                            .map(|worktree| {
 537                                let worktree = worktree.read(cx);
 538                                proto::RejoinWorktree {
 539                                    id: worktree.id().to_proto(),
 540                                    scan_id: worktree.completed_scan_id() as u64,
 541                                }
 542                            })
 543                            .collect(),
 544                    });
 545                }
 546                return true;
 547            }
 548            false
 549        });
 550
 551        let response = self.client.request_envelope(proto::RejoinRoom {
 552            id: self.id,
 553            reshared_projects,
 554            rejoined_projects,
 555        });
 556
 557        cx.spawn(|this, mut cx| async move {
 558            let response = response.await?;
 559            let message_id = response.message_id;
 560            let response = response.payload;
 561            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 562            this.update(&mut cx, |this, cx| {
 563                this.status = RoomStatus::Online;
 564                this.apply_room_update(room_proto, cx)?;
 565
 566                for reshared_project in response.reshared_projects {
 567                    if let Some(project) = projects.get(&reshared_project.id) {
 568                        project.update(cx, |project, cx| {
 569                            project.reshared(reshared_project, cx).log_err();
 570                        });
 571                    }
 572                }
 573
 574                for rejoined_project in response.rejoined_projects {
 575                    if let Some(project) = projects.get(&rejoined_project.id) {
 576                        project.update(cx, |project, cx| {
 577                            project.rejoined(rejoined_project, message_id, cx).log_err();
 578                        });
 579                    }
 580                }
 581
 582                anyhow::Ok(())
 583            })?
 584        })
 585    }
 586
 587    pub fn id(&self) -> u64 {
 588        self.id
 589    }
 590
 591    pub fn status(&self) -> RoomStatus {
 592        self.status
 593    }
 594
 595    pub fn local_participant(&self) -> &LocalParticipant {
 596        &self.local_participant
 597    }
 598
 599    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 600        &self.remote_participants
 601    }
 602
 603    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 604        self.remote_participants
 605            .values()
 606            .find(|p| p.peer_id == peer_id)
 607    }
 608
 609    pub fn pending_participants(&self) -> &[Arc<User>] {
 610        &self.pending_participants
 611    }
 612
 613    pub fn contains_participant(&self, user_id: u64) -> bool {
 614        self.participant_user_ids.contains(&user_id)
 615    }
 616
 617    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 618        self.follows_by_leader_id_project_id
 619            .get(&(leader_id, project_id))
 620            .map_or(&[], |v| v.as_slice())
 621    }
 622
 623    /// Returns the most 'active' projects, defined as most people in the project
 624    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 625        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 626        for participant in self.remote_participants.values() {
 627            match participant.location {
 628                ParticipantLocation::SharedProject { project_id } => {
 629                    project_hosts_and_guest_counts
 630                        .entry(project_id)
 631                        .or_default()
 632                        .1 += 1;
 633                }
 634                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 635            }
 636            for project in &participant.projects {
 637                project_hosts_and_guest_counts
 638                    .entry(project.id)
 639                    .or_default()
 640                    .0 = Some(participant.user.id);
 641            }
 642        }
 643
 644        if let Some(user) = self.user_store.read(cx).current_user() {
 645            for project in &self.local_participant.projects {
 646                project_hosts_and_guest_counts
 647                    .entry(project.id)
 648                    .or_default()
 649                    .0 = Some(user.id);
 650            }
 651        }
 652
 653        project_hosts_and_guest_counts
 654            .into_iter()
 655            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 656            .max_by_key(|(_, _, guest_count)| *guest_count)
 657            .map(|(id, host, _)| (id, host))
 658    }
 659
 660    async fn handle_room_updated(
 661        this: Model<Self>,
 662        envelope: TypedEnvelope<proto::RoomUpdated>,
 663        _: Arc<Client>,
 664        mut cx: AsyncAppContext,
 665    ) -> Result<()> {
 666        let room = envelope
 667            .payload
 668            .room
 669            .ok_or_else(|| anyhow!("invalid room"))?;
 670        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 671    }
 672
 673    fn apply_room_update(
 674        &mut self,
 675        mut room: proto::Room,
 676        cx: &mut ModelContext<Self>,
 677    ) -> Result<()> {
 678        // Filter ourselves out from the room's participants.
 679        let local_participant_ix = room
 680            .participants
 681            .iter()
 682            .position(|participant| Some(participant.user_id) == self.client.user_id());
 683        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 684
 685        let pending_participant_user_ids = room
 686            .pending_participants
 687            .iter()
 688            .map(|p| p.user_id)
 689            .collect::<Vec<_>>();
 690
 691        let remote_participant_user_ids = room
 692            .participants
 693            .iter()
 694            .map(|p| p.user_id)
 695            .collect::<Vec<_>>();
 696
 697        let (remote_participants, pending_participants) =
 698            self.user_store.update(cx, move |user_store, cx| {
 699                (
 700                    user_store.get_users(remote_participant_user_ids, cx),
 701                    user_store.get_users(pending_participant_user_ids, cx),
 702                )
 703            });
 704
 705        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 706            let (remote_participants, pending_participants) =
 707                futures::join!(remote_participants, pending_participants);
 708
 709            this.update(&mut cx, |this, cx| {
 710                this.participant_user_ids.clear();
 711
 712                if let Some(participant) = local_participant {
 713                    this.local_participant.projects = participant.projects;
 714                } else {
 715                    this.local_participant.projects.clear();
 716                }
 717
 718                if let Some(participants) = remote_participants.log_err() {
 719                    for (participant, user) in room.participants.into_iter().zip(participants) {
 720                        let Some(peer_id) = participant.peer_id else {
 721                            continue;
 722                        };
 723                        let participant_index = ParticipantIndex(participant.participant_index);
 724                        this.participant_user_ids.insert(participant.user_id);
 725
 726                        let old_projects = this
 727                            .remote_participants
 728                            .get(&participant.user_id)
 729                            .into_iter()
 730                            .flat_map(|existing| &existing.projects)
 731                            .map(|project| project.id)
 732                            .collect::<HashSet<_>>();
 733                        let new_projects = participant
 734                            .projects
 735                            .iter()
 736                            .map(|project| project.id)
 737                            .collect::<HashSet<_>>();
 738
 739                        for project in &participant.projects {
 740                            if !old_projects.contains(&project.id) {
 741                                cx.emit(Event::RemoteProjectShared {
 742                                    owner: user.clone(),
 743                                    project_id: project.id,
 744                                    worktree_root_names: project.worktree_root_names.clone(),
 745                                });
 746                            }
 747                        }
 748
 749                        for unshared_project_id in old_projects.difference(&new_projects) {
 750                            this.joined_projects.retain(|project| {
 751                                if let Some(project) = project.upgrade() {
 752                                    project.update(cx, |project, cx| {
 753                                        if project.remote_id() == Some(*unshared_project_id) {
 754                                            project.disconnected_from_host(cx);
 755                                            false
 756                                        } else {
 757                                            true
 758                                        }
 759                                    })
 760                                } else {
 761                                    false
 762                                }
 763                            });
 764                            cx.emit(Event::RemoteProjectUnshared {
 765                                project_id: *unshared_project_id,
 766                            });
 767                        }
 768
 769                        let location = ParticipantLocation::from_proto(participant.location)
 770                            .unwrap_or(ParticipantLocation::External);
 771                        if let Some(remote_participant) =
 772                            this.remote_participants.get_mut(&participant.user_id)
 773                        {
 774                            remote_participant.peer_id = peer_id;
 775                            remote_participant.projects = participant.projects;
 776                            remote_participant.participant_index = participant_index;
 777                            if location != remote_participant.location {
 778                                remote_participant.location = location;
 779                                cx.emit(Event::ParticipantLocationChanged {
 780                                    participant_id: peer_id,
 781                                });
 782                            }
 783                        } else {
 784                            this.remote_participants.insert(
 785                                participant.user_id,
 786                                RemoteParticipant {
 787                                    user: user.clone(),
 788                                    participant_index,
 789                                    peer_id,
 790                                    projects: participant.projects,
 791                                    location,
 792                                    muted: true,
 793                                    speaking: false,
 794                                    video_tracks: Default::default(),
 795                                    audio_tracks: Default::default(),
 796                                },
 797                            );
 798
 799                            Audio::play_sound(Sound::Joined, cx);
 800
 801                            if let Some(live_kit) = this.live_kit.as_ref() {
 802                                let video_tracks =
 803                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 804                                let audio_tracks =
 805                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 806                                let publications = live_kit
 807                                    .room
 808                                    .remote_audio_track_publications(&user.id.to_string());
 809
 810                                for track in video_tracks {
 811                                    this.remote_video_track_updated(
 812                                        RemoteVideoTrackUpdate::Subscribed(track),
 813                                        cx,
 814                                    )
 815                                    .log_err();
 816                                }
 817
 818                                for (track, publication) in
 819                                    audio_tracks.iter().zip(publications.iter())
 820                                {
 821                                    this.remote_audio_track_updated(
 822                                        RemoteAudioTrackUpdate::Subscribed(
 823                                            track.clone(),
 824                                            publication.clone(),
 825                                        ),
 826                                        cx,
 827                                    )
 828                                    .log_err();
 829                                }
 830                            }
 831                        }
 832                    }
 833
 834                    this.remote_participants.retain(|user_id, participant| {
 835                        if this.participant_user_ids.contains(user_id) {
 836                            true
 837                        } else {
 838                            for project in &participant.projects {
 839                                cx.emit(Event::RemoteProjectUnshared {
 840                                    project_id: project.id,
 841                                });
 842                            }
 843                            false
 844                        }
 845                    });
 846                }
 847
 848                if let Some(pending_participants) = pending_participants.log_err() {
 849                    this.pending_participants = pending_participants;
 850                    for participant in &this.pending_participants {
 851                        this.participant_user_ids.insert(participant.id);
 852                    }
 853                }
 854
 855                this.follows_by_leader_id_project_id.clear();
 856                for follower in room.followers {
 857                    let project_id = follower.project_id;
 858                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 859                        (Some(leader), Some(follower)) => (leader, follower),
 860
 861                        _ => {
 862                            log::error!("Follower message {follower:?} missing some state");
 863                            continue;
 864                        }
 865                    };
 866
 867                    let list = this
 868                        .follows_by_leader_id_project_id
 869                        .entry((leader, project_id))
 870                        .or_insert(Vec::new());
 871                    if !list.contains(&follower) {
 872                        list.push(follower);
 873                    }
 874                }
 875
 876                this.pending_room_update.take();
 877                if this.should_leave() {
 878                    log::info!("room is empty, leaving");
 879                    let _ = this.leave(cx);
 880                }
 881
 882                this.user_store.update(cx, |user_store, cx| {
 883                    let participant_indices_by_user_id = this
 884                        .remote_participants
 885                        .iter()
 886                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 887                        .collect();
 888                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 889                });
 890
 891                this.check_invariants();
 892                this.room_update_completed_tx.try_send(Some(())).ok();
 893                cx.notify();
 894            })
 895            .ok();
 896        }));
 897
 898        cx.notify();
 899        Ok(())
 900    }
 901
 902    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 903        let mut done_rx = self.room_update_completed_rx.clone();
 904        async move {
 905            while let Some(result) = done_rx.next().await {
 906                if result.is_some() {
 907                    break;
 908                }
 909            }
 910        }
 911    }
 912
 913    fn remote_video_track_updated(
 914        &mut self,
 915        change: RemoteVideoTrackUpdate,
 916        cx: &mut ModelContext<Self>,
 917    ) -> Result<()> {
 918        match change {
 919            RemoteVideoTrackUpdate::Subscribed(track) => {
 920                let user_id = track.publisher_id().parse()?;
 921                let track_id = track.sid().to_string();
 922                let participant = self
 923                    .remote_participants
 924                    .get_mut(&user_id)
 925                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 926                participant.video_tracks.insert(track_id.clone(), track);
 927                cx.emit(Event::RemoteVideoTracksChanged {
 928                    participant_id: participant.peer_id,
 929                });
 930            }
 931            RemoteVideoTrackUpdate::Unsubscribed {
 932                publisher_id,
 933                track_id,
 934            } => {
 935                let user_id = publisher_id.parse()?;
 936                let participant = self
 937                    .remote_participants
 938                    .get_mut(&user_id)
 939                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 940                participant.video_tracks.remove(&track_id);
 941                cx.emit(Event::RemoteVideoTracksChanged {
 942                    participant_id: participant.peer_id,
 943                });
 944            }
 945        }
 946
 947        cx.notify();
 948        Ok(())
 949    }
 950
 951    fn remote_audio_track_updated(
 952        &mut self,
 953        change: RemoteAudioTrackUpdate,
 954        cx: &mut ModelContext<Self>,
 955    ) -> Result<()> {
 956        match change {
 957            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 958                let mut speaker_ids = speakers
 959                    .into_iter()
 960                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 961                    .collect::<Vec<u64>>();
 962                speaker_ids.sort_unstable();
 963                for (sid, participant) in &mut self.remote_participants {
 964                    if let Ok(_) = speaker_ids.binary_search(sid) {
 965                        participant.speaking = true;
 966                    } else {
 967                        participant.speaking = false;
 968                    }
 969                }
 970                if let Some(id) = self.client.user_id() {
 971                    if let Some(room) = &mut self.live_kit {
 972                        if let Ok(_) = speaker_ids.binary_search(&id) {
 973                            room.speaking = true;
 974                        } else {
 975                            room.speaking = false;
 976                        }
 977                    }
 978                }
 979                cx.notify();
 980            }
 981            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 982                let mut found = false;
 983                for participant in &mut self.remote_participants.values_mut() {
 984                    for track in participant.audio_tracks.values() {
 985                        if track.sid() == track_id {
 986                            found = true;
 987                            break;
 988                        }
 989                    }
 990                    if found {
 991                        participant.muted = muted;
 992                        break;
 993                    }
 994                }
 995
 996                cx.notify();
 997            }
 998            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 999                let user_id = track.publisher_id().parse()?;
1000                let track_id = track.sid().to_string();
1001                let participant = self
1002                    .remote_participants
1003                    .get_mut(&user_id)
1004                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1005                participant.audio_tracks.insert(track_id.clone(), track);
1006                participant.muted = publication.is_muted();
1007
1008                cx.emit(Event::RemoteAudioTracksChanged {
1009                    participant_id: participant.peer_id,
1010                });
1011            }
1012            RemoteAudioTrackUpdate::Unsubscribed {
1013                publisher_id,
1014                track_id,
1015            } => {
1016                let user_id = publisher_id.parse()?;
1017                let participant = self
1018                    .remote_participants
1019                    .get_mut(&user_id)
1020                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1021                participant.audio_tracks.remove(&track_id);
1022                cx.emit(Event::RemoteAudioTracksChanged {
1023                    participant_id: participant.peer_id,
1024                });
1025            }
1026        }
1027
1028        cx.notify();
1029        Ok(())
1030    }
1031
1032    fn check_invariants(&self) {
1033        #[cfg(any(test, feature = "test-support"))]
1034        {
1035            for participant in self.remote_participants.values() {
1036                assert!(self.participant_user_ids.contains(&participant.user.id));
1037                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1038            }
1039
1040            for participant in &self.pending_participants {
1041                assert!(self.participant_user_ids.contains(&participant.id));
1042                assert_ne!(participant.id, self.client.user_id().unwrap());
1043            }
1044
1045            assert_eq!(
1046                self.participant_user_ids.len(),
1047                self.remote_participants.len() + self.pending_participants.len()
1048            );
1049        }
1050    }
1051
1052    pub(crate) fn call(
1053        &mut self,
1054        called_user_id: u64,
1055        initial_project_id: Option<u64>,
1056        cx: &mut ModelContext<Self>,
1057    ) -> Task<Result<()>> {
1058        if self.status.is_offline() {
1059            return Task::ready(Err(anyhow!("room is offline")));
1060        }
1061
1062        cx.notify();
1063        let client = self.client.clone();
1064        let room_id = self.id;
1065        self.pending_call_count += 1;
1066        cx.spawn(move |this, mut cx| async move {
1067            let result = client
1068                .request(proto::Call {
1069                    room_id,
1070                    called_user_id,
1071                    initial_project_id,
1072                })
1073                .await;
1074            this.update(&mut cx, |this, cx| {
1075                this.pending_call_count -= 1;
1076                if this.should_leave() {
1077                    this.leave(cx).detach_and_log_err(cx);
1078                }
1079            })?;
1080            result?;
1081            Ok(())
1082        })
1083    }
1084
1085    pub fn join_project(
1086        &mut self,
1087        id: u64,
1088        language_registry: Arc<LanguageRegistry>,
1089        fs: Arc<dyn Fs>,
1090        cx: &mut ModelContext<Self>,
1091    ) -> Task<Result<Model<Project>>> {
1092        let client = self.client.clone();
1093        let user_store = self.user_store.clone();
1094        cx.emit(Event::RemoteProjectJoined { project_id: id });
1095        cx.spawn(move |this, mut cx| async move {
1096            let project =
1097                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1098
1099            this.update(&mut cx, |this, cx| {
1100                this.joined_projects.retain(|project| {
1101                    if let Some(project) = project.upgrade() {
1102                        !project.read(cx).is_read_only()
1103                    } else {
1104                        false
1105                    }
1106                });
1107                this.joined_projects.insert(project.downgrade());
1108            })?;
1109            Ok(project)
1110        })
1111    }
1112
1113    pub(crate) fn share_project(
1114        &mut self,
1115        project: Model<Project>,
1116        cx: &mut ModelContext<Self>,
1117    ) -> Task<Result<u64>> {
1118        if let Some(project_id) = project.read(cx).remote_id() {
1119            return Task::ready(Ok(project_id));
1120        }
1121
1122        let request = self.client.request(proto::ShareProject {
1123            room_id: self.id(),
1124            worktrees: project.read(cx).worktree_metadata_protos(cx),
1125        });
1126        cx.spawn(|this, mut cx| async move {
1127            let response = request.await?;
1128
1129            project.update(&mut cx, |project, cx| {
1130                project.shared(response.project_id, cx)
1131            })??;
1132
1133            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1134            this.update(&mut cx, |this, cx| {
1135                this.shared_projects.insert(project.downgrade());
1136                let active_project = this.local_participant.active_project.as_ref();
1137                if active_project.map_or(false, |location| *location == project) {
1138                    this.set_location(Some(&project), cx)
1139                } else {
1140                    Task::ready(Ok(()))
1141                }
1142            })?
1143            .await?;
1144
1145            Ok(response.project_id)
1146        })
1147    }
1148
1149    pub(crate) fn unshare_project(
1150        &mut self,
1151        project: Model<Project>,
1152        cx: &mut ModelContext<Self>,
1153    ) -> Result<()> {
1154        let project_id = match project.read(cx).remote_id() {
1155            Some(project_id) => project_id,
1156            None => return Ok(()),
1157        };
1158
1159        self.client.send(proto::UnshareProject { project_id })?;
1160        project.update(cx, |this, cx| this.unshare(cx))
1161    }
1162
1163    pub(crate) fn set_location(
1164        &mut self,
1165        project: Option<&Model<Project>>,
1166        cx: &mut ModelContext<Self>,
1167    ) -> Task<Result<()>> {
1168        if self.status.is_offline() {
1169            return Task::ready(Err(anyhow!("room is offline")));
1170        }
1171
1172        let client = self.client.clone();
1173        let room_id = self.id;
1174        let location = if let Some(project) = project {
1175            self.local_participant.active_project = Some(project.downgrade());
1176            if let Some(project_id) = project.read(cx).remote_id() {
1177                proto::participant_location::Variant::SharedProject(
1178                    proto::participant_location::SharedProject { id: project_id },
1179                )
1180            } else {
1181                proto::participant_location::Variant::UnsharedProject(
1182                    proto::participant_location::UnsharedProject {},
1183                )
1184            }
1185        } else {
1186            self.local_participant.active_project = None;
1187            proto::participant_location::Variant::External(proto::participant_location::External {})
1188        };
1189
1190        cx.notify();
1191        cx.background_executor().spawn(async move {
1192            client
1193                .request(proto::UpdateParticipantLocation {
1194                    room_id,
1195                    location: Some(proto::ParticipantLocation {
1196                        variant: Some(location),
1197                    }),
1198                })
1199                .await?;
1200            Ok(())
1201        })
1202    }
1203
1204    pub fn is_screen_sharing(&self) -> bool {
1205        self.live_kit.as_ref().map_or(false, |live_kit| {
1206            !matches!(live_kit.screen_track, LocalTrack::None)
1207        })
1208    }
1209
1210    pub fn is_sharing_mic(&self) -> bool {
1211        self.live_kit.as_ref().map_or(false, |live_kit| {
1212            !matches!(live_kit.microphone_track, LocalTrack::None)
1213        })
1214    }
1215
1216    pub fn is_muted(&self, cx: &AppContext) -> bool {
1217        self.live_kit
1218            .as_ref()
1219            .and_then(|live_kit| match &live_kit.microphone_track {
1220                LocalTrack::None => Some(Self::mute_on_join(cx)),
1221                LocalTrack::Pending { muted, .. } => Some(*muted),
1222                LocalTrack::Published { muted, .. } => Some(*muted),
1223            })
1224            .unwrap_or(false)
1225    }
1226
1227    pub fn is_speaking(&self) -> bool {
1228        self.live_kit
1229            .as_ref()
1230            .map_or(false, |live_kit| live_kit.speaking)
1231    }
1232
1233    pub fn is_deafened(&self) -> Option<bool> {
1234        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1235    }
1236
1237    #[track_caller]
1238    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1239        if self.status.is_offline() {
1240            return Task::ready(Err(anyhow!("room is offline")));
1241        } else if self.is_sharing_mic() {
1242            return Task::ready(Err(anyhow!("microphone was already shared")));
1243        }
1244
1245        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1246            let publish_id = post_inc(&mut live_kit.next_publish_id);
1247            live_kit.microphone_track = LocalTrack::Pending {
1248                publish_id,
1249                muted: false,
1250            };
1251            cx.notify();
1252            publish_id
1253        } else {
1254            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1255        };
1256
1257        cx.spawn(move |this, mut cx| async move {
1258            let publish_track = async {
1259                let track = LocalAudioTrack::create();
1260                this.upgrade()
1261                    .ok_or_else(|| anyhow!("room was dropped"))?
1262                    .update(&mut cx, |this, _| {
1263                        this.live_kit
1264                            .as_ref()
1265                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1266                    })?
1267                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1268                    .await
1269            };
1270            let publication = publish_track.await;
1271            this.upgrade()
1272                .ok_or_else(|| anyhow!("room was dropped"))?
1273                .update(&mut cx, |this, cx| {
1274                    let live_kit = this
1275                        .live_kit
1276                        .as_mut()
1277                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1278
1279                    let (canceled, muted) = if let LocalTrack::Pending {
1280                        publish_id: cur_publish_id,
1281                        muted,
1282                    } = &live_kit.microphone_track
1283                    {
1284                        (*cur_publish_id != publish_id, *muted)
1285                    } else {
1286                        (true, false)
1287                    };
1288
1289                    match publication {
1290                        Ok(publication) => {
1291                            if canceled {
1292                                live_kit.room.unpublish_track(publication);
1293                            } else {
1294                                if muted {
1295                                    cx.background_executor()
1296                                        .spawn(publication.set_mute(muted))
1297                                        .detach();
1298                                }
1299                                live_kit.microphone_track = LocalTrack::Published {
1300                                    track_publication: publication,
1301                                    muted,
1302                                };
1303                                cx.notify();
1304                            }
1305                            Ok(())
1306                        }
1307                        Err(error) => {
1308                            if canceled {
1309                                Ok(())
1310                            } else {
1311                                live_kit.microphone_track = LocalTrack::None;
1312                                cx.notify();
1313                                Err(error)
1314                            }
1315                        }
1316                    }
1317                })?
1318        })
1319    }
1320
1321    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1322        if self.status.is_offline() {
1323            return Task::ready(Err(anyhow!("room is offline")));
1324        } else if self.is_screen_sharing() {
1325            return Task::ready(Err(anyhow!("screen was already shared")));
1326        }
1327
1328        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1329            let publish_id = post_inc(&mut live_kit.next_publish_id);
1330            live_kit.screen_track = LocalTrack::Pending {
1331                publish_id,
1332                muted: false,
1333            };
1334            cx.notify();
1335            (live_kit.room.display_sources(), publish_id)
1336        } else {
1337            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1338        };
1339
1340        cx.spawn(move |this, mut cx| async move {
1341            let publish_track = async {
1342                let displays = displays.await?;
1343                let display = displays
1344                    .first()
1345                    .ok_or_else(|| anyhow!("no display found"))?;
1346                let track = LocalVideoTrack::screen_share_for_display(&display);
1347                this.upgrade()
1348                    .ok_or_else(|| anyhow!("room was dropped"))?
1349                    .update(&mut cx, |this, _| {
1350                        this.live_kit
1351                            .as_ref()
1352                            .map(|live_kit| live_kit.room.publish_video_track(track))
1353                    })?
1354                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1355                    .await
1356            };
1357
1358            let publication = publish_track.await;
1359            this.upgrade()
1360                .ok_or_else(|| anyhow!("room was dropped"))?
1361                .update(&mut cx, |this, cx| {
1362                    let live_kit = this
1363                        .live_kit
1364                        .as_mut()
1365                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1366
1367                    let (canceled, muted) = if let LocalTrack::Pending {
1368                        publish_id: cur_publish_id,
1369                        muted,
1370                    } = &live_kit.screen_track
1371                    {
1372                        (*cur_publish_id != publish_id, *muted)
1373                    } else {
1374                        (true, false)
1375                    };
1376
1377                    match publication {
1378                        Ok(publication) => {
1379                            if canceled {
1380                                live_kit.room.unpublish_track(publication);
1381                            } else {
1382                                if muted {
1383                                    cx.background_executor()
1384                                        .spawn(publication.set_mute(muted))
1385                                        .detach();
1386                                }
1387                                live_kit.screen_track = LocalTrack::Published {
1388                                    track_publication: publication,
1389                                    muted,
1390                                };
1391                                cx.notify();
1392                            }
1393
1394                            Audio::play_sound(Sound::StartScreenshare, cx);
1395
1396                            Ok(())
1397                        }
1398                        Err(error) => {
1399                            if canceled {
1400                                Ok(())
1401                            } else {
1402                                live_kit.screen_track = LocalTrack::None;
1403                                cx.notify();
1404                                Err(error)
1405                            }
1406                        }
1407                    }
1408                })?
1409        })
1410    }
1411
1412    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1413        let should_mute = !self.is_muted(cx);
1414        if let Some(live_kit) = self.live_kit.as_mut() {
1415            if matches!(live_kit.microphone_track, LocalTrack::None) {
1416                return Ok(self.share_microphone(cx));
1417            }
1418
1419            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1420            live_kit.muted_by_user = should_mute;
1421
1422            if old_muted == true && live_kit.deafened == true {
1423                if let Some(task) = self.toggle_deafen(cx).ok() {
1424                    task.detach();
1425                }
1426            }
1427
1428            Ok(ret_task)
1429        } else {
1430            Err(anyhow!("LiveKit not started"))
1431        }
1432    }
1433
1434    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1435        if let Some(live_kit) = self.live_kit.as_mut() {
1436            (*live_kit).deafened = !live_kit.deafened;
1437
1438            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1439            // Context notification is sent within set_mute itself.
1440            let mut mute_task = None;
1441            // When deafening, mute user's mic as well.
1442            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1443            if live_kit.deafened || !live_kit.muted_by_user {
1444                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1445            };
1446            for participant in self.remote_participants.values() {
1447                for track in live_kit
1448                    .room
1449                    .remote_audio_track_publications(&participant.user.id.to_string())
1450                {
1451                    let deafened = live_kit.deafened;
1452                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1453                }
1454            }
1455
1456            Ok(cx.foreground_executor().spawn(async move {
1457                if let Some(mute_task) = mute_task {
1458                    mute_task.await?;
1459                }
1460                for task in tasks {
1461                    task.await?;
1462                }
1463                Ok(())
1464            }))
1465        } else {
1466            Err(anyhow!("LiveKit not started"))
1467        }
1468    }
1469
1470    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1471        if self.status.is_offline() {
1472            return Err(anyhow!("room is offline"));
1473        }
1474
1475        let live_kit = self
1476            .live_kit
1477            .as_mut()
1478            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1479        match mem::take(&mut live_kit.screen_track) {
1480            LocalTrack::None => Err(anyhow!("screen was not shared")),
1481            LocalTrack::Pending { .. } => {
1482                cx.notify();
1483                Ok(())
1484            }
1485            LocalTrack::Published {
1486                track_publication, ..
1487            } => {
1488                live_kit.room.unpublish_track(track_publication);
1489                cx.notify();
1490
1491                Audio::play_sound(Sound::StopScreenshare, cx);
1492                Ok(())
1493            }
1494        }
1495    }
1496
1497    #[cfg(any(test, feature = "test-support"))]
1498    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1499        self.live_kit
1500            .as_ref()
1501            .unwrap()
1502            .room
1503            .set_display_sources(sources);
1504    }
1505}
1506
1507struct LiveKitRoom {
1508    room: Arc<live_kit_client::Room>,
1509    screen_track: LocalTrack,
1510    microphone_track: LocalTrack,
1511    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1512    muted_by_user: bool,
1513    deafened: bool,
1514    speaking: bool,
1515    next_publish_id: usize,
1516    _maintain_room: Task<()>,
1517    _maintain_tracks: [Task<()>; 2],
1518}
1519
1520impl LiveKitRoom {
1521    fn set_mute(
1522        self: &mut LiveKitRoom,
1523        should_mute: bool,
1524        cx: &mut ModelContext<Room>,
1525    ) -> Result<(Task<Result<()>>, bool)> {
1526        if !should_mute {
1527            // clear user muting state.
1528            self.muted_by_user = false;
1529        }
1530
1531        let (result, old_muted) = match &mut self.microphone_track {
1532            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1533            LocalTrack::Pending { muted, .. } => {
1534                let old_muted = *muted;
1535                *muted = should_mute;
1536                cx.notify();
1537                Ok((Task::Ready(Some(Ok(()))), old_muted))
1538            }
1539            LocalTrack::Published {
1540                track_publication,
1541                muted,
1542            } => {
1543                let old_muted = *muted;
1544                *muted = should_mute;
1545                cx.notify();
1546                Ok((
1547                    cx.background_executor()
1548                        .spawn(track_publication.set_mute(*muted)),
1549                    old_muted,
1550                ))
1551            }
1552        }?;
1553
1554        if old_muted != should_mute {
1555            if should_mute {
1556                Audio::play_sound(Sound::Mute, cx);
1557            } else {
1558                Audio::play_sound(Sound::Unmute, cx);
1559            }
1560        }
1561
1562        Ok((result, old_muted))
1563    }
1564}
1565
1566enum LocalTrack {
1567    None,
1568    Pending {
1569        publish_id: usize,
1570        muted: bool,
1571    },
1572    Published {
1573        track_publication: LocalTrackPublication,
1574        muted: bool,
1575    },
1576}
1577
1578impl Default for LocalTrack {
1579    fn default() -> Self {
1580        Self::None
1581    }
1582}
1583
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, Self::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` does not count as online.
    pub fn is_online(&self) -> bool {
        matches!(self, Self::Online)
    }
}