room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{
  19    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  20    RemoteVideoTrackUpdate,
  21};
  22use postage::{sink::Sink, stream::Stream, watch};
  23use project::Project;
  24use settings::Settings;
  25use std::{future::Future, mem, sync::Arc, time::Duration};
  26use util::{post_inc, ResultExt, TryFutureExt};
  27
  28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  29
  30#[derive(Clone, Debug, PartialEq, Eq)]
  31pub enum Event {
  32    ParticipantLocationChanged {
  33        participant_id: proto::PeerId,
  34    },
  35    RemoteVideoTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
  38    RemoteAudioTracksChanged {
  39        participant_id: proto::PeerId,
  40    },
  41    RemoteProjectShared {
  42        owner: Arc<User>,
  43        project_id: u64,
  44        worktree_root_names: Vec<String>,
  45    },
  46    RemoteProjectUnshared {
  47        project_id: u64,
  48    },
  49    RemoteProjectJoined {
  50        project_id: u64,
  51    },
  52    RemoteProjectInvitationDiscarded {
  53        project_id: u64,
  54    },
  55    Left,
  56}
  57
  58pub struct Room {
  59    id: u64,
  60    channel_id: Option<u64>,
  61    live_kit: Option<LiveKitRoom>,
  62    status: RoomStatus,
  63    shared_projects: HashSet<WeakModel<Project>>,
  64    joined_projects: HashSet<WeakModel<Project>>,
  65    local_participant: LocalParticipant,
  66    remote_participants: BTreeMap<u64, RemoteParticipant>,
  67    pending_participants: Vec<Arc<User>>,
  68    participant_user_ids: HashSet<u64>,
  69    pending_call_count: usize,
  70    leave_when_empty: bool,
  71    client: Arc<Client>,
  72    user_store: Model<UserStore>,
  73    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  74    client_subscriptions: Vec<client::Subscription>,
  75    _subscriptions: Vec<gpui::Subscription>,
  76    room_update_completed_tx: watch::Sender<Option<()>>,
  77    room_update_completed_rx: watch::Receiver<Option<()>>,
  78    pending_room_update: Option<Task<()>>,
  79    maintain_connection: Option<Task<Option<()>>>,
  80}
  81
  82impl EventEmitter for Room {
  83    type Event = Event;
  84}
  85
  86impl Room {
  87    pub fn channel_id(&self) -> Option<u64> {
  88        self.channel_id
  89    }
  90
  91    pub fn is_sharing_project(&self) -> bool {
  92        !self.shared_projects.is_empty()
  93    }
  94
  95    #[cfg(any(test, feature = "test-support"))]
  96    pub fn is_connected(&self) -> bool {
  97        if let Some(live_kit) = self.live_kit.as_ref() {
  98            matches!(
  99                *live_kit.room.status().borrow(),
 100                live_kit_client::ConnectionState::Connected { .. }
 101            )
 102        } else {
 103            false
 104        }
 105    }
 106
 107    fn new(
 108        id: u64,
 109        channel_id: Option<u64>,
 110        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 111        client: Arc<Client>,
 112        user_store: Model<UserStore>,
 113        cx: &mut ModelContext<Self>,
 114    ) -> Self {
 115        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 116            let room = live_kit_client::Room::new();
 117            let mut status = room.status();
 118            // Consume the initial status of the room.
 119            let _ = status.try_recv();
 120            let _maintain_room = cx.spawn(|this, mut cx| async move {
 121                while let Some(status) = status.next().await {
 122                    let this = if let Some(this) = this.upgrade() {
 123                        this
 124                    } else {
 125                        break;
 126                    };
 127
 128                    if status == live_kit_client::ConnectionState::Disconnected {
 129                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 130                            .ok();
 131                        break;
 132                    }
 133                }
 134            });
 135
 136            let _maintain_video_tracks = cx.spawn({
 137                let room = room.clone();
 138                move |this, mut cx| async move {
 139                    let mut track_video_changes = room.remote_video_track_updates();
 140                    while let Some(track_change) = track_video_changes.next().await {
 141                        let this = if let Some(this) = this.upgrade() {
 142                            this
 143                        } else {
 144                            break;
 145                        };
 146
 147                        this.update(&mut cx, |this, cx| {
 148                            this.remote_video_track_updated(track_change, cx).log_err()
 149                        })
 150                        .ok();
 151                    }
 152                }
 153            });
 154
 155            let _maintain_audio_tracks = cx.spawn({
 156                let room = room.clone();
 157                |this, mut cx| async move {
 158                    let mut track_audio_changes = room.remote_audio_track_updates();
 159                    while let Some(track_change) = track_audio_changes.next().await {
 160                        let this = if let Some(this) = this.upgrade() {
 161                            this
 162                        } else {
 163                            break;
 164                        };
 165
 166                        this.update(&mut cx, |this, cx| {
 167                            this.remote_audio_track_updated(track_change, cx).log_err()
 168                        })
 169                        .ok();
 170                    }
 171                }
 172            });
 173
 174            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 175            cx.spawn(|this, mut cx| async move {
 176                connect.await?;
 177
 178                if !cx.update(|cx| Self::mute_on_join(cx))? {
 179                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 180                        .await?;
 181                }
 182
 183                anyhow::Ok(())
 184            })
 185            .detach_and_log_err(cx);
 186
 187            Some(LiveKitRoom {
 188                room,
 189                screen_track: LocalTrack::None,
 190                microphone_track: LocalTrack::None,
 191                next_publish_id: 0,
 192                muted_by_user: false,
 193                deafened: false,
 194                speaking: false,
 195                _maintain_room,
 196                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 197            })
 198        } else {
 199            None
 200        };
 201
 202        let maintain_connection = cx.spawn({
 203            let client = client.clone();
 204            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 205        });
 206
 207        Audio::play_sound(Sound::Joined, cx);
 208
 209        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 210
 211        Self {
 212            id,
 213            channel_id,
 214            live_kit: live_kit_room,
 215            status: RoomStatus::Online,
 216            shared_projects: Default::default(),
 217            joined_projects: Default::default(),
 218            participant_user_ids: Default::default(),
 219            local_participant: Default::default(),
 220            remote_participants: Default::default(),
 221            pending_participants: Default::default(),
 222            pending_call_count: 0,
 223            client_subscriptions: vec![
 224                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 225            ],
 226            _subscriptions: vec![
 227                cx.on_release(Self::released),
 228                cx.on_app_quit(Self::app_will_quit),
 229            ],
 230            leave_when_empty: false,
 231            pending_room_update: None,
 232            client,
 233            user_store,
 234            follows_by_leader_id_project_id: Default::default(),
 235            maintain_connection: Some(maintain_connection),
 236            room_update_completed_tx,
 237            room_update_completed_rx,
 238        }
 239    }
 240
 241    pub(crate) fn create(
 242        called_user_id: u64,
 243        initial_project: Option<Model<Project>>,
 244        client: Arc<Client>,
 245        user_store: Model<UserStore>,
 246        cx: &mut AppContext,
 247    ) -> Task<Result<Model<Self>>> {
 248        cx.spawn(move |mut cx| async move {
 249            let response = client.request(proto::CreateRoom {}).await?;
 250            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 251            let room = cx.build_model(|cx| {
 252                Self::new(
 253                    room_proto.id,
 254                    None,
 255                    response.live_kit_connection_info,
 256                    client,
 257                    user_store,
 258                    cx,
 259                )
 260            })?;
 261
 262            let initial_project_id = if let Some(initial_project) = initial_project {
 263                let initial_project_id = room
 264                    .update(&mut cx, |room, cx| {
 265                        room.share_project(initial_project.clone(), cx)
 266                    })?
 267                    .await?;
 268                Some(initial_project_id)
 269            } else {
 270                None
 271            };
 272
 273            match room
 274                .update(&mut cx, |room, cx| {
 275                    room.leave_when_empty = true;
 276                    room.call(called_user_id, initial_project_id, cx)
 277                })?
 278                .await
 279            {
 280                Ok(()) => Ok(room),
 281                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 282            }
 283        })
 284    }
 285
 286    pub(crate) async fn join_channel(
 287        channel_id: u64,
 288        client: Arc<Client>,
 289        user_store: Model<UserStore>,
 290        cx: AsyncAppContext,
 291    ) -> Result<Model<Self>> {
 292        Self::from_join_response(
 293            client.request(proto::JoinChannel { channel_id }).await?,
 294            client,
 295            user_store,
 296            cx,
 297        )
 298    }
 299
 300    pub(crate) async fn join(
 301        room_id: u64,
 302        client: Arc<Client>,
 303        user_store: Model<UserStore>,
 304        cx: AsyncAppContext,
 305    ) -> Result<Model<Self>> {
 306        Self::from_join_response(
 307            client.request(proto::JoinRoom { id: room_id }).await?,
 308            client,
 309            user_store,
 310            cx,
 311        )
 312    }
 313
 314    fn released(&mut self, cx: &mut AppContext) {
 315        if self.status.is_online() {
 316            self.leave_internal(cx).detach_and_log_err(cx);
 317        }
 318    }
 319
 320    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 321        let task = if self.status.is_online() {
 322            let leave = self.leave_internal(cx);
 323            Some(cx.background_executor().spawn(async move {
 324                leave.await.log_err();
 325            }))
 326        } else {
 327            None
 328        };
 329
 330        async move {
 331            if let Some(task) = task {
 332                task.await;
 333            }
 334        }
 335    }
 336
 337    pub fn mute_on_join(cx: &AppContext) -> bool {
 338        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 339    }
 340
 341    fn from_join_response(
 342        response: proto::JoinRoomResponse,
 343        client: Arc<Client>,
 344        user_store: Model<UserStore>,
 345        mut cx: AsyncAppContext,
 346    ) -> Result<Model<Self>> {
 347        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 348        let room = cx.build_model(|cx| {
 349            Self::new(
 350                room_proto.id,
 351                response.channel_id,
 352                response.live_kit_connection_info,
 353                client,
 354                user_store,
 355                cx,
 356            )
 357        })?;
 358        room.update(&mut cx, |room, cx| {
 359            room.leave_when_empty = room.channel_id.is_none();
 360            room.apply_room_update(room_proto, cx)?;
 361            anyhow::Ok(())
 362        })??;
 363        Ok(room)
 364    }
 365
 366    fn should_leave(&self) -> bool {
 367        self.leave_when_empty
 368            && self.pending_room_update.is_none()
 369            && self.pending_participants.is_empty()
 370            && self.remote_participants.is_empty()
 371            && self.pending_call_count == 0
 372    }
 373
 374    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 375        cx.notify();
 376        cx.emit(Event::Left);
 377        self.leave_internal(cx)
 378    }
 379
 380    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 381        if self.status.is_offline() {
 382            return Task::ready(Err(anyhow!("room is offline")));
 383        }
 384
 385        log::info!("leaving room");
 386        Audio::play_sound(Sound::Leave, cx);
 387
 388        self.clear_state(cx);
 389
 390        let leave_room = self.client.request(proto::LeaveRoom {});
 391        cx.background_executor().spawn(async move {
 392            leave_room.await?;
 393            anyhow::Ok(())
 394        })
 395    }
 396
 397    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 398        for project in self.shared_projects.drain() {
 399            if let Some(project) = project.upgrade() {
 400                project.update(cx, |project, cx| {
 401                    project.unshare(cx).log_err();
 402                });
 403            }
 404        }
 405        for project in self.joined_projects.drain() {
 406            if let Some(project) = project.upgrade() {
 407                project.update(cx, |project, cx| {
 408                    project.disconnected_from_host(cx);
 409                    project.close(cx);
 410                });
 411            }
 412        }
 413
 414        self.status = RoomStatus::Offline;
 415        self.remote_participants.clear();
 416        self.pending_participants.clear();
 417        self.participant_user_ids.clear();
 418        self.client_subscriptions.clear();
 419        self.live_kit.take();
 420        self.pending_room_update.take();
 421        self.maintain_connection.take();
 422    }
 423
 424    async fn maintain_connection(
 425        this: WeakModel<Self>,
 426        client: Arc<Client>,
 427        mut cx: AsyncAppContext,
 428    ) -> Result<()> {
 429        let mut client_status = client.status();
 430        loop {
 431            let _ = client_status.try_recv();
 432            let is_connected = client_status.borrow().is_connected();
 433            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 434            if !is_connected || client_status.next().await.is_some() {
 435                log::info!("detected client disconnection");
 436
 437                this.upgrade()
 438                    .ok_or_else(|| anyhow!("room was dropped"))?
 439                    .update(&mut cx, |this, cx| {
 440                        this.status = RoomStatus::Rejoining;
 441                        cx.notify();
 442                    })?;
 443
 444                // Wait for client to re-establish a connection to the server.
 445                {
 446                    let mut reconnection_timeout =
 447                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 448                    let client_reconnection = async {
 449                        let mut remaining_attempts = 3;
 450                        while remaining_attempts > 0 {
 451                            if client_status.borrow().is_connected() {
 452                                log::info!("client reconnected, attempting to rejoin room");
 453
 454                                let Some(this) = this.upgrade() else { break };
 455                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 456                                    Ok(task) => {
 457                                        if task.await.log_err().is_some() {
 458                                            return true;
 459                                        } else {
 460                                            remaining_attempts -= 1;
 461                                        }
 462                                    }
 463                                    Err(_app_dropped) => return false,
 464                                }
 465                            } else if client_status.borrow().is_signed_out() {
 466                                return false;
 467                            }
 468
 469                            log::info!(
 470                                "waiting for client status change, remaining attempts {}",
 471                                remaining_attempts
 472                            );
 473                            client_status.next().await;
 474                        }
 475                        false
 476                    }
 477                    .fuse();
 478                    futures::pin_mut!(client_reconnection);
 479
 480                    futures::select_biased! {
 481                        reconnected = client_reconnection => {
 482                            if reconnected {
 483                                log::info!("successfully reconnected to room");
 484                                // If we successfully joined the room, go back around the loop
 485                                // waiting for future connection status changes.
 486                                continue;
 487                            }
 488                        }
 489                        _ = reconnection_timeout => {
 490                            log::info!("room reconnection timeout expired");
 491                        }
 492                    }
 493                }
 494
 495                break;
 496            }
 497        }
 498
 499        // The client failed to re-establish a connection to the server
 500        // or an error occurred while trying to re-join the room. Either way
 501        // we leave the room and return an error.
 502        if let Some(this) = this.upgrade() {
 503            log::info!("reconnection failed, leaving room");
 504            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 505        }
 506        Err(anyhow!(
 507            "can't reconnect to room: client failed to re-establish connection"
 508        ))
 509    }
 510
 511    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 512        let mut projects = HashMap::default();
 513        let mut reshared_projects = Vec::new();
 514        let mut rejoined_projects = Vec::new();
 515        self.shared_projects.retain(|project| {
 516            if let Some(handle) = project.upgrade() {
 517                let project = handle.read(cx);
 518                if let Some(project_id) = project.remote_id() {
 519                    projects.insert(project_id, handle.clone());
 520                    reshared_projects.push(proto::UpdateProject {
 521                        project_id,
 522                        worktrees: project.worktree_metadata_protos(cx),
 523                    });
 524                    return true;
 525                }
 526            }
 527            false
 528        });
 529        self.joined_projects.retain(|project| {
 530            if let Some(handle) = project.upgrade() {
 531                let project = handle.read(cx);
 532                if let Some(project_id) = project.remote_id() {
 533                    projects.insert(project_id, handle.clone());
 534                    rejoined_projects.push(proto::RejoinProject {
 535                        id: project_id,
 536                        worktrees: project
 537                            .worktrees()
 538                            .map(|worktree| {
 539                                let worktree = worktree.read(cx);
 540                                proto::RejoinWorktree {
 541                                    id: worktree.id().to_proto(),
 542                                    scan_id: worktree.completed_scan_id() as u64,
 543                                }
 544                            })
 545                            .collect(),
 546                    });
 547                }
 548                return true;
 549            }
 550            false
 551        });
 552
 553        let response = self.client.request_envelope(proto::RejoinRoom {
 554            id: self.id,
 555            reshared_projects,
 556            rejoined_projects,
 557        });
 558
 559        cx.spawn(|this, mut cx| async move {
 560            let response = response.await?;
 561            let message_id = response.message_id;
 562            let response = response.payload;
 563            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 564            this.update(&mut cx, |this, cx| {
 565                this.status = RoomStatus::Online;
 566                this.apply_room_update(room_proto, cx)?;
 567
 568                for reshared_project in response.reshared_projects {
 569                    if let Some(project) = projects.get(&reshared_project.id) {
 570                        project.update(cx, |project, cx| {
 571                            project.reshared(reshared_project, cx).log_err();
 572                        });
 573                    }
 574                }
 575
 576                for rejoined_project in response.rejoined_projects {
 577                    if let Some(project) = projects.get(&rejoined_project.id) {
 578                        project.update(cx, |project, cx| {
 579                            project.rejoined(rejoined_project, message_id, cx).log_err();
 580                        });
 581                    }
 582                }
 583
 584                anyhow::Ok(())
 585            })?
 586        })
 587    }
 588
 589    pub fn id(&self) -> u64 {
 590        self.id
 591    }
 592
 593    pub fn status(&self) -> RoomStatus {
 594        self.status
 595    }
 596
 597    pub fn local_participant(&self) -> &LocalParticipant {
 598        &self.local_participant
 599    }
 600
 601    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 602        &self.remote_participants
 603    }
 604
 605    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 606        self.remote_participants
 607            .values()
 608            .find(|p| p.peer_id == peer_id)
 609    }
 610
 611    pub fn pending_participants(&self) -> &[Arc<User>] {
 612        &self.pending_participants
 613    }
 614
 615    pub fn contains_participant(&self, user_id: u64) -> bool {
 616        self.participant_user_ids.contains(&user_id)
 617    }
 618
 619    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 620        self.follows_by_leader_id_project_id
 621            .get(&(leader_id, project_id))
 622            .map_or(&[], |v| v.as_slice())
 623    }
 624
 625    /// Returns the most 'active' projects, defined as most people in the project
 626    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 627        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 628        for participant in self.remote_participants.values() {
 629            match participant.location {
 630                ParticipantLocation::SharedProject { project_id } => {
 631                    project_hosts_and_guest_counts
 632                        .entry(project_id)
 633                        .or_default()
 634                        .1 += 1;
 635                }
 636                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 637            }
 638            for project in &participant.projects {
 639                project_hosts_and_guest_counts
 640                    .entry(project.id)
 641                    .or_default()
 642                    .0 = Some(participant.user.id);
 643            }
 644        }
 645
 646        if let Some(user) = self.user_store.read(cx).current_user() {
 647            for project in &self.local_participant.projects {
 648                project_hosts_and_guest_counts
 649                    .entry(project.id)
 650                    .or_default()
 651                    .0 = Some(user.id);
 652            }
 653        }
 654
 655        project_hosts_and_guest_counts
 656            .into_iter()
 657            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 658            .max_by_key(|(_, _, guest_count)| *guest_count)
 659            .map(|(id, host, _)| (id, host))
 660    }
 661
 662    async fn handle_room_updated(
 663        this: Model<Self>,
 664        envelope: TypedEnvelope<proto::RoomUpdated>,
 665        _: Arc<Client>,
 666        mut cx: AsyncAppContext,
 667    ) -> Result<()> {
 668        let room = envelope
 669            .payload
 670            .room
 671            .ok_or_else(|| anyhow!("invalid room"))?;
 672        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 673    }
 674
 675    fn apply_room_update(
 676        &mut self,
 677        mut room: proto::Room,
 678        cx: &mut ModelContext<Self>,
 679    ) -> Result<()> {
 680        // Filter ourselves out from the room's participants.
 681        let local_participant_ix = room
 682            .participants
 683            .iter()
 684            .position(|participant| Some(participant.user_id) == self.client.user_id());
 685        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 686
 687        let pending_participant_user_ids = room
 688            .pending_participants
 689            .iter()
 690            .map(|p| p.user_id)
 691            .collect::<Vec<_>>();
 692
 693        let remote_participant_user_ids = room
 694            .participants
 695            .iter()
 696            .map(|p| p.user_id)
 697            .collect::<Vec<_>>();
 698
 699        let (remote_participants, pending_participants) =
 700            self.user_store.update(cx, move |user_store, cx| {
 701                (
 702                    user_store.get_users(remote_participant_user_ids, cx),
 703                    user_store.get_users(pending_participant_user_ids, cx),
 704                )
 705            });
 706
 707        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 708            let (remote_participants, pending_participants) =
 709                futures::join!(remote_participants, pending_participants);
 710
 711            this.update(&mut cx, |this, cx| {
 712                this.participant_user_ids.clear();
 713
 714                if let Some(participant) = local_participant {
 715                    this.local_participant.projects = participant.projects;
 716                } else {
 717                    this.local_participant.projects.clear();
 718                }
 719
 720                if let Some(participants) = remote_participants.log_err() {
 721                    for (participant, user) in room.participants.into_iter().zip(participants) {
 722                        let Some(peer_id) = participant.peer_id else {
 723                            continue;
 724                        };
 725                        let participant_index = ParticipantIndex(participant.participant_index);
 726                        this.participant_user_ids.insert(participant.user_id);
 727
 728                        let old_projects = this
 729                            .remote_participants
 730                            .get(&participant.user_id)
 731                            .into_iter()
 732                            .flat_map(|existing| &existing.projects)
 733                            .map(|project| project.id)
 734                            .collect::<HashSet<_>>();
 735                        let new_projects = participant
 736                            .projects
 737                            .iter()
 738                            .map(|project| project.id)
 739                            .collect::<HashSet<_>>();
 740
 741                        for project in &participant.projects {
 742                            if !old_projects.contains(&project.id) {
 743                                cx.emit(Event::RemoteProjectShared {
 744                                    owner: user.clone(),
 745                                    project_id: project.id,
 746                                    worktree_root_names: project.worktree_root_names.clone(),
 747                                });
 748                            }
 749                        }
 750
 751                        for unshared_project_id in old_projects.difference(&new_projects) {
 752                            this.joined_projects.retain(|project| {
 753                                if let Some(project) = project.upgrade() {
 754                                    project.update(cx, |project, cx| {
 755                                        if project.remote_id() == Some(*unshared_project_id) {
 756                                            project.disconnected_from_host(cx);
 757                                            false
 758                                        } else {
 759                                            true
 760                                        }
 761                                    })
 762                                } else {
 763                                    false
 764                                }
 765                            });
 766                            cx.emit(Event::RemoteProjectUnshared {
 767                                project_id: *unshared_project_id,
 768                            });
 769                        }
 770
 771                        let location = ParticipantLocation::from_proto(participant.location)
 772                            .unwrap_or(ParticipantLocation::External);
 773                        if let Some(remote_participant) =
 774                            this.remote_participants.get_mut(&participant.user_id)
 775                        {
 776                            remote_participant.peer_id = peer_id;
 777                            remote_participant.projects = participant.projects;
 778                            remote_participant.participant_index = participant_index;
 779                            if location != remote_participant.location {
 780                                remote_participant.location = location;
 781                                cx.emit(Event::ParticipantLocationChanged {
 782                                    participant_id: peer_id,
 783                                });
 784                            }
 785                        } else {
 786                            this.remote_participants.insert(
 787                                participant.user_id,
 788                                RemoteParticipant {
 789                                    user: user.clone(),
 790                                    participant_index,
 791                                    peer_id,
 792                                    projects: participant.projects,
 793                                    location,
 794                                    muted: true,
 795                                    speaking: false,
 796                                    video_tracks: Default::default(),
 797                                    audio_tracks: Default::default(),
 798                                },
 799                            );
 800
 801                            Audio::play_sound(Sound::Joined, cx);
 802
 803                            if let Some(live_kit) = this.live_kit.as_ref() {
 804                                let video_tracks =
 805                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 806                                let audio_tracks =
 807                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 808                                let publications = live_kit
 809                                    .room
 810                                    .remote_audio_track_publications(&user.id.to_string());
 811
 812                                for track in video_tracks {
 813                                    this.remote_video_track_updated(
 814                                        RemoteVideoTrackUpdate::Subscribed(track),
 815                                        cx,
 816                                    )
 817                                    .log_err();
 818                                }
 819
 820                                for (track, publication) in
 821                                    audio_tracks.iter().zip(publications.iter())
 822                                {
 823                                    this.remote_audio_track_updated(
 824                                        RemoteAudioTrackUpdate::Subscribed(
 825                                            track.clone(),
 826                                            publication.clone(),
 827                                        ),
 828                                        cx,
 829                                    )
 830                                    .log_err();
 831                                }
 832                            }
 833                        }
 834                    }
 835
 836                    this.remote_participants.retain(|user_id, participant| {
 837                        if this.participant_user_ids.contains(user_id) {
 838                            true
 839                        } else {
 840                            for project in &participant.projects {
 841                                cx.emit(Event::RemoteProjectUnshared {
 842                                    project_id: project.id,
 843                                });
 844                            }
 845                            false
 846                        }
 847                    });
 848                }
 849
 850                if let Some(pending_participants) = pending_participants.log_err() {
 851                    this.pending_participants = pending_participants;
 852                    for participant in &this.pending_participants {
 853                        this.participant_user_ids.insert(participant.id);
 854                    }
 855                }
 856
 857                this.follows_by_leader_id_project_id.clear();
 858                for follower in room.followers {
 859                    let project_id = follower.project_id;
 860                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 861                        (Some(leader), Some(follower)) => (leader, follower),
 862
 863                        _ => {
 864                            log::error!("Follower message {follower:?} missing some state");
 865                            continue;
 866                        }
 867                    };
 868
 869                    let list = this
 870                        .follows_by_leader_id_project_id
 871                        .entry((leader, project_id))
 872                        .or_insert(Vec::new());
 873                    if !list.contains(&follower) {
 874                        list.push(follower);
 875                    }
 876                }
 877
 878                this.pending_room_update.take();
 879                if this.should_leave() {
 880                    log::info!("room is empty, leaving");
 881                    let _ = this.leave(cx);
 882                }
 883
 884                this.user_store.update(cx, |user_store, cx| {
 885                    let participant_indices_by_user_id = this
 886                        .remote_participants
 887                        .iter()
 888                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 889                        .collect();
 890                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 891                });
 892
 893                this.check_invariants();
 894                this.room_update_completed_tx.try_send(Some(())).ok();
 895                cx.notify();
 896            })
 897            .ok();
 898        }));
 899
 900        cx.notify();
 901        Ok(())
 902    }
 903
 904    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 905        let mut done_rx = self.room_update_completed_rx.clone();
 906        async move {
 907            while let Some(result) = done_rx.next().await {
 908                if result.is_some() {
 909                    break;
 910                }
 911            }
 912        }
 913    }
 914
 915    fn remote_video_track_updated(
 916        &mut self,
 917        change: RemoteVideoTrackUpdate,
 918        cx: &mut ModelContext<Self>,
 919    ) -> Result<()> {
 920        match change {
 921            RemoteVideoTrackUpdate::Subscribed(track) => {
 922                let user_id = track.publisher_id().parse()?;
 923                let track_id = track.sid().to_string();
 924                let participant = self
 925                    .remote_participants
 926                    .get_mut(&user_id)
 927                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 928                participant.video_tracks.insert(track_id.clone(), track);
 929                cx.emit(Event::RemoteVideoTracksChanged {
 930                    participant_id: participant.peer_id,
 931                });
 932            }
 933            RemoteVideoTrackUpdate::Unsubscribed {
 934                publisher_id,
 935                track_id,
 936            } => {
 937                let user_id = publisher_id.parse()?;
 938                let participant = self
 939                    .remote_participants
 940                    .get_mut(&user_id)
 941                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 942                participant.video_tracks.remove(&track_id);
 943                cx.emit(Event::RemoteVideoTracksChanged {
 944                    participant_id: participant.peer_id,
 945                });
 946            }
 947        }
 948
 949        cx.notify();
 950        Ok(())
 951    }
 952
 953    fn remote_audio_track_updated(
 954        &mut self,
 955        change: RemoteAudioTrackUpdate,
 956        cx: &mut ModelContext<Self>,
 957    ) -> Result<()> {
 958        match change {
 959            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 960                let mut speaker_ids = speakers
 961                    .into_iter()
 962                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 963                    .collect::<Vec<u64>>();
 964                speaker_ids.sort_unstable();
 965                for (sid, participant) in &mut self.remote_participants {
 966                    if let Ok(_) = speaker_ids.binary_search(sid) {
 967                        participant.speaking = true;
 968                    } else {
 969                        participant.speaking = false;
 970                    }
 971                }
 972                if let Some(id) = self.client.user_id() {
 973                    if let Some(room) = &mut self.live_kit {
 974                        if let Ok(_) = speaker_ids.binary_search(&id) {
 975                            room.speaking = true;
 976                        } else {
 977                            room.speaking = false;
 978                        }
 979                    }
 980                }
 981                cx.notify();
 982            }
 983            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 984                let mut found = false;
 985                for participant in &mut self.remote_participants.values_mut() {
 986                    for track in participant.audio_tracks.values() {
 987                        if track.sid() == track_id {
 988                            found = true;
 989                            break;
 990                        }
 991                    }
 992                    if found {
 993                        participant.muted = muted;
 994                        break;
 995                    }
 996                }
 997
 998                cx.notify();
 999            }
1000            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1001                let user_id = track.publisher_id().parse()?;
1002                let track_id = track.sid().to_string();
1003                let participant = self
1004                    .remote_participants
1005                    .get_mut(&user_id)
1006                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1007                participant.audio_tracks.insert(track_id.clone(), track);
1008                participant.muted = publication.is_muted();
1009
1010                cx.emit(Event::RemoteAudioTracksChanged {
1011                    participant_id: participant.peer_id,
1012                });
1013            }
1014            RemoteAudioTrackUpdate::Unsubscribed {
1015                publisher_id,
1016                track_id,
1017            } => {
1018                let user_id = publisher_id.parse()?;
1019                let participant = self
1020                    .remote_participants
1021                    .get_mut(&user_id)
1022                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1023                participant.audio_tracks.remove(&track_id);
1024                cx.emit(Event::RemoteAudioTracksChanged {
1025                    participant_id: participant.peer_id,
1026                });
1027            }
1028        }
1029
1030        cx.notify();
1031        Ok(())
1032    }
1033
1034    fn check_invariants(&self) {
1035        #[cfg(any(test, feature = "test-support"))]
1036        {
1037            for participant in self.remote_participants.values() {
1038                assert!(self.participant_user_ids.contains(&participant.user.id));
1039                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1040            }
1041
1042            for participant in &self.pending_participants {
1043                assert!(self.participant_user_ids.contains(&participant.id));
1044                assert_ne!(participant.id, self.client.user_id().unwrap());
1045            }
1046
1047            assert_eq!(
1048                self.participant_user_ids.len(),
1049                self.remote_participants.len() + self.pending_participants.len()
1050            );
1051        }
1052    }
1053
1054    pub(crate) fn call(
1055        &mut self,
1056        called_user_id: u64,
1057        initial_project_id: Option<u64>,
1058        cx: &mut ModelContext<Self>,
1059    ) -> Task<Result<()>> {
1060        if self.status.is_offline() {
1061            return Task::ready(Err(anyhow!("room is offline")));
1062        }
1063
1064        cx.notify();
1065        let client = self.client.clone();
1066        let room_id = self.id;
1067        self.pending_call_count += 1;
1068        cx.spawn(move |this, mut cx| async move {
1069            let result = client
1070                .request(proto::Call {
1071                    room_id,
1072                    called_user_id,
1073                    initial_project_id,
1074                })
1075                .await;
1076            this.update(&mut cx, |this, cx| {
1077                this.pending_call_count -= 1;
1078                if this.should_leave() {
1079                    this.leave(cx).detach_and_log_err(cx);
1080                }
1081            })?;
1082            result?;
1083            Ok(())
1084        })
1085    }
1086
1087    pub fn join_project(
1088        &mut self,
1089        id: u64,
1090        language_registry: Arc<LanguageRegistry>,
1091        fs: Arc<dyn Fs>,
1092        cx: &mut ModelContext<Self>,
1093    ) -> Task<Result<Model<Project>>> {
1094        let client = self.client.clone();
1095        let user_store = self.user_store.clone();
1096        cx.emit(Event::RemoteProjectJoined { project_id: id });
1097        cx.spawn(move |this, mut cx| async move {
1098            let project =
1099                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1100
1101            this.update(&mut cx, |this, cx| {
1102                this.joined_projects.retain(|project| {
1103                    if let Some(project) = project.upgrade() {
1104                        !project.read(cx).is_read_only()
1105                    } else {
1106                        false
1107                    }
1108                });
1109                this.joined_projects.insert(project.downgrade());
1110            })?;
1111            Ok(project)
1112        })
1113    }
1114
1115    pub(crate) fn share_project(
1116        &mut self,
1117        project: Model<Project>,
1118        cx: &mut ModelContext<Self>,
1119    ) -> Task<Result<u64>> {
1120        if let Some(project_id) = project.read(cx).remote_id() {
1121            return Task::ready(Ok(project_id));
1122        }
1123
1124        let request = self.client.request(proto::ShareProject {
1125            room_id: self.id(),
1126            worktrees: project.read(cx).worktree_metadata_protos(cx),
1127        });
1128        cx.spawn(|this, mut cx| async move {
1129            let response = request.await?;
1130
1131            project.update(&mut cx, |project, cx| {
1132                project.shared(response.project_id, cx)
1133            })??;
1134
1135            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1136            this.update(&mut cx, |this, cx| {
1137                this.shared_projects.insert(project.downgrade());
1138                let active_project = this.local_participant.active_project.as_ref();
1139                if active_project.map_or(false, |location| *location == project) {
1140                    this.set_location(Some(&project), cx)
1141                } else {
1142                    Task::ready(Ok(()))
1143                }
1144            })?
1145            .await?;
1146
1147            Ok(response.project_id)
1148        })
1149    }
1150
1151    pub(crate) fn unshare_project(
1152        &mut self,
1153        project: Model<Project>,
1154        cx: &mut ModelContext<Self>,
1155    ) -> Result<()> {
1156        let project_id = match project.read(cx).remote_id() {
1157            Some(project_id) => project_id,
1158            None => return Ok(()),
1159        };
1160
1161        self.client.send(proto::UnshareProject { project_id })?;
1162        project.update(cx, |this, cx| this.unshare(cx))
1163    }
1164
1165    pub(crate) fn set_location(
1166        &mut self,
1167        project: Option<&Model<Project>>,
1168        cx: &mut ModelContext<Self>,
1169    ) -> Task<Result<()>> {
1170        if self.status.is_offline() {
1171            return Task::ready(Err(anyhow!("room is offline")));
1172        }
1173
1174        let client = self.client.clone();
1175        let room_id = self.id;
1176        let location = if let Some(project) = project {
1177            self.local_participant.active_project = Some(project.downgrade());
1178            if let Some(project_id) = project.read(cx).remote_id() {
1179                proto::participant_location::Variant::SharedProject(
1180                    proto::participant_location::SharedProject { id: project_id },
1181                )
1182            } else {
1183                proto::participant_location::Variant::UnsharedProject(
1184                    proto::participant_location::UnsharedProject {},
1185                )
1186            }
1187        } else {
1188            self.local_participant.active_project = None;
1189            proto::participant_location::Variant::External(proto::participant_location::External {})
1190        };
1191
1192        cx.notify();
1193        cx.background_executor().spawn(async move {
1194            client
1195                .request(proto::UpdateParticipantLocation {
1196                    room_id,
1197                    location: Some(proto::ParticipantLocation {
1198                        variant: Some(location),
1199                    }),
1200                })
1201                .await?;
1202            Ok(())
1203        })
1204    }
1205
1206    pub fn is_screen_sharing(&self) -> bool {
1207        self.live_kit.as_ref().map_or(false, |live_kit| {
1208            !matches!(live_kit.screen_track, LocalTrack::None)
1209        })
1210    }
1211
1212    pub fn is_sharing_mic(&self) -> bool {
1213        self.live_kit.as_ref().map_or(false, |live_kit| {
1214            !matches!(live_kit.microphone_track, LocalTrack::None)
1215        })
1216    }
1217
1218    pub fn is_muted(&self, cx: &AppContext) -> bool {
1219        self.live_kit
1220            .as_ref()
1221            .and_then(|live_kit| match &live_kit.microphone_track {
1222                LocalTrack::None => Some(Self::mute_on_join(cx)),
1223                LocalTrack::Pending { muted, .. } => Some(*muted),
1224                LocalTrack::Published { muted, .. } => Some(*muted),
1225            })
1226            .unwrap_or(false)
1227    }
1228
1229    pub fn is_speaking(&self) -> bool {
1230        self.live_kit
1231            .as_ref()
1232            .map_or(false, |live_kit| live_kit.speaking)
1233    }
1234
1235    pub fn is_deafened(&self) -> Option<bool> {
1236        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1237    }
1238
1239    #[track_caller]
1240    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1241        if self.status.is_offline() {
1242            return Task::ready(Err(anyhow!("room is offline")));
1243        } else if self.is_sharing_mic() {
1244            return Task::ready(Err(anyhow!("microphone was already shared")));
1245        }
1246
1247        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1248            let publish_id = post_inc(&mut live_kit.next_publish_id);
1249            live_kit.microphone_track = LocalTrack::Pending {
1250                publish_id,
1251                muted: false,
1252            };
1253            cx.notify();
1254            publish_id
1255        } else {
1256            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1257        };
1258
1259        cx.spawn(move |this, mut cx| async move {
1260            let publish_track = async {
1261                let track = LocalAudioTrack::create();
1262                this.upgrade()
1263                    .ok_or_else(|| anyhow!("room was dropped"))?
1264                    .update(&mut cx, |this, _| {
1265                        this.live_kit
1266                            .as_ref()
1267                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1268                    })?
1269                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1270                    .await
1271            };
1272
1273            let publication = publish_track.await;
1274            this.upgrade()
1275                .ok_or_else(|| anyhow!("room was dropped"))?
1276                .update(&mut cx, |this, cx| {
1277                    let live_kit = this
1278                        .live_kit
1279                        .as_mut()
1280                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1281
1282                    let (canceled, muted) = if let LocalTrack::Pending {
1283                        publish_id: cur_publish_id,
1284                        muted,
1285                    } = &live_kit.microphone_track
1286                    {
1287                        (*cur_publish_id != publish_id, *muted)
1288                    } else {
1289                        (true, false)
1290                    };
1291
1292                    match publication {
1293                        Ok(publication) => {
1294                            if canceled {
1295                                live_kit.room.unpublish_track(publication);
1296                            } else {
1297                                if muted {
1298                                    cx.background_executor()
1299                                        .spawn(publication.set_mute(muted))
1300                                        .detach();
1301                                }
1302                                live_kit.microphone_track = LocalTrack::Published {
1303                                    track_publication: publication,
1304                                    muted,
1305                                };
1306                                cx.notify();
1307                            }
1308                            Ok(())
1309                        }
1310                        Err(error) => {
1311                            if canceled {
1312                                Ok(())
1313                            } else {
1314                                live_kit.microphone_track = LocalTrack::None;
1315                                cx.notify();
1316                                Err(error)
1317                            }
1318                        }
1319                    }
1320                })?
1321        })
1322    }
1323
1324    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1325        if self.status.is_offline() {
1326            return Task::ready(Err(anyhow!("room is offline")));
1327        } else if self.is_screen_sharing() {
1328            return Task::ready(Err(anyhow!("screen was already shared")));
1329        }
1330
1331        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1332            let publish_id = post_inc(&mut live_kit.next_publish_id);
1333            live_kit.screen_track = LocalTrack::Pending {
1334                publish_id,
1335                muted: false,
1336            };
1337            cx.notify();
1338            (live_kit.room.display_sources(), publish_id)
1339        } else {
1340            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1341        };
1342
1343        cx.spawn(move |this, mut cx| async move {
1344            let publish_track = async {
1345                let displays = displays.await?;
1346                let display = displays
1347                    .first()
1348                    .ok_or_else(|| anyhow!("no display found"))?;
1349                let track = LocalVideoTrack::screen_share_for_display(&display);
1350                this.upgrade()
1351                    .ok_or_else(|| anyhow!("room was dropped"))?
1352                    .update(&mut cx, |this, _| {
1353                        this.live_kit
1354                            .as_ref()
1355                            .map(|live_kit| live_kit.room.publish_video_track(track))
1356                    })?
1357                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1358                    .await
1359            };
1360
1361            let publication = publish_track.await;
1362            this.upgrade()
1363                .ok_or_else(|| anyhow!("room was dropped"))?
1364                .update(&mut cx, |this, cx| {
1365                    let live_kit = this
1366                        .live_kit
1367                        .as_mut()
1368                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1369
1370                    let (canceled, muted) = if let LocalTrack::Pending {
1371                        publish_id: cur_publish_id,
1372                        muted,
1373                    } = &live_kit.screen_track
1374                    {
1375                        (*cur_publish_id != publish_id, *muted)
1376                    } else {
1377                        (true, false)
1378                    };
1379
1380                    match publication {
1381                        Ok(publication) => {
1382                            if canceled {
1383                                live_kit.room.unpublish_track(publication);
1384                            } else {
1385                                if muted {
1386                                    cx.background_executor()
1387                                        .spawn(publication.set_mute(muted))
1388                                        .detach();
1389                                }
1390                                live_kit.screen_track = LocalTrack::Published {
1391                                    track_publication: publication,
1392                                    muted,
1393                                };
1394                                cx.notify();
1395                            }
1396
1397                            Audio::play_sound(Sound::StartScreenshare, cx);
1398
1399                            Ok(())
1400                        }
1401                        Err(error) => {
1402                            if canceled {
1403                                Ok(())
1404                            } else {
1405                                live_kit.screen_track = LocalTrack::None;
1406                                cx.notify();
1407                                Err(error)
1408                            }
1409                        }
1410                    }
1411                })?
1412        })
1413    }
1414
1415    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1416        let should_mute = !self.is_muted(cx);
1417        if let Some(live_kit) = self.live_kit.as_mut() {
1418            if matches!(live_kit.microphone_track, LocalTrack::None) {
1419                return Ok(self.share_microphone(cx));
1420            }
1421
1422            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1423            live_kit.muted_by_user = should_mute;
1424
1425            if old_muted == true && live_kit.deafened == true {
1426                if let Some(task) = self.toggle_deafen(cx).ok() {
1427                    task.detach();
1428                }
1429            }
1430
1431            Ok(ret_task)
1432        } else {
1433            Err(anyhow!("LiveKit not started"))
1434        }
1435    }
1436
1437    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1438        if let Some(live_kit) = self.live_kit.as_mut() {
1439            (*live_kit).deafened = !live_kit.deafened;
1440
1441            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1442            // Context notification is sent within set_mute itself.
1443            let mut mute_task = None;
1444            // When deafening, mute user's mic as well.
1445            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1446            if live_kit.deafened || !live_kit.muted_by_user {
1447                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1448            };
1449            for participant in self.remote_participants.values() {
1450                for track in live_kit
1451                    .room
1452                    .remote_audio_track_publications(&participant.user.id.to_string())
1453                {
1454                    let deafened = live_kit.deafened;
1455                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1456                }
1457            }
1458
1459            Ok(cx.foreground_executor().spawn(async move {
1460                if let Some(mute_task) = mute_task {
1461                    mute_task.await?;
1462                }
1463                for task in tasks {
1464                    task.await?;
1465                }
1466                Ok(())
1467            }))
1468        } else {
1469            Err(anyhow!("LiveKit not started"))
1470        }
1471    }
1472
1473    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1474        if self.status.is_offline() {
1475            return Err(anyhow!("room is offline"));
1476        }
1477
1478        let live_kit = self
1479            .live_kit
1480            .as_mut()
1481            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1482        match mem::take(&mut live_kit.screen_track) {
1483            LocalTrack::None => Err(anyhow!("screen was not shared")),
1484            LocalTrack::Pending { .. } => {
1485                cx.notify();
1486                Ok(())
1487            }
1488            LocalTrack::Published {
1489                track_publication, ..
1490            } => {
1491                live_kit.room.unpublish_track(track_publication);
1492                cx.notify();
1493
1494                Audio::play_sound(Sound::StopScreenshare, cx);
1495                Ok(())
1496            }
1497        }
1498    }
1499
1500    #[cfg(any(test, feature = "test-support"))]
1501    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1502        self.live_kit
1503            .as_ref()
1504            .unwrap()
1505            .room
1506            .set_display_sources(sources);
1507    }
1508}
1509
1510struct LiveKitRoom {
1511    room: Arc<live_kit_client::Room>,
1512    screen_track: LocalTrack,
1513    microphone_track: LocalTrack,
1514    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1515    muted_by_user: bool,
1516    deafened: bool,
1517    speaking: bool,
1518    next_publish_id: usize,
1519    _maintain_room: Task<()>,
1520    _maintain_tracks: [Task<()>; 2],
1521}
1522
1523impl LiveKitRoom {
1524    fn set_mute(
1525        self: &mut LiveKitRoom,
1526        should_mute: bool,
1527        cx: &mut ModelContext<Room>,
1528    ) -> Result<(Task<Result<()>>, bool)> {
1529        if !should_mute {
1530            // clear user muting state.
1531            self.muted_by_user = false;
1532        }
1533
1534        let (result, old_muted) = match &mut self.microphone_track {
1535            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1536            LocalTrack::Pending { muted, .. } => {
1537                let old_muted = *muted;
1538                *muted = should_mute;
1539                cx.notify();
1540                Ok((Task::Ready(Some(Ok(()))), old_muted))
1541            }
1542            LocalTrack::Published {
1543                track_publication,
1544                muted,
1545            } => {
1546                let old_muted = *muted;
1547                *muted = should_mute;
1548                cx.notify();
1549                Ok((
1550                    cx.background_executor()
1551                        .spawn(track_publication.set_mute(*muted)),
1552                    old_muted,
1553                ))
1554            }
1555        }?;
1556
1557        if old_muted != should_mute {
1558            if should_mute {
1559                Audio::play_sound(Sound::Mute, cx);
1560            } else {
1561                Audio::play_sound(Sound::Unmute, cx);
1562            }
1563        }
1564
1565        Ok((result, old_muted))
1566    }
1567}
1568
1569enum LocalTrack {
1570    None,
1571    Pending {
1572        publish_id: usize,
1573        muted: bool,
1574    },
1575    Published {
1576        track_publication: LocalTrackPublication,
1577        muted: bool,
1578    },
1579}
1580
1581impl Default for LocalTrack {
1582    fn default() -> Self {
1583        Self::None
1584    }
1585}
1586
1587#[derive(Copy, Clone, PartialEq, Eq)]
1588pub enum RoomStatus {
1589    Online,
1590    Rejoining,
1591    Offline,
1592}
1593
1594impl RoomStatus {
1595    pub fn is_offline(&self) -> bool {
1596        matches!(self, RoomStatus::Offline)
1597    }
1598
1599    pub fn is_online(&self) -> bool {
1600        matches!(self, RoomStatus::Online)
1601    }
1602}