room.rs

   1use crate::participant::{LocalParticipant, ParticipantLocation, RemoteParticipant};
   2use anyhow::{anyhow, Result};
   3use audio::{Audio, Sound};
   4use client::{
   5    proto::{self, PeerId},
   6    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
   7};
   8use collections::{BTreeMap, HashMap, HashSet};
   9use fs::Fs;
  10use futures::{FutureExt, StreamExt};
  11use gpui::{
  12    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  13};
  14use language::LanguageRegistry;
  15use live_kit_client::{
  16    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  17    RemoteVideoTrackUpdate,
  18};
  19use postage::{sink::Sink, stream::Stream, watch};
  20use project::Project;
  21use std::{future::Future, mem, sync::Arc, time::Duration};
  22use util::{post_inc, ResultExt, TryFutureExt};
  23
  24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  25
  26#[derive(Clone, Debug, PartialEq, Eq)]
  27pub enum Event {
  28    ParticipantLocationChanged {
  29        participant_id: proto::PeerId,
  30    },
  31    RemoteVideoTracksChanged {
  32        participant_id: proto::PeerId,
  33    },
  34    RemoteAudioTracksChanged {
  35        participant_id: proto::PeerId,
  36    },
  37    RemoteProjectShared {
  38        owner: Arc<User>,
  39        project_id: u64,
  40        worktree_root_names: Vec<String>,
  41    },
  42    RemoteProjectUnshared {
  43        project_id: u64,
  44    },
  45    RemoteProjectJoined {
  46        project_id: u64,
  47    },
  48    RemoteProjectInvitationDiscarded {
  49        project_id: u64,
  50    },
  51    Left,
  52}
  53
  54pub struct Room {
  55    id: u64,
  56    channel_id: Option<u64>,
  57    live_kit: Option<LiveKitRoom>,
  58    status: RoomStatus,
  59    shared_projects: HashSet<WeakModel<Project>>,
  60    joined_projects: HashSet<WeakModel<Project>>,
  61    local_participant: LocalParticipant,
  62    remote_participants: BTreeMap<u64, RemoteParticipant>,
  63    pending_participants: Vec<Arc<User>>,
  64    participant_user_ids: HashSet<u64>,
  65    pending_call_count: usize,
  66    leave_when_empty: bool,
  67    client: Arc<Client>,
  68    user_store: Model<UserStore>,
  69    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  70    client_subscriptions: Vec<client::Subscription>,
  71    _subscriptions: Vec<gpui::Subscription>,
  72    room_update_completed_tx: watch::Sender<Option<()>>,
  73    room_update_completed_rx: watch::Receiver<Option<()>>,
  74    pending_room_update: Option<Task<()>>,
  75    maintain_connection: Option<Task<Option<()>>>,
  76}
  77
  78impl EventEmitter<Event> for Room {}
  79
  80impl Room {
  81    pub fn channel_id(&self) -> Option<u64> {
  82        self.channel_id
  83    }
  84
  85    pub fn is_sharing_project(&self) -> bool {
  86        !self.shared_projects.is_empty()
  87    }
  88
  89    #[cfg(any(test, feature = "test-support"))]
  90    pub fn is_connected(&self) -> bool {
  91        if let Some(live_kit) = self.live_kit.as_ref() {
  92            matches!(
  93                *live_kit.room.status().borrow(),
  94                live_kit_client::ConnectionState::Connected { .. }
  95            )
  96        } else {
  97            false
  98        }
  99    }
 100
 101    fn new(
 102        id: u64,
 103        channel_id: Option<u64>,
 104        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 105        client: Arc<Client>,
 106        user_store: Model<UserStore>,
 107        cx: &mut ModelContext<Self>,
 108    ) -> Self {
 109        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 110            let room = live_kit_client::Room::new();
 111            let mut status = room.status();
 112            // Consume the initial status of the room.
 113            let _ = status.try_recv();
 114            let _maintain_room = cx.spawn(|this, mut cx| async move {
 115                while let Some(status) = status.next().await {
 116                    let this = if let Some(this) = this.upgrade() {
 117                        this
 118                    } else {
 119                        break;
 120                    };
 121
 122                    if status == live_kit_client::ConnectionState::Disconnected {
 123                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 124                            .ok();
 125                        break;
 126                    }
 127                }
 128            });
 129
 130            let _maintain_video_tracks = cx.spawn({
 131                let room = room.clone();
 132                move |this, mut cx| async move {
 133                    let mut track_video_changes = room.remote_video_track_updates();
 134                    while let Some(track_change) = track_video_changes.next().await {
 135                        let this = if let Some(this) = this.upgrade() {
 136                            this
 137                        } else {
 138                            break;
 139                        };
 140
 141                        this.update(&mut cx, |this, cx| {
 142                            this.remote_video_track_updated(track_change, cx).log_err()
 143                        })
 144                        .ok();
 145                    }
 146                }
 147            });
 148
 149            let _maintain_audio_tracks = cx.spawn({
 150                let room = room.clone();
 151                |this, mut cx| async move {
 152                    let mut track_audio_changes = room.remote_audio_track_updates();
 153                    while let Some(track_change) = track_audio_changes.next().await {
 154                        let this = if let Some(this) = this.upgrade() {
 155                            this
 156                        } else {
 157                            break;
 158                        };
 159
 160                        this.update(&mut cx, |this, cx| {
 161                            this.remote_audio_track_updated(track_change, cx).log_err()
 162                        })
 163                        .ok();
 164                    }
 165                }
 166            });
 167
 168            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 169            cx.spawn(|this, mut cx| async move {
 170                connect.await?;
 171
 172                if !cx.update(|cx| Self::mute_on_join(cx))? {
 173                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 174                        .await?;
 175                }
 176
 177                anyhow::Ok(())
 178            })
 179            .detach_and_log_err(cx);
 180
 181            Some(LiveKitRoom {
 182                room,
 183                screen_track: LocalTrack::None,
 184                microphone_track: LocalTrack::None,
 185                next_publish_id: 0,
 186                muted_by_user: false,
 187                deafened: false,
 188                speaking: false,
 189                _maintain_room,
 190                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 191            })
 192        } else {
 193            None
 194        };
 195
 196        let maintain_connection = cx.spawn({
 197            let client = client.clone();
 198            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 199        });
 200
 201        Audio::play_sound(Sound::Joined, cx);
 202
 203        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 204
 205        Self {
 206            id,
 207            channel_id,
 208            live_kit: live_kit_room,
 209            status: RoomStatus::Online,
 210            shared_projects: Default::default(),
 211            joined_projects: Default::default(),
 212            participant_user_ids: Default::default(),
 213            local_participant: Default::default(),
 214            remote_participants: Default::default(),
 215            pending_participants: Default::default(),
 216            pending_call_count: 0,
 217            client_subscriptions: vec![
 218                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 219            ],
 220            _subscriptions: vec![
 221                cx.on_release(Self::released),
 222                cx.on_app_quit(Self::app_will_quit),
 223            ],
 224            leave_when_empty: false,
 225            pending_room_update: None,
 226            client,
 227            user_store,
 228            follows_by_leader_id_project_id: Default::default(),
 229            maintain_connection: Some(maintain_connection),
 230            room_update_completed_tx,
 231            room_update_completed_rx,
 232        }
 233    }
 234
 235    pub(crate) fn create(
 236        called_user_id: u64,
 237        initial_project: Option<Model<Project>>,
 238        client: Arc<Client>,
 239        user_store: Model<UserStore>,
 240        cx: &mut AppContext,
 241    ) -> Task<Result<Model<Self>>> {
 242        cx.spawn(move |mut cx| async move {
 243            let response = client.request(proto::CreateRoom {}).await?;
 244            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 245            let room = cx.build_model(|cx| {
 246                Self::new(
 247                    room_proto.id,
 248                    None,
 249                    response.live_kit_connection_info,
 250                    client,
 251                    user_store,
 252                    cx,
 253                )
 254            })?;
 255
 256            let initial_project_id = if let Some(initial_project) = initial_project {
 257                let initial_project_id = room
 258                    .update(&mut cx, |room, cx| {
 259                        room.share_project(initial_project.clone(), cx)
 260                    })?
 261                    .await?;
 262                Some(initial_project_id)
 263            } else {
 264                None
 265            };
 266
 267            match room
 268                .update(&mut cx, |room, cx| {
 269                    room.leave_when_empty = true;
 270                    room.call(called_user_id, initial_project_id, cx)
 271                })?
 272                .await
 273            {
 274                Ok(()) => Ok(room),
 275                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 276            }
 277        })
 278    }
 279
 280    pub(crate) async fn join_channel(
 281        channel_id: u64,
 282        client: Arc<Client>,
 283        user_store: Model<UserStore>,
 284        cx: AsyncAppContext,
 285    ) -> Result<Model<Self>> {
 286        Self::from_join_response(
 287            client.request(proto::JoinChannel { channel_id }).await?,
 288            client,
 289            user_store,
 290            cx,
 291        )
 292    }
 293
 294    pub(crate) async fn join(
 295        room_id: u64,
 296        client: Arc<Client>,
 297        user_store: Model<UserStore>,
 298        cx: AsyncAppContext,
 299    ) -> Result<Model<Self>> {
 300        Self::from_join_response(
 301            client.request(proto::JoinRoom { id: room_id }).await?,
 302            client,
 303            user_store,
 304            cx,
 305        )
 306    }
 307
 308    fn released(&mut self, cx: &mut AppContext) {
 309        if self.status.is_online() {
 310            self.leave_internal(cx).detach_and_log_err(cx);
 311        }
 312    }
 313
 314    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 315        let task = if self.status.is_online() {
 316            let leave = self.leave_internal(cx);
 317            Some(cx.background_executor().spawn(async move {
 318                leave.await.log_err();
 319            }))
 320        } else {
 321            None
 322        };
 323
 324        async move {
 325            if let Some(task) = task {
 326                task.await;
 327            }
 328        }
 329    }
 330
 331    pub fn mute_on_join(_cx: &AppContext) -> bool {
 332        // todo!() po: This should be uncommented, though then unmuting does not work
 333        false
 334        //CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 335    }
 336
 337    fn from_join_response(
 338        response: proto::JoinRoomResponse,
 339        client: Arc<Client>,
 340        user_store: Model<UserStore>,
 341        mut cx: AsyncAppContext,
 342    ) -> Result<Model<Self>> {
 343        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 344        let room = cx.build_model(|cx| {
 345            Self::new(
 346                room_proto.id,
 347                response.channel_id,
 348                response.live_kit_connection_info,
 349                client,
 350                user_store,
 351                cx,
 352            )
 353        })?;
 354        room.update(&mut cx, |room, cx| {
 355            room.leave_when_empty = room.channel_id.is_none();
 356            room.apply_room_update(room_proto, cx)?;
 357            anyhow::Ok(())
 358        })??;
 359        Ok(room)
 360    }
 361
 362    fn should_leave(&self) -> bool {
 363        self.leave_when_empty
 364            && self.pending_room_update.is_none()
 365            && self.pending_participants.is_empty()
 366            && self.remote_participants.is_empty()
 367            && self.pending_call_count == 0
 368    }
 369
 370    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 371        cx.notify();
 372        cx.emit(Event::Left);
 373        self.leave_internal(cx)
 374    }
 375
 376    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 377        if self.status.is_offline() {
 378            return Task::ready(Err(anyhow!("room is offline")));
 379        }
 380
 381        log::info!("leaving room");
 382        Audio::play_sound(Sound::Leave, cx);
 383
 384        self.clear_state(cx);
 385
 386        let leave_room = self.client.request(proto::LeaveRoom {});
 387        cx.background_executor().spawn(async move {
 388            leave_room.await?;
 389            anyhow::Ok(())
 390        })
 391    }
 392
 393    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 394        for project in self.shared_projects.drain() {
 395            if let Some(project) = project.upgrade() {
 396                project.update(cx, |project, cx| {
 397                    project.unshare(cx).log_err();
 398                });
 399            }
 400        }
 401        for project in self.joined_projects.drain() {
 402            if let Some(project) = project.upgrade() {
 403                project.update(cx, |project, cx| {
 404                    project.disconnected_from_host(cx);
 405                    project.close(cx);
 406                });
 407            }
 408        }
 409
 410        self.status = RoomStatus::Offline;
 411        self.remote_participants.clear();
 412        self.pending_participants.clear();
 413        self.participant_user_ids.clear();
 414        self.client_subscriptions.clear();
 415        self.live_kit.take();
 416        self.pending_room_update.take();
 417        self.maintain_connection.take();
 418    }
 419
 420    async fn maintain_connection(
 421        this: WeakModel<Self>,
 422        client: Arc<Client>,
 423        mut cx: AsyncAppContext,
 424    ) -> Result<()> {
 425        let mut client_status = client.status();
 426        loop {
 427            let _ = client_status.try_recv();
 428            let is_connected = client_status.borrow().is_connected();
 429            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 430            if !is_connected || client_status.next().await.is_some() {
 431                log::info!("detected client disconnection");
 432
 433                this.upgrade()
 434                    .ok_or_else(|| anyhow!("room was dropped"))?
 435                    .update(&mut cx, |this, cx| {
 436                        this.status = RoomStatus::Rejoining;
 437                        cx.notify();
 438                    })?;
 439
 440                // Wait for client to re-establish a connection to the server.
 441                {
 442                    let mut reconnection_timeout =
 443                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 444                    let client_reconnection = async {
 445                        let mut remaining_attempts = 3;
 446                        while remaining_attempts > 0 {
 447                            if client_status.borrow().is_connected() {
 448                                log::info!("client reconnected, attempting to rejoin room");
 449
 450                                let Some(this) = this.upgrade() else { break };
 451                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 452                                    Ok(task) => {
 453                                        if task.await.log_err().is_some() {
 454                                            return true;
 455                                        } else {
 456                                            remaining_attempts -= 1;
 457                                        }
 458                                    }
 459                                    Err(_app_dropped) => return false,
 460                                }
 461                            } else if client_status.borrow().is_signed_out() {
 462                                return false;
 463                            }
 464
 465                            log::info!(
 466                                "waiting for client status change, remaining attempts {}",
 467                                remaining_attempts
 468                            );
 469                            client_status.next().await;
 470                        }
 471                        false
 472                    }
 473                    .fuse();
 474                    futures::pin_mut!(client_reconnection);
 475
 476                    futures::select_biased! {
 477                        reconnected = client_reconnection => {
 478                            if reconnected {
 479                                log::info!("successfully reconnected to room");
 480                                // If we successfully joined the room, go back around the loop
 481                                // waiting for future connection status changes.
 482                                continue;
 483                            }
 484                        }
 485                        _ = reconnection_timeout => {
 486                            log::info!("room reconnection timeout expired");
 487                        }
 488                    }
 489                }
 490
 491                break;
 492            }
 493        }
 494
 495        // The client failed to re-establish a connection to the server
 496        // or an error occurred while trying to re-join the room. Either way
 497        // we leave the room and return an error.
 498        if let Some(this) = this.upgrade() {
 499            log::info!("reconnection failed, leaving room");
 500            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 501        }
 502        Err(anyhow!(
 503            "can't reconnect to room: client failed to re-establish connection"
 504        ))
 505    }
 506
 507    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 508        let mut projects = HashMap::default();
 509        let mut reshared_projects = Vec::new();
 510        let mut rejoined_projects = Vec::new();
 511        self.shared_projects.retain(|project| {
 512            if let Some(handle) = project.upgrade() {
 513                let project = handle.read(cx);
 514                if let Some(project_id) = project.remote_id() {
 515                    projects.insert(project_id, handle.clone());
 516                    reshared_projects.push(proto::UpdateProject {
 517                        project_id,
 518                        worktrees: project.worktree_metadata_protos(cx),
 519                    });
 520                    return true;
 521                }
 522            }
 523            false
 524        });
 525        self.joined_projects.retain(|project| {
 526            if let Some(handle) = project.upgrade() {
 527                let project = handle.read(cx);
 528                if let Some(project_id) = project.remote_id() {
 529                    projects.insert(project_id, handle.clone());
 530                    rejoined_projects.push(proto::RejoinProject {
 531                        id: project_id,
 532                        worktrees: project
 533                            .worktrees()
 534                            .map(|worktree| {
 535                                let worktree = worktree.read(cx);
 536                                proto::RejoinWorktree {
 537                                    id: worktree.id().to_proto(),
 538                                    scan_id: worktree.completed_scan_id() as u64,
 539                                }
 540                            })
 541                            .collect(),
 542                    });
 543                }
 544                return true;
 545            }
 546            false
 547        });
 548
 549        let response = self.client.request_envelope(proto::RejoinRoom {
 550            id: self.id,
 551            reshared_projects,
 552            rejoined_projects,
 553        });
 554
 555        cx.spawn(|this, mut cx| async move {
 556            let response = response.await?;
 557            let message_id = response.message_id;
 558            let response = response.payload;
 559            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 560            this.update(&mut cx, |this, cx| {
 561                this.status = RoomStatus::Online;
 562                this.apply_room_update(room_proto, cx)?;
 563
 564                for reshared_project in response.reshared_projects {
 565                    if let Some(project) = projects.get(&reshared_project.id) {
 566                        project.update(cx, |project, cx| {
 567                            project.reshared(reshared_project, cx).log_err();
 568                        });
 569                    }
 570                }
 571
 572                for rejoined_project in response.rejoined_projects {
 573                    if let Some(project) = projects.get(&rejoined_project.id) {
 574                        project.update(cx, |project, cx| {
 575                            project.rejoined(rejoined_project, message_id, cx).log_err();
 576                        });
 577                    }
 578                }
 579
 580                anyhow::Ok(())
 581            })?
 582        })
 583    }
 584
 585    pub fn id(&self) -> u64 {
 586        self.id
 587    }
 588
 589    pub fn status(&self) -> RoomStatus {
 590        self.status
 591    }
 592
 593    pub fn local_participant(&self) -> &LocalParticipant {
 594        &self.local_participant
 595    }
 596
 597    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 598        &self.remote_participants
 599    }
 600
 601    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 602        self.remote_participants
 603            .values()
 604            .find(|p| p.peer_id == peer_id)
 605    }
 606
 607    pub fn pending_participants(&self) -> &[Arc<User>] {
 608        &self.pending_participants
 609    }
 610
 611    pub fn contains_participant(&self, user_id: u64) -> bool {
 612        self.participant_user_ids.contains(&user_id)
 613    }
 614
 615    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 616        self.follows_by_leader_id_project_id
 617            .get(&(leader_id, project_id))
 618            .map_or(&[], |v| v.as_slice())
 619    }
 620
 621    /// Returns the most 'active' projects, defined as most people in the project
 622    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 623        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 624        for participant in self.remote_participants.values() {
 625            match participant.location {
 626                ParticipantLocation::SharedProject { project_id } => {
 627                    project_hosts_and_guest_counts
 628                        .entry(project_id)
 629                        .or_default()
 630                        .1 += 1;
 631                }
 632                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 633            }
 634            for project in &participant.projects {
 635                project_hosts_and_guest_counts
 636                    .entry(project.id)
 637                    .or_default()
 638                    .0 = Some(participant.user.id);
 639            }
 640        }
 641
 642        if let Some(user) = self.user_store.read(cx).current_user() {
 643            for project in &self.local_participant.projects {
 644                project_hosts_and_guest_counts
 645                    .entry(project.id)
 646                    .or_default()
 647                    .0 = Some(user.id);
 648            }
 649        }
 650
 651        project_hosts_and_guest_counts
 652            .into_iter()
 653            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 654            .max_by_key(|(_, _, guest_count)| *guest_count)
 655            .map(|(id, host, _)| (id, host))
 656    }
 657
 658    async fn handle_room_updated(
 659        this: Model<Self>,
 660        envelope: TypedEnvelope<proto::RoomUpdated>,
 661        _: Arc<Client>,
 662        mut cx: AsyncAppContext,
 663    ) -> Result<()> {
 664        let room = envelope
 665            .payload
 666            .room
 667            .ok_or_else(|| anyhow!("invalid room"))?;
 668        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 669    }
 670
 671    fn apply_room_update(
 672        &mut self,
 673        mut room: proto::Room,
 674        cx: &mut ModelContext<Self>,
 675    ) -> Result<()> {
 676        // Filter ourselves out from the room's participants.
 677        let local_participant_ix = room
 678            .participants
 679            .iter()
 680            .position(|participant| Some(participant.user_id) == self.client.user_id());
 681        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 682
 683        let pending_participant_user_ids = room
 684            .pending_participants
 685            .iter()
 686            .map(|p| p.user_id)
 687            .collect::<Vec<_>>();
 688
 689        let remote_participant_user_ids = room
 690            .participants
 691            .iter()
 692            .map(|p| p.user_id)
 693            .collect::<Vec<_>>();
 694
 695        let (remote_participants, pending_participants) =
 696            self.user_store.update(cx, move |user_store, cx| {
 697                (
 698                    user_store.get_users(remote_participant_user_ids, cx),
 699                    user_store.get_users(pending_participant_user_ids, cx),
 700                )
 701            });
 702
 703        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 704            let (remote_participants, pending_participants) =
 705                futures::join!(remote_participants, pending_participants);
 706
 707            this.update(&mut cx, |this, cx| {
 708                this.participant_user_ids.clear();
 709
 710                if let Some(participant) = local_participant {
 711                    this.local_participant.projects = participant.projects;
 712                } else {
 713                    this.local_participant.projects.clear();
 714                }
 715
 716                if let Some(participants) = remote_participants.log_err() {
 717                    for (participant, user) in room.participants.into_iter().zip(participants) {
 718                        let Some(peer_id) = participant.peer_id else {
 719                            continue;
 720                        };
 721                        let participant_index = ParticipantIndex(participant.participant_index);
 722                        this.participant_user_ids.insert(participant.user_id);
 723
 724                        let old_projects = this
 725                            .remote_participants
 726                            .get(&participant.user_id)
 727                            .into_iter()
 728                            .flat_map(|existing| &existing.projects)
 729                            .map(|project| project.id)
 730                            .collect::<HashSet<_>>();
 731                        let new_projects = participant
 732                            .projects
 733                            .iter()
 734                            .map(|project| project.id)
 735                            .collect::<HashSet<_>>();
 736
 737                        for project in &participant.projects {
 738                            if !old_projects.contains(&project.id) {
 739                                cx.emit(Event::RemoteProjectShared {
 740                                    owner: user.clone(),
 741                                    project_id: project.id,
 742                                    worktree_root_names: project.worktree_root_names.clone(),
 743                                });
 744                            }
 745                        }
 746
 747                        for unshared_project_id in old_projects.difference(&new_projects) {
 748                            this.joined_projects.retain(|project| {
 749                                if let Some(project) = project.upgrade() {
 750                                    project.update(cx, |project, cx| {
 751                                        if project.remote_id() == Some(*unshared_project_id) {
 752                                            project.disconnected_from_host(cx);
 753                                            false
 754                                        } else {
 755                                            true
 756                                        }
 757                                    })
 758                                } else {
 759                                    false
 760                                }
 761                            });
 762                            cx.emit(Event::RemoteProjectUnshared {
 763                                project_id: *unshared_project_id,
 764                            });
 765                        }
 766
 767                        let location = ParticipantLocation::from_proto(participant.location)
 768                            .unwrap_or(ParticipantLocation::External);
 769                        if let Some(remote_participant) =
 770                            this.remote_participants.get_mut(&participant.user_id)
 771                        {
 772                            remote_participant.peer_id = peer_id;
 773                            remote_participant.projects = participant.projects;
 774                            remote_participant.participant_index = participant_index;
 775                            if location != remote_participant.location {
 776                                remote_participant.location = location;
 777                                cx.emit(Event::ParticipantLocationChanged {
 778                                    participant_id: peer_id,
 779                                });
 780                            }
 781                        } else {
 782                            this.remote_participants.insert(
 783                                participant.user_id,
 784                                RemoteParticipant {
 785                                    user: user.clone(),
 786                                    participant_index,
 787                                    peer_id,
 788                                    projects: participant.projects,
 789                                    location,
 790                                    muted: true,
 791                                    speaking: false,
 792                                    video_tracks: Default::default(),
 793                                    audio_tracks: Default::default(),
 794                                },
 795                            );
 796
 797                            Audio::play_sound(Sound::Joined, cx);
 798
 799                            if let Some(live_kit) = this.live_kit.as_ref() {
 800                                let video_tracks =
 801                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 802                                let audio_tracks =
 803                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 804                                let publications = live_kit
 805                                    .room
 806                                    .remote_audio_track_publications(&user.id.to_string());
 807
 808                                for track in video_tracks {
 809                                    this.remote_video_track_updated(
 810                                        RemoteVideoTrackUpdate::Subscribed(track),
 811                                        cx,
 812                                    )
 813                                    .log_err();
 814                                }
 815
 816                                for (track, publication) in
 817                                    audio_tracks.iter().zip(publications.iter())
 818                                {
 819                                    this.remote_audio_track_updated(
 820                                        RemoteAudioTrackUpdate::Subscribed(
 821                                            track.clone(),
 822                                            publication.clone(),
 823                                        ),
 824                                        cx,
 825                                    )
 826                                    .log_err();
 827                                }
 828                            }
 829                        }
 830                    }
 831
 832                    this.remote_participants.retain(|user_id, participant| {
 833                        if this.participant_user_ids.contains(user_id) {
 834                            true
 835                        } else {
 836                            for project in &participant.projects {
 837                                cx.emit(Event::RemoteProjectUnshared {
 838                                    project_id: project.id,
 839                                });
 840                            }
 841                            false
 842                        }
 843                    });
 844                }
 845
 846                if let Some(pending_participants) = pending_participants.log_err() {
 847                    this.pending_participants = pending_participants;
 848                    for participant in &this.pending_participants {
 849                        this.participant_user_ids.insert(participant.id);
 850                    }
 851                }
 852
 853                this.follows_by_leader_id_project_id.clear();
 854                for follower in room.followers {
 855                    let project_id = follower.project_id;
 856                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 857                        (Some(leader), Some(follower)) => (leader, follower),
 858
 859                        _ => {
 860                            log::error!("Follower message {follower:?} missing some state");
 861                            continue;
 862                        }
 863                    };
 864
 865                    let list = this
 866                        .follows_by_leader_id_project_id
 867                        .entry((leader, project_id))
 868                        .or_insert(Vec::new());
 869                    if !list.contains(&follower) {
 870                        list.push(follower);
 871                    }
 872                }
 873
 874                this.pending_room_update.take();
 875                if this.should_leave() {
 876                    log::info!("room is empty, leaving");
 877                    let _ = this.leave(cx);
 878                }
 879
 880                this.user_store.update(cx, |user_store, cx| {
 881                    let participant_indices_by_user_id = this
 882                        .remote_participants
 883                        .iter()
 884                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 885                        .collect();
 886                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 887                });
 888
 889                this.check_invariants();
 890                this.room_update_completed_tx.try_send(Some(())).ok();
 891                cx.notify();
 892            })
 893            .ok();
 894        }));
 895
 896        cx.notify();
 897        Ok(())
 898    }
 899
 900    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 901        let mut done_rx = self.room_update_completed_rx.clone();
 902        async move {
 903            while let Some(result) = done_rx.next().await {
 904                if result.is_some() {
 905                    break;
 906                }
 907            }
 908        }
 909    }
 910
 911    fn remote_video_track_updated(
 912        &mut self,
 913        change: RemoteVideoTrackUpdate,
 914        cx: &mut ModelContext<Self>,
 915    ) -> Result<()> {
 916        match change {
 917            RemoteVideoTrackUpdate::Subscribed(track) => {
 918                let user_id = track.publisher_id().parse()?;
 919                let track_id = track.sid().to_string();
 920                let participant = self
 921                    .remote_participants
 922                    .get_mut(&user_id)
 923                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 924                participant.video_tracks.insert(track_id.clone(), track);
 925                cx.emit(Event::RemoteVideoTracksChanged {
 926                    participant_id: participant.peer_id,
 927                });
 928            }
 929            RemoteVideoTrackUpdate::Unsubscribed {
 930                publisher_id,
 931                track_id,
 932            } => {
 933                let user_id = publisher_id.parse()?;
 934                let participant = self
 935                    .remote_participants
 936                    .get_mut(&user_id)
 937                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 938                participant.video_tracks.remove(&track_id);
 939                cx.emit(Event::RemoteVideoTracksChanged {
 940                    participant_id: participant.peer_id,
 941                });
 942            }
 943        }
 944
 945        cx.notify();
 946        Ok(())
 947    }
 948
 949    fn remote_audio_track_updated(
 950        &mut self,
 951        change: RemoteAudioTrackUpdate,
 952        cx: &mut ModelContext<Self>,
 953    ) -> Result<()> {
 954        match change {
 955            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 956                let mut speaker_ids = speakers
 957                    .into_iter()
 958                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 959                    .collect::<Vec<u64>>();
 960                speaker_ids.sort_unstable();
 961                for (sid, participant) in &mut self.remote_participants {
 962                    if let Ok(_) = speaker_ids.binary_search(sid) {
 963                        participant.speaking = true;
 964                    } else {
 965                        participant.speaking = false;
 966                    }
 967                }
 968                if let Some(id) = self.client.user_id() {
 969                    if let Some(room) = &mut self.live_kit {
 970                        if let Ok(_) = speaker_ids.binary_search(&id) {
 971                            room.speaking = true;
 972                        } else {
 973                            room.speaking = false;
 974                        }
 975                    }
 976                }
 977                cx.notify();
 978            }
 979            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 980                let mut found = false;
 981                for participant in &mut self.remote_participants.values_mut() {
 982                    for track in participant.audio_tracks.values() {
 983                        if track.sid() == track_id {
 984                            found = true;
 985                            break;
 986                        }
 987                    }
 988                    if found {
 989                        participant.muted = muted;
 990                        break;
 991                    }
 992                }
 993
 994                cx.notify();
 995            }
 996            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 997                let user_id = track.publisher_id().parse()?;
 998                let track_id = track.sid().to_string();
 999                let participant = self
1000                    .remote_participants
1001                    .get_mut(&user_id)
1002                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1003                participant.audio_tracks.insert(track_id.clone(), track);
1004                participant.muted = publication.is_muted();
1005
1006                cx.emit(Event::RemoteAudioTracksChanged {
1007                    participant_id: participant.peer_id,
1008                });
1009            }
1010            RemoteAudioTrackUpdate::Unsubscribed {
1011                publisher_id,
1012                track_id,
1013            } => {
1014                let user_id = publisher_id.parse()?;
1015                let participant = self
1016                    .remote_participants
1017                    .get_mut(&user_id)
1018                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1019                participant.audio_tracks.remove(&track_id);
1020                cx.emit(Event::RemoteAudioTracksChanged {
1021                    participant_id: participant.peer_id,
1022                });
1023            }
1024        }
1025
1026        cx.notify();
1027        Ok(())
1028    }
1029
1030    fn check_invariants(&self) {
1031        #[cfg(any(test, feature = "test-support"))]
1032        {
1033            for participant in self.remote_participants.values() {
1034                assert!(self.participant_user_ids.contains(&participant.user.id));
1035                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1036            }
1037
1038            for participant in &self.pending_participants {
1039                assert!(self.participant_user_ids.contains(&participant.id));
1040                assert_ne!(participant.id, self.client.user_id().unwrap());
1041            }
1042
1043            assert_eq!(
1044                self.participant_user_ids.len(),
1045                self.remote_participants.len() + self.pending_participants.len()
1046            );
1047        }
1048    }
1049
1050    pub(crate) fn call(
1051        &mut self,
1052        called_user_id: u64,
1053        initial_project_id: Option<u64>,
1054        cx: &mut ModelContext<Self>,
1055    ) -> Task<Result<()>> {
1056        if self.status.is_offline() {
1057            return Task::ready(Err(anyhow!("room is offline")));
1058        }
1059
1060        cx.notify();
1061        let client = self.client.clone();
1062        let room_id = self.id;
1063        self.pending_call_count += 1;
1064        cx.spawn(move |this, mut cx| async move {
1065            let result = client
1066                .request(proto::Call {
1067                    room_id,
1068                    called_user_id,
1069                    initial_project_id,
1070                })
1071                .await;
1072            this.update(&mut cx, |this, cx| {
1073                this.pending_call_count -= 1;
1074                if this.should_leave() {
1075                    this.leave(cx).detach_and_log_err(cx);
1076                }
1077            })?;
1078            result?;
1079            Ok(())
1080        })
1081    }
1082
1083    pub fn join_project(
1084        &mut self,
1085        id: u64,
1086        language_registry: Arc<LanguageRegistry>,
1087        fs: Arc<dyn Fs>,
1088        cx: &mut ModelContext<Self>,
1089    ) -> Task<Result<Model<Project>>> {
1090        let client = self.client.clone();
1091        let user_store = self.user_store.clone();
1092        cx.emit(Event::RemoteProjectJoined { project_id: id });
1093        cx.spawn(move |this, mut cx| async move {
1094            let project =
1095                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1096
1097            this.update(&mut cx, |this, cx| {
1098                this.joined_projects.retain(|project| {
1099                    if let Some(project) = project.upgrade() {
1100                        !project.read(cx).is_read_only()
1101                    } else {
1102                        false
1103                    }
1104                });
1105                this.joined_projects.insert(project.downgrade());
1106            })?;
1107            Ok(project)
1108        })
1109    }
1110
1111    pub(crate) fn share_project(
1112        &mut self,
1113        project: Model<Project>,
1114        cx: &mut ModelContext<Self>,
1115    ) -> Task<Result<u64>> {
1116        if let Some(project_id) = project.read(cx).remote_id() {
1117            return Task::ready(Ok(project_id));
1118        }
1119
1120        let request = self.client.request(proto::ShareProject {
1121            room_id: self.id(),
1122            worktrees: project.read(cx).worktree_metadata_protos(cx),
1123        });
1124        cx.spawn(|this, mut cx| async move {
1125            let response = request.await?;
1126
1127            project.update(&mut cx, |project, cx| {
1128                project.shared(response.project_id, cx)
1129            })??;
1130
1131            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1132            this.update(&mut cx, |this, cx| {
1133                this.shared_projects.insert(project.downgrade());
1134                let active_project = this.local_participant.active_project.as_ref();
1135                if active_project.map_or(false, |location| *location == project) {
1136                    this.set_location(Some(&project), cx)
1137                } else {
1138                    Task::ready(Ok(()))
1139                }
1140            })?
1141            .await?;
1142
1143            Ok(response.project_id)
1144        })
1145    }
1146
1147    pub(crate) fn unshare_project(
1148        &mut self,
1149        project: Model<Project>,
1150        cx: &mut ModelContext<Self>,
1151    ) -> Result<()> {
1152        let project_id = match project.read(cx).remote_id() {
1153            Some(project_id) => project_id,
1154            None => return Ok(()),
1155        };
1156
1157        self.client.send(proto::UnshareProject { project_id })?;
1158        project.update(cx, |this, cx| this.unshare(cx))
1159    }
1160
1161    pub(crate) fn set_location(
1162        &mut self,
1163        project: Option<&Model<Project>>,
1164        cx: &mut ModelContext<Self>,
1165    ) -> Task<Result<()>> {
1166        if self.status.is_offline() {
1167            return Task::ready(Err(anyhow!("room is offline")));
1168        }
1169
1170        let client = self.client.clone();
1171        let room_id = self.id;
1172        let location = if let Some(project) = project {
1173            self.local_participant.active_project = Some(project.downgrade());
1174            if let Some(project_id) = project.read(cx).remote_id() {
1175                proto::participant_location::Variant::SharedProject(
1176                    proto::participant_location::SharedProject { id: project_id },
1177                )
1178            } else {
1179                proto::participant_location::Variant::UnsharedProject(
1180                    proto::participant_location::UnsharedProject {},
1181                )
1182            }
1183        } else {
1184            self.local_participant.active_project = None;
1185            proto::participant_location::Variant::External(proto::participant_location::External {})
1186        };
1187
1188        cx.notify();
1189        cx.background_executor().spawn(async move {
1190            client
1191                .request(proto::UpdateParticipantLocation {
1192                    room_id,
1193                    location: Some(proto::ParticipantLocation {
1194                        variant: Some(location),
1195                    }),
1196                })
1197                .await?;
1198            Ok(())
1199        })
1200    }
1201
1202    pub fn is_screen_sharing(&self) -> bool {
1203        self.live_kit.as_ref().map_or(false, |live_kit| {
1204            !matches!(live_kit.screen_track, LocalTrack::None)
1205        })
1206    }
1207
1208    pub fn is_sharing_mic(&self) -> bool {
1209        self.live_kit.as_ref().map_or(false, |live_kit| {
1210            !matches!(live_kit.microphone_track, LocalTrack::None)
1211        })
1212    }
1213
1214    pub fn is_muted(&self, cx: &AppContext) -> bool {
1215        self.live_kit
1216            .as_ref()
1217            .and_then(|live_kit| match &live_kit.microphone_track {
1218                LocalTrack::None => Some(Self::mute_on_join(cx)),
1219                LocalTrack::Pending { muted, .. } => Some(*muted),
1220                LocalTrack::Published { muted, .. } => Some(*muted),
1221            })
1222            .unwrap_or(false)
1223    }
1224
1225    pub fn is_speaking(&self) -> bool {
1226        self.live_kit
1227            .as_ref()
1228            .map_or(false, |live_kit| live_kit.speaking)
1229    }
1230
1231    pub fn is_deafened(&self) -> Option<bool> {
1232        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1233    }
1234
1235    #[track_caller]
1236    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1237        if self.status.is_offline() {
1238            return Task::ready(Err(anyhow!("room is offline")));
1239        } else if self.is_sharing_mic() {
1240            return Task::ready(Err(anyhow!("microphone was already shared")));
1241        }
1242
1243        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1244            let publish_id = post_inc(&mut live_kit.next_publish_id);
1245            live_kit.microphone_track = LocalTrack::Pending {
1246                publish_id,
1247                muted: false,
1248            };
1249            cx.notify();
1250            publish_id
1251        } else {
1252            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1253        };
1254
1255        cx.spawn(move |this, mut cx| async move {
1256            let publish_track = async {
1257                let track = LocalAudioTrack::create();
1258                this.upgrade()
1259                    .ok_or_else(|| anyhow!("room was dropped"))?
1260                    .update(&mut cx, |this, _| {
1261                        this.live_kit
1262                            .as_ref()
1263                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1264                    })?
1265                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1266                    .await
1267            };
1268
1269            let publication = publish_track.await;
1270            this.upgrade()
1271                .ok_or_else(|| anyhow!("room was dropped"))?
1272                .update(&mut cx, |this, cx| {
1273                    let live_kit = this
1274                        .live_kit
1275                        .as_mut()
1276                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1277
1278                    let (canceled, muted) = if let LocalTrack::Pending {
1279                        publish_id: cur_publish_id,
1280                        muted,
1281                    } = &live_kit.microphone_track
1282                    {
1283                        (*cur_publish_id != publish_id, *muted)
1284                    } else {
1285                        (true, false)
1286                    };
1287
1288                    match publication {
1289                        Ok(publication) => {
1290                            if canceled {
1291                                live_kit.room.unpublish_track(publication);
1292                            } else {
1293                                if muted {
1294                                    cx.background_executor()
1295                                        .spawn(publication.set_mute(muted))
1296                                        .detach();
1297                                }
1298                                live_kit.microphone_track = LocalTrack::Published {
1299                                    track_publication: publication,
1300                                    muted,
1301                                };
1302                                cx.notify();
1303                            }
1304                            Ok(())
1305                        }
1306                        Err(error) => {
1307                            if canceled {
1308                                Ok(())
1309                            } else {
1310                                live_kit.microphone_track = LocalTrack::None;
1311                                cx.notify();
1312                                Err(error)
1313                            }
1314                        }
1315                    }
1316                })?
1317        })
1318    }
1319
1320    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1321        if self.status.is_offline() {
1322            return Task::ready(Err(anyhow!("room is offline")));
1323        } else if self.is_screen_sharing() {
1324            return Task::ready(Err(anyhow!("screen was already shared")));
1325        }
1326
1327        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1328            let publish_id = post_inc(&mut live_kit.next_publish_id);
1329            live_kit.screen_track = LocalTrack::Pending {
1330                publish_id,
1331                muted: false,
1332            };
1333            cx.notify();
1334            (live_kit.room.display_sources(), publish_id)
1335        } else {
1336            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1337        };
1338
1339        cx.spawn(move |this, mut cx| async move {
1340            let publish_track = async {
1341                let displays = displays.await?;
1342                let display = displays
1343                    .first()
1344                    .ok_or_else(|| anyhow!("no display found"))?;
1345                let track = LocalVideoTrack::screen_share_for_display(&display);
1346                this.upgrade()
1347                    .ok_or_else(|| anyhow!("room was dropped"))?
1348                    .update(&mut cx, |this, _| {
1349                        this.live_kit
1350                            .as_ref()
1351                            .map(|live_kit| live_kit.room.publish_video_track(track))
1352                    })?
1353                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1354                    .await
1355            };
1356
1357            let publication = publish_track.await;
1358            this.upgrade()
1359                .ok_or_else(|| anyhow!("room was dropped"))?
1360                .update(&mut cx, |this, cx| {
1361                    let live_kit = this
1362                        .live_kit
1363                        .as_mut()
1364                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1365
1366                    let (canceled, muted) = if let LocalTrack::Pending {
1367                        publish_id: cur_publish_id,
1368                        muted,
1369                    } = &live_kit.screen_track
1370                    {
1371                        (*cur_publish_id != publish_id, *muted)
1372                    } else {
1373                        (true, false)
1374                    };
1375
1376                    match publication {
1377                        Ok(publication) => {
1378                            if canceled {
1379                                live_kit.room.unpublish_track(publication);
1380                            } else {
1381                                if muted {
1382                                    cx.background_executor()
1383                                        .spawn(publication.set_mute(muted))
1384                                        .detach();
1385                                }
1386                                live_kit.screen_track = LocalTrack::Published {
1387                                    track_publication: publication,
1388                                    muted,
1389                                };
1390                                cx.notify();
1391                            }
1392
1393                            Audio::play_sound(Sound::StartScreenshare, cx);
1394
1395                            Ok(())
1396                        }
1397                        Err(error) => {
1398                            if canceled {
1399                                Ok(())
1400                            } else {
1401                                live_kit.screen_track = LocalTrack::None;
1402                                cx.notify();
1403                                Err(error)
1404                            }
1405                        }
1406                    }
1407                })?
1408        })
1409    }
1410
1411    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1412        let should_mute = !self.is_muted(cx);
1413        if let Some(live_kit) = self.live_kit.as_mut() {
1414            if matches!(live_kit.microphone_track, LocalTrack::None) {
1415                return Ok(self.share_microphone(cx));
1416            }
1417
1418            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1419            live_kit.muted_by_user = should_mute;
1420
1421            if old_muted == true && live_kit.deafened == true {
1422                if let Some(task) = self.toggle_deafen(cx).ok() {
1423                    task.detach();
1424                }
1425            }
1426
1427            Ok(ret_task)
1428        } else {
1429            Err(anyhow!("LiveKit not started"))
1430        }
1431    }
1432
1433    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1434        if let Some(live_kit) = self.live_kit.as_mut() {
1435            (*live_kit).deafened = !live_kit.deafened;
1436
1437            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1438            // Context notification is sent within set_mute itself.
1439            let mut mute_task = None;
1440            // When deafening, mute user's mic as well.
1441            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1442            if live_kit.deafened || !live_kit.muted_by_user {
1443                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1444            };
1445            for participant in self.remote_participants.values() {
1446                for track in live_kit
1447                    .room
1448                    .remote_audio_track_publications(&participant.user.id.to_string())
1449                {
1450                    let deafened = live_kit.deafened;
1451                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1452                }
1453            }
1454
1455            Ok(cx.foreground_executor().spawn(async move {
1456                if let Some(mute_task) = mute_task {
1457                    mute_task.await?;
1458                }
1459                for task in tasks {
1460                    task.await?;
1461                }
1462                Ok(())
1463            }))
1464        } else {
1465            Err(anyhow!("LiveKit not started"))
1466        }
1467    }
1468
1469    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1470        if self.status.is_offline() {
1471            return Err(anyhow!("room is offline"));
1472        }
1473
1474        let live_kit = self
1475            .live_kit
1476            .as_mut()
1477            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1478        match mem::take(&mut live_kit.screen_track) {
1479            LocalTrack::None => Err(anyhow!("screen was not shared")),
1480            LocalTrack::Pending { .. } => {
1481                cx.notify();
1482                Ok(())
1483            }
1484            LocalTrack::Published {
1485                track_publication, ..
1486            } => {
1487                live_kit.room.unpublish_track(track_publication);
1488                cx.notify();
1489
1490                Audio::play_sound(Sound::StopScreenshare, cx);
1491                Ok(())
1492            }
1493        }
1494    }
1495
1496    #[cfg(any(test, feature = "test-support"))]
1497    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1498        self.live_kit
1499            .as_ref()
1500            .unwrap()
1501            .room
1502            .set_display_sources(sources);
1503    }
1504}
1505
1506struct LiveKitRoom {
1507    room: Arc<live_kit_client::Room>,
1508    screen_track: LocalTrack,
1509    microphone_track: LocalTrack,
1510    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1511    muted_by_user: bool,
1512    deafened: bool,
1513    speaking: bool,
1514    next_publish_id: usize,
1515    _maintain_room: Task<()>,
1516    _maintain_tracks: [Task<()>; 2],
1517}
1518
1519impl LiveKitRoom {
1520    fn set_mute(
1521        self: &mut LiveKitRoom,
1522        should_mute: bool,
1523        cx: &mut ModelContext<Room>,
1524    ) -> Result<(Task<Result<()>>, bool)> {
1525        if !should_mute {
1526            // clear user muting state.
1527            self.muted_by_user = false;
1528        }
1529
1530        let (result, old_muted) = match &mut self.microphone_track {
1531            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1532            LocalTrack::Pending { muted, .. } => {
1533                let old_muted = *muted;
1534                *muted = should_mute;
1535                cx.notify();
1536                Ok((Task::Ready(Some(Ok(()))), old_muted))
1537            }
1538            LocalTrack::Published {
1539                track_publication,
1540                muted,
1541            } => {
1542                let old_muted = *muted;
1543                *muted = should_mute;
1544                cx.notify();
1545                Ok((
1546                    cx.background_executor()
1547                        .spawn(track_publication.set_mute(*muted)),
1548                    old_muted,
1549                ))
1550            }
1551        }?;
1552
1553        if old_muted != should_mute {
1554            if should_mute {
1555                Audio::play_sound(Sound::Mute, cx);
1556            } else {
1557                Audio::play_sound(Sound::Unmute, cx);
1558            }
1559        }
1560
1561        Ok((result, old_muted))
1562    }
1563}
1564
1565enum LocalTrack {
1566    None,
1567    Pending {
1568        publish_id: usize,
1569        muted: bool,
1570    },
1571    Published {
1572        track_publication: LocalTrackPublication,
1573        muted: bool,
1574    },
1575}
1576
1577impl Default for LocalTrack {
1578    fn default() -> Self {
1579        Self::None
1580    }
1581}
1582
1583#[derive(Copy, Clone, PartialEq, Eq)]
1584pub enum RoomStatus {
1585    Online,
1586    Rejoining,
1587    Offline,
1588}
1589
1590impl RoomStatus {
1591    pub fn is_offline(&self) -> bool {
1592        matches!(self, RoomStatus::Offline)
1593    }
1594
1595    pub fn is_online(&self) -> bool {
1596        matches!(self, RoomStatus::Online)
1597    }
1598}