room.rs

   1#![cfg_attr(target_os = "windows", allow(unused))]
   2
   3use crate::{
   4    call_settings::CallSettings,
   5    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   6};
   7use anyhow::{anyhow, Result};
   8use audio::{Audio, Sound};
   9use client::{
  10    proto::{self, PeerId},
  11    ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  12};
  13use collections::{BTreeMap, HashMap, HashSet};
  14use fs::Fs;
  15use futures::{FutureExt, StreamExt};
  16use gpui::{
  17    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  18};
  19use language::LanguageRegistry;
  20#[cfg(not(target_os = "windows"))]
  21use livekit::{
  22    capture_local_audio_track, capture_local_video_track,
  23    id::ParticipantIdentity,
  24    options::{TrackPublishOptions, VideoCodec},
  25    play_remote_audio_track,
  26    publication::LocalTrackPublication,
  27    track::{TrackKind, TrackSource},
  28    RoomEvent, RoomOptions,
  29};
  30#[cfg(target_os = "windows")]
  31use livekit::{publication::LocalTrackPublication, RoomEvent};
  32use livekit_client as livekit;
  33use postage::{sink::Sink, stream::Stream, watch};
  34use project::Project;
  35use settings::Settings as _;
  36use std::{any::Any, future::Future, mem, sync::Arc, time::Duration};
  37use util::{post_inc, ResultExt, TryFutureExt};
  38
  39pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  40
  41#[derive(Clone, Debug, PartialEq, Eq)]
  42pub enum Event {
  43    RoomJoined {
  44        channel_id: Option<ChannelId>,
  45    },
  46    ParticipantLocationChanged {
  47        participant_id: proto::PeerId,
  48    },
  49    RemoteVideoTracksChanged {
  50        participant_id: proto::PeerId,
  51    },
  52    RemoteAudioTracksChanged {
  53        participant_id: proto::PeerId,
  54    },
  55    RemoteProjectShared {
  56        owner: Arc<User>,
  57        project_id: u64,
  58        worktree_root_names: Vec<String>,
  59    },
  60    RemoteProjectUnshared {
  61        project_id: u64,
  62    },
  63    RemoteProjectJoined {
  64        project_id: u64,
  65    },
  66    RemoteProjectInvitationDiscarded {
  67        project_id: u64,
  68    },
  69    RoomLeft {
  70        channel_id: Option<ChannelId>,
  71    },
  72}
  73
  74pub struct Room {
  75    id: u64,
  76    channel_id: Option<ChannelId>,
  77    live_kit: Option<LiveKitRoom>,
  78    status: RoomStatus,
  79    shared_projects: HashSet<WeakModel<Project>>,
  80    joined_projects: HashSet<WeakModel<Project>>,
  81    local_participant: LocalParticipant,
  82    remote_participants: BTreeMap<u64, RemoteParticipant>,
  83    pending_participants: Vec<Arc<User>>,
  84    participant_user_ids: HashSet<u64>,
  85    pending_call_count: usize,
  86    leave_when_empty: bool,
  87    client: Arc<Client>,
  88    user_store: Model<UserStore>,
  89    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  90    client_subscriptions: Vec<client::Subscription>,
  91    _subscriptions: Vec<gpui::Subscription>,
  92    room_update_completed_tx: watch::Sender<Option<()>>,
  93    room_update_completed_rx: watch::Receiver<Option<()>>,
  94    pending_room_update: Option<Task<()>>,
  95    maintain_connection: Option<Task<Option<()>>>,
  96}
  97
  98impl EventEmitter<Event> for Room {}
  99
 100impl Room {
 101    pub fn channel_id(&self) -> Option<ChannelId> {
 102        self.channel_id
 103    }
 104
 105    pub fn is_sharing_project(&self) -> bool {
 106        !self.shared_projects.is_empty()
 107    }
 108
 109    #[cfg(all(any(test, feature = "test-support"), not(target_os = "windows")))]
 110    pub fn is_connected(&self) -> bool {
 111        if let Some(live_kit) = self.live_kit.as_ref() {
 112            live_kit.room.connection_state() == livekit::ConnectionState::Connected
 113        } else {
 114            false
 115        }
 116    }
 117
 118    fn new(
 119        id: u64,
 120        channel_id: Option<ChannelId>,
 121        livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
 122        client: Arc<Client>,
 123        user_store: Model<UserStore>,
 124        cx: &mut ModelContext<Self>,
 125    ) -> Self {
 126        spawn_room_connection(livekit_connection_info, cx);
 127
 128        let maintain_connection = cx.spawn({
 129            let client = client.clone();
 130            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 131        });
 132
 133        Audio::play_sound(Sound::Joined, cx);
 134
 135        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 136
 137        Self {
 138            id,
 139            channel_id,
 140            live_kit: None,
 141            status: RoomStatus::Online,
 142            shared_projects: Default::default(),
 143            joined_projects: Default::default(),
 144            participant_user_ids: Default::default(),
 145            local_participant: Default::default(),
 146            remote_participants: Default::default(),
 147            pending_participants: Default::default(),
 148            pending_call_count: 0,
 149            client_subscriptions: vec![
 150                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 151            ],
 152            _subscriptions: vec![
 153                cx.on_release(Self::released),
 154                cx.on_app_quit(Self::app_will_quit),
 155            ],
 156            leave_when_empty: false,
 157            pending_room_update: None,
 158            client,
 159            user_store,
 160            follows_by_leader_id_project_id: Default::default(),
 161            maintain_connection: Some(maintain_connection),
 162            room_update_completed_tx,
 163            room_update_completed_rx,
 164        }
 165    }
 166
 167    pub(crate) fn create(
 168        called_user_id: u64,
 169        initial_project: Option<Model<Project>>,
 170        client: Arc<Client>,
 171        user_store: Model<UserStore>,
 172        cx: &mut AppContext,
 173    ) -> Task<Result<Model<Self>>> {
 174        cx.spawn(move |mut cx| async move {
 175            let response = client.request(proto::CreateRoom {}).await?;
 176            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 177            let room = cx.new_model(|cx| {
 178                let mut room = Self::new(
 179                    room_proto.id,
 180                    None,
 181                    response.live_kit_connection_info,
 182                    client,
 183                    user_store,
 184                    cx,
 185                );
 186                if let Some(participant) = room_proto.participants.first() {
 187                    room.local_participant.role = participant.role()
 188                }
 189                room
 190            })?;
 191
 192            let initial_project_id = if let Some(initial_project) = initial_project {
 193                let initial_project_id = room
 194                    .update(&mut cx, |room, cx| {
 195                        room.share_project(initial_project.clone(), cx)
 196                    })?
 197                    .await?;
 198                Some(initial_project_id)
 199            } else {
 200                None
 201            };
 202
 203            let did_join = room
 204                .update(&mut cx, |room, cx| {
 205                    room.leave_when_empty = true;
 206                    room.call(called_user_id, initial_project_id, cx)
 207                })?
 208                .await;
 209            match did_join {
 210                Ok(()) => Ok(room),
 211                Err(error) => Err(error.context("room creation failed")),
 212            }
 213        })
 214    }
 215
 216    pub(crate) async fn join_channel(
 217        channel_id: ChannelId,
 218        client: Arc<Client>,
 219        user_store: Model<UserStore>,
 220        cx: AsyncAppContext,
 221    ) -> Result<Model<Self>> {
 222        Self::from_join_response(
 223            client
 224                .request(proto::JoinChannel {
 225                    channel_id: channel_id.0,
 226                })
 227                .await?,
 228            client,
 229            user_store,
 230            cx,
 231        )
 232    }
 233
 234    pub(crate) async fn join(
 235        room_id: u64,
 236        client: Arc<Client>,
 237        user_store: Model<UserStore>,
 238        cx: AsyncAppContext,
 239    ) -> Result<Model<Self>> {
 240        Self::from_join_response(
 241            client.request(proto::JoinRoom { id: room_id }).await?,
 242            client,
 243            user_store,
 244            cx,
 245        )
 246    }
 247
 248    fn released(&mut self, cx: &mut AppContext) {
 249        if self.status.is_online() {
 250            self.leave_internal(cx).detach_and_log_err(cx);
 251        }
 252    }
 253
 254    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 255        let task = if self.status.is_online() {
 256            let leave = self.leave_internal(cx);
 257            Some(cx.background_executor().spawn(async move {
 258                leave.await.log_err();
 259            }))
 260        } else {
 261            None
 262        };
 263
 264        async move {
 265            if let Some(task) = task {
 266                task.await;
 267            }
 268        }
 269    }
 270
 271    pub fn mute_on_join(cx: &AppContext) -> bool {
 272        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 273    }
 274
 275    fn from_join_response(
 276        response: proto::JoinRoomResponse,
 277        client: Arc<Client>,
 278        user_store: Model<UserStore>,
 279        mut cx: AsyncAppContext,
 280    ) -> Result<Model<Self>> {
 281        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 282        let room = cx.new_model(|cx| {
 283            Self::new(
 284                room_proto.id,
 285                response.channel_id.map(ChannelId),
 286                response.live_kit_connection_info,
 287                client,
 288                user_store,
 289                cx,
 290            )
 291        })?;
 292        room.update(&mut cx, |room, cx| {
 293            room.leave_when_empty = room.channel_id.is_none();
 294            room.apply_room_update(room_proto, cx)?;
 295            anyhow::Ok(())
 296        })??;
 297        Ok(room)
 298    }
 299
 300    fn should_leave(&self) -> bool {
 301        self.leave_when_empty
 302            && self.pending_room_update.is_none()
 303            && self.pending_participants.is_empty()
 304            && self.remote_participants.is_empty()
 305            && self.pending_call_count == 0
 306    }
 307
 308    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 309        cx.notify();
 310        self.leave_internal(cx)
 311    }
 312
 313    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 314        if self.status.is_offline() {
 315            return Task::ready(Err(anyhow!("room is offline")));
 316        }
 317
 318        log::info!("leaving room");
 319        Audio::play_sound(Sound::Leave, cx);
 320
 321        self.clear_state(cx);
 322
 323        let leave_room = self.client.request(proto::LeaveRoom {});
 324        cx.background_executor().spawn(async move {
 325            leave_room.await?;
 326            anyhow::Ok(())
 327        })
 328    }
 329
 330    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 331        for project in self.shared_projects.drain() {
 332            if let Some(project) = project.upgrade() {
 333                project.update(cx, |project, cx| {
 334                    project.unshare(cx).log_err();
 335                });
 336            }
 337        }
 338        for project in self.joined_projects.drain() {
 339            if let Some(project) = project.upgrade() {
 340                project.update(cx, |project, cx| {
 341                    project.disconnected_from_host(cx);
 342                    project.close(cx);
 343                });
 344            }
 345        }
 346
 347        self.status = RoomStatus::Offline;
 348        self.remote_participants.clear();
 349        self.pending_participants.clear();
 350        self.participant_user_ids.clear();
 351        self.client_subscriptions.clear();
 352        self.live_kit.take();
 353        self.pending_room_update.take();
 354        self.maintain_connection.take();
 355    }
 356
 357    async fn maintain_connection(
 358        this: WeakModel<Self>,
 359        client: Arc<Client>,
 360        mut cx: AsyncAppContext,
 361    ) -> Result<()> {
 362        let mut client_status = client.status();
 363        loop {
 364            let _ = client_status.try_recv();
 365            let is_connected = client_status.borrow().is_connected();
 366            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 367            if !is_connected || client_status.next().await.is_some() {
 368                log::info!("detected client disconnection");
 369
 370                this.upgrade()
 371                    .ok_or_else(|| anyhow!("room was dropped"))?
 372                    .update(&mut cx, |this, cx| {
 373                        this.status = RoomStatus::Rejoining;
 374                        cx.notify();
 375                    })?;
 376
 377                // Wait for client to re-establish a connection to the server.
 378                {
 379                    let mut reconnection_timeout =
 380                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 381                    let client_reconnection = async {
 382                        let mut remaining_attempts = 3;
 383                        while remaining_attempts > 0 {
 384                            if client_status.borrow().is_connected() {
 385                                log::info!("client reconnected, attempting to rejoin room");
 386
 387                                let Some(this) = this.upgrade() else { break };
 388                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 389                                    Ok(task) => {
 390                                        if task.await.log_err().is_some() {
 391                                            return true;
 392                                        } else {
 393                                            remaining_attempts -= 1;
 394                                        }
 395                                    }
 396                                    Err(_app_dropped) => return false,
 397                                }
 398                            } else if client_status.borrow().is_signed_out() {
 399                                return false;
 400                            }
 401
 402                            log::info!(
 403                                "waiting for client status change, remaining attempts {}",
 404                                remaining_attempts
 405                            );
 406                            client_status.next().await;
 407                        }
 408                        false
 409                    }
 410                    .fuse();
 411                    futures::pin_mut!(client_reconnection);
 412
 413                    futures::select_biased! {
 414                        reconnected = client_reconnection => {
 415                            if reconnected {
 416                                log::info!("successfully reconnected to room");
 417                                // If we successfully joined the room, go back around the loop
 418                                // waiting for future connection status changes.
 419                                continue;
 420                            }
 421                        }
 422                        _ = reconnection_timeout => {
 423                            log::info!("room reconnection timeout expired");
 424                        }
 425                    }
 426                }
 427
 428                break;
 429            }
 430        }
 431
 432        // The client failed to re-establish a connection to the server
 433        // or an error occurred while trying to re-join the room. Either way
 434        // we leave the room and return an error.
 435        if let Some(this) = this.upgrade() {
 436            log::info!("reconnection failed, leaving room");
 437            this.update(&mut cx, |this, cx| this.leave(cx))?.await?;
 438        }
 439        Err(anyhow!(
 440            "can't reconnect to room: client failed to re-establish connection"
 441        ))
 442    }
 443
 444    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 445        let mut projects = HashMap::default();
 446        let mut reshared_projects = Vec::new();
 447        let mut rejoined_projects = Vec::new();
 448        self.shared_projects.retain(|project| {
 449            if let Some(handle) = project.upgrade() {
 450                let project = handle.read(cx);
 451                if let Some(project_id) = project.remote_id() {
 452                    projects.insert(project_id, handle.clone());
 453                    reshared_projects.push(proto::UpdateProject {
 454                        project_id,
 455                        worktrees: project.worktree_metadata_protos(cx),
 456                    });
 457                    return true;
 458                }
 459            }
 460            false
 461        });
 462        self.joined_projects.retain(|project| {
 463            if let Some(handle) = project.upgrade() {
 464                let project = handle.read(cx);
 465                if let Some(project_id) = project.remote_id() {
 466                    projects.insert(project_id, handle.clone());
 467                    rejoined_projects.push(proto::RejoinProject {
 468                        id: project_id,
 469                        worktrees: project
 470                            .worktrees(cx)
 471                            .map(|worktree| {
 472                                let worktree = worktree.read(cx);
 473                                proto::RejoinWorktree {
 474                                    id: worktree.id().to_proto(),
 475                                    scan_id: worktree.completed_scan_id() as u64,
 476                                }
 477                            })
 478                            .collect(),
 479                    });
 480                }
 481                return true;
 482            }
 483            false
 484        });
 485
 486        let response = self.client.request_envelope(proto::RejoinRoom {
 487            id: self.id,
 488            reshared_projects,
 489            rejoined_projects,
 490        });
 491
 492        cx.spawn(|this, mut cx| async move {
 493            let response = response.await?;
 494            let message_id = response.message_id;
 495            let response = response.payload;
 496            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 497            this.update(&mut cx, |this, cx| {
 498                this.status = RoomStatus::Online;
 499                this.apply_room_update(room_proto, cx)?;
 500
 501                for reshared_project in response.reshared_projects {
 502                    if let Some(project) = projects.get(&reshared_project.id) {
 503                        project.update(cx, |project, cx| {
 504                            project.reshared(reshared_project, cx).log_err();
 505                        });
 506                    }
 507                }
 508
 509                for rejoined_project in response.rejoined_projects {
 510                    if let Some(project) = projects.get(&rejoined_project.id) {
 511                        project.update(cx, |project, cx| {
 512                            project.rejoined(rejoined_project, message_id, cx).log_err();
 513                        });
 514                    }
 515                }
 516
 517                anyhow::Ok(())
 518            })?
 519        })
 520    }
 521
 522    pub fn id(&self) -> u64 {
 523        self.id
 524    }
 525
 526    pub fn status(&self) -> RoomStatus {
 527        self.status
 528    }
 529
 530    pub fn local_participant(&self) -> &LocalParticipant {
 531        &self.local_participant
 532    }
 533
 534    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 535        &self.remote_participants
 536    }
 537
 538    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 539        self.remote_participants
 540            .values()
 541            .find(|p| p.peer_id == peer_id)
 542    }
 543
 544    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 545        self.remote_participants
 546            .get(&user_id)
 547            .map(|participant| participant.role)
 548    }
 549
 550    pub fn contains_guests(&self) -> bool {
 551        self.local_participant.role == proto::ChannelRole::Guest
 552            || self
 553                .remote_participants
 554                .values()
 555                .any(|p| p.role == proto::ChannelRole::Guest)
 556    }
 557
 558    pub fn local_participant_is_admin(&self) -> bool {
 559        self.local_participant.role == proto::ChannelRole::Admin
 560    }
 561
 562    pub fn local_participant_is_guest(&self) -> bool {
 563        self.local_participant.role == proto::ChannelRole::Guest
 564    }
 565
 566    pub fn set_participant_role(
 567        &mut self,
 568        user_id: u64,
 569        role: proto::ChannelRole,
 570        cx: &ModelContext<Self>,
 571    ) -> Task<Result<()>> {
 572        let client = self.client.clone();
 573        let room_id = self.id;
 574        let role = role.into();
 575        cx.spawn(|_, _| async move {
 576            client
 577                .request(proto::SetRoomParticipantRole {
 578                    room_id,
 579                    user_id,
 580                    role,
 581                })
 582                .await
 583                .map(|_| ())
 584        })
 585    }
 586
 587    pub fn pending_participants(&self) -> &[Arc<User>] {
 588        &self.pending_participants
 589    }
 590
 591    pub fn contains_participant(&self, user_id: u64) -> bool {
 592        self.participant_user_ids.contains(&user_id)
 593    }
 594
 595    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 596        self.follows_by_leader_id_project_id
 597            .get(&(leader_id, project_id))
 598            .map_or(&[], |v| v.as_slice())
 599    }
 600
 601    /// Returns the most 'active' projects, defined as most people in the project
 602    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 603        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 604        for participant in self.remote_participants.values() {
 605            match participant.location {
 606                ParticipantLocation::SharedProject { project_id } => {
 607                    project_hosts_and_guest_counts
 608                        .entry(project_id)
 609                        .or_default()
 610                        .1 += 1;
 611                }
 612                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 613            }
 614            for project in &participant.projects {
 615                project_hosts_and_guest_counts
 616                    .entry(project.id)
 617                    .or_default()
 618                    .0 = Some(participant.user.id);
 619            }
 620        }
 621
 622        if let Some(user) = self.user_store.read(cx).current_user() {
 623            for project in &self.local_participant.projects {
 624                project_hosts_and_guest_counts
 625                    .entry(project.id)
 626                    .or_default()
 627                    .0 = Some(user.id);
 628            }
 629        }
 630
 631        project_hosts_and_guest_counts
 632            .into_iter()
 633            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 634            .max_by_key(|(_, _, guest_count)| *guest_count)
 635            .map(|(id, host, _)| (id, host))
 636    }
 637
 638    async fn handle_room_updated(
 639        this: Model<Self>,
 640        envelope: TypedEnvelope<proto::RoomUpdated>,
 641        mut cx: AsyncAppContext,
 642    ) -> Result<()> {
 643        let room = envelope
 644            .payload
 645            .room
 646            .ok_or_else(|| anyhow!("invalid room"))?;
 647        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 648    }
 649
 650    fn apply_room_update(&mut self, room: proto::Room, cx: &mut ModelContext<Self>) -> Result<()> {
 651        log::trace!(
 652            "client {:?}. room update: {:?}",
 653            self.client.user_id(),
 654            &room
 655        );
 656
 657        self.pending_room_update = Some(self.start_room_connection(room, cx));
 658
 659        cx.notify();
 660        Ok(())
 661    }
 662
 663    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 664        let mut done_rx = self.room_update_completed_rx.clone();
 665        async move {
 666            while let Some(result) = done_rx.next().await {
 667                if result.is_some() {
 668                    break;
 669                }
 670            }
 671        }
 672    }
 673
 674    #[cfg(target_os = "windows")]
 675    fn start_room_connection(
 676        &self,
 677        mut room: proto::Room,
 678        cx: &mut ModelContext<Self>,
 679    ) -> Task<()> {
 680        Task::ready(())
 681    }
 682
 683    #[cfg(not(target_os = "windows"))]
 684    fn start_room_connection(
 685        &self,
 686        mut room: proto::Room,
 687        cx: &mut ModelContext<Self>,
 688    ) -> Task<()> {
 689        // Filter ourselves out from the room's participants.
 690        let local_participant_ix = room
 691            .participants
 692            .iter()
 693            .position(|participant| Some(participant.user_id) == self.client.user_id());
 694        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 695
 696        let pending_participant_user_ids = room
 697            .pending_participants
 698            .iter()
 699            .map(|p| p.user_id)
 700            .collect::<Vec<_>>();
 701
 702        let remote_participant_user_ids = room
 703            .participants
 704            .iter()
 705            .map(|p| p.user_id)
 706            .collect::<Vec<_>>();
 707
 708        let (remote_participants, pending_participants) =
 709            self.user_store.update(cx, move |user_store, cx| {
 710                (
 711                    user_store.get_users(remote_participant_user_ids, cx),
 712                    user_store.get_users(pending_participant_user_ids, cx),
 713                )
 714            });
 715        cx.spawn(|this, mut cx| async move {
 716            let (remote_participants, pending_participants) =
 717                futures::join!(remote_participants, pending_participants);
 718
 719            this.update(&mut cx, |this, cx| {
 720                this.participant_user_ids.clear();
 721
 722                if let Some(participant) = local_participant {
 723                    let role = participant.role();
 724                    this.local_participant.projects = participant.projects;
 725                    if this.local_participant.role != role {
 726                        this.local_participant.role = role;
 727
 728                        if role == proto::ChannelRole::Guest {
 729                            for project in mem::take(&mut this.shared_projects) {
 730                                if let Some(project) = project.upgrade() {
 731                                    this.unshare_project(project, cx).log_err();
 732                                }
 733                            }
 734                            this.local_participant.projects.clear();
 735                            if let Some(livekit_room) = &mut this.live_kit {
 736                                livekit_room.stop_publishing(cx);
 737                            }
 738                        }
 739
 740                        this.joined_projects.retain(|project| {
 741                            if let Some(project) = project.upgrade() {
 742                                project.update(cx, |project, cx| project.set_role(role, cx));
 743                                true
 744                            } else {
 745                                false
 746                            }
 747                        });
 748                    }
 749                } else {
 750                    this.local_participant.projects.clear();
 751                }
 752
 753                let livekit_participants = this
 754                    .live_kit
 755                    .as_ref()
 756                    .map(|live_kit| live_kit.room.remote_participants());
 757
 758                if let Some(participants) = remote_participants.log_err() {
 759                    for (participant, user) in room.participants.into_iter().zip(participants) {
 760                        let Some(peer_id) = participant.peer_id else {
 761                            continue;
 762                        };
 763                        let participant_index = ParticipantIndex(participant.participant_index);
 764                        this.participant_user_ids.insert(participant.user_id);
 765
 766                        let old_projects = this
 767                            .remote_participants
 768                            .get(&participant.user_id)
 769                            .into_iter()
 770                            .flat_map(|existing| &existing.projects)
 771                            .map(|project| project.id)
 772                            .collect::<HashSet<_>>();
 773                        let new_projects = participant
 774                            .projects
 775                            .iter()
 776                            .map(|project| project.id)
 777                            .collect::<HashSet<_>>();
 778
 779                        for project in &participant.projects {
 780                            if !old_projects.contains(&project.id) {
 781                                cx.emit(Event::RemoteProjectShared {
 782                                    owner: user.clone(),
 783                                    project_id: project.id,
 784                                    worktree_root_names: project.worktree_root_names.clone(),
 785                                });
 786                            }
 787                        }
 788
 789                        for unshared_project_id in old_projects.difference(&new_projects) {
 790                            this.joined_projects.retain(|project| {
 791                                if let Some(project) = project.upgrade() {
 792                                    project.update(cx, |project, cx| {
 793                                        if project.remote_id() == Some(*unshared_project_id) {
 794                                            project.disconnected_from_host(cx);
 795                                            false
 796                                        } else {
 797                                            true
 798                                        }
 799                                    })
 800                                } else {
 801                                    false
 802                                }
 803                            });
 804                            cx.emit(Event::RemoteProjectUnshared {
 805                                project_id: *unshared_project_id,
 806                            });
 807                        }
 808
 809                        let role = participant.role();
 810                        let location = ParticipantLocation::from_proto(participant.location)
 811                            .unwrap_or(ParticipantLocation::External);
 812                        if let Some(remote_participant) =
 813                            this.remote_participants.get_mut(&participant.user_id)
 814                        {
 815                            remote_participant.peer_id = peer_id;
 816                            remote_participant.projects = participant.projects;
 817                            remote_participant.participant_index = participant_index;
 818                            if location != remote_participant.location
 819                                || role != remote_participant.role
 820                            {
 821                                remote_participant.location = location;
 822                                remote_participant.role = role;
 823                                cx.emit(Event::ParticipantLocationChanged {
 824                                    participant_id: peer_id,
 825                                });
 826                            }
 827                        } else {
 828                            this.remote_participants.insert(
 829                                participant.user_id,
 830                                RemoteParticipant {
 831                                    user: user.clone(),
 832                                    participant_index,
 833                                    peer_id,
 834                                    projects: participant.projects,
 835                                    location,
 836                                    role,
 837                                    muted: true,
 838                                    speaking: false,
 839                                    video_tracks: Default::default(),
 840                                    #[cfg(not(target_os = "windows"))]
 841                                    audio_tracks: Default::default(),
 842                                },
 843                            );
 844
 845                            Audio::play_sound(Sound::Joined, cx);
 846                            if let Some(livekit_participants) = &livekit_participants {
 847                                if let Some(livekit_participant) = livekit_participants
 848                                    .get(&ParticipantIdentity(user.id.to_string()))
 849                                {
 850                                    for publication in
 851                                        livekit_participant.track_publications().into_values()
 852                                    {
 853                                        if let Some(track) = publication.track() {
 854                                            this.livekit_room_updated(
 855                                                RoomEvent::TrackSubscribed {
 856                                                    track,
 857                                                    publication,
 858                                                    participant: livekit_participant.clone(),
 859                                                },
 860                                                cx,
 861                                            )
 862                                            .warn_on_err();
 863                                        }
 864                                    }
 865                                }
 866                            }
 867                        }
 868                    }
 869
 870                    this.remote_participants.retain(|user_id, participant| {
 871                        if this.participant_user_ids.contains(user_id) {
 872                            true
 873                        } else {
 874                            for project in &participant.projects {
 875                                cx.emit(Event::RemoteProjectUnshared {
 876                                    project_id: project.id,
 877                                });
 878                            }
 879                            false
 880                        }
 881                    });
 882                }
 883
 884                if let Some(pending_participants) = pending_participants.log_err() {
 885                    this.pending_participants = pending_participants;
 886                    for participant in &this.pending_participants {
 887                        this.participant_user_ids.insert(participant.id);
 888                    }
 889                }
 890
 891                this.follows_by_leader_id_project_id.clear();
 892                for follower in room.followers {
 893                    let project_id = follower.project_id;
 894                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 895                        (Some(leader), Some(follower)) => (leader, follower),
 896
 897                        _ => {
 898                            log::error!("Follower message {follower:?} missing some state");
 899                            continue;
 900                        }
 901                    };
 902
 903                    let list = this
 904                        .follows_by_leader_id_project_id
 905                        .entry((leader, project_id))
 906                        .or_default();
 907                    if !list.contains(&follower) {
 908                        list.push(follower);
 909                    }
 910                }
 911
 912                this.pending_room_update.take();
 913                if this.should_leave() {
 914                    log::info!("room is empty, leaving");
 915                    this.leave(cx).detach();
 916                }
 917
 918                this.user_store.update(cx, |user_store, cx| {
 919                    let participant_indices_by_user_id = this
 920                        .remote_participants
 921                        .iter()
 922                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 923                        .collect();
 924                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 925                });
 926
 927                this.check_invariants();
 928                this.room_update_completed_tx.try_send(Some(())).ok();
 929                cx.notify();
 930            })
 931            .ok();
 932        })
 933    }
 934
 935    fn livekit_room_updated(
 936        &mut self,
 937        event: RoomEvent,
 938        cx: &mut ModelContext<Self>,
 939    ) -> Result<()> {
 940        log::trace!(
 941            "client {:?}. livekit event: {:?}",
 942            self.client.user_id(),
 943            &event
 944        );
 945
 946        match event {
 947            #[cfg(not(target_os = "windows"))]
 948            RoomEvent::TrackSubscribed {
 949                track,
 950                participant,
 951                publication,
 952            } => {
 953                let user_id = participant.identity().0.parse()?;
 954                let track_id = track.sid();
 955                let participant = self.remote_participants.get_mut(&user_id).ok_or_else(|| {
 956                    anyhow!(
 957                        "{:?} subscribed to track by unknown participant {user_id}",
 958                        self.client.user_id()
 959                    )
 960                })?;
 961                if self.live_kit.as_ref().map_or(true, |kit| kit.deafened) {
 962                    track.rtc_track().set_enabled(false);
 963                }
 964                match track {
 965                    livekit::track::RemoteTrack::Audio(track) => {
 966                        cx.emit(Event::RemoteAudioTracksChanged {
 967                            participant_id: participant.peer_id,
 968                        });
 969                        let stream = play_remote_audio_track(&track, cx.background_executor())?;
 970                        participant.audio_tracks.insert(track_id, (track, stream));
 971                        participant.muted = publication.is_muted();
 972                    }
 973                    livekit::track::RemoteTrack::Video(track) => {
 974                        cx.emit(Event::RemoteVideoTracksChanged {
 975                            participant_id: participant.peer_id,
 976                        });
 977                        participant.video_tracks.insert(track_id, track);
 978                    }
 979                }
 980            }
 981
 982            #[cfg(not(target_os = "windows"))]
 983            RoomEvent::TrackUnsubscribed {
 984                track, participant, ..
 985            } => {
 986                let user_id = participant.identity().0.parse()?;
 987                let participant = self.remote_participants.get_mut(&user_id).ok_or_else(|| {
 988                    anyhow!(
 989                        "{:?}, unsubscribed from track by unknown participant {user_id}",
 990                        self.client.user_id()
 991                    )
 992                })?;
 993                match track {
 994                    livekit::track::RemoteTrack::Audio(track) => {
 995                        participant.audio_tracks.remove(&track.sid());
 996                        participant.muted = true;
 997                        cx.emit(Event::RemoteAudioTracksChanged {
 998                            participant_id: participant.peer_id,
 999                        });
1000                    }
1001                    livekit::track::RemoteTrack::Video(track) => {
1002                        participant.video_tracks.remove(&track.sid());
1003                        cx.emit(Event::RemoteVideoTracksChanged {
1004                            participant_id: participant.peer_id,
1005                        });
1006                    }
1007                }
1008            }
1009
1010            #[cfg(not(target_os = "windows"))]
1011            RoomEvent::ActiveSpeakersChanged { speakers } => {
1012                let mut speaker_ids = speakers
1013                    .into_iter()
1014                    .filter_map(|speaker| speaker.identity().0.parse().ok())
1015                    .collect::<Vec<u64>>();
1016                speaker_ids.sort_unstable();
1017                for (sid, participant) in &mut self.remote_participants {
1018                    participant.speaking = speaker_ids.binary_search(sid).is_ok();
1019                }
1020                if let Some(id) = self.client.user_id() {
1021                    if let Some(room) = &mut self.live_kit {
1022                        room.speaking = speaker_ids.binary_search(&id).is_ok();
1023                    }
1024                }
1025            }
1026
1027            #[cfg(not(target_os = "windows"))]
1028            RoomEvent::TrackMuted {
1029                participant,
1030                publication,
1031            }
1032            | RoomEvent::TrackUnmuted {
1033                participant,
1034                publication,
1035            } => {
1036                let mut found = false;
1037                let user_id = participant.identity().0.parse()?;
1038                let track_id = publication.sid();
1039                if let Some(participant) = self.remote_participants.get_mut(&user_id) {
1040                    for (track, _) in participant.audio_tracks.values() {
1041                        if track.sid() == track_id {
1042                            found = true;
1043                            break;
1044                        }
1045                    }
1046                    if found {
1047                        participant.muted = publication.is_muted();
1048                    }
1049                }
1050            }
1051
1052            #[cfg(not(target_os = "windows"))]
1053            RoomEvent::LocalTrackUnpublished { publication, .. } => {
1054                log::info!("unpublished track {}", publication.sid());
1055                if let Some(room) = &mut self.live_kit {
1056                    if let LocalTrack::Published {
1057                        track_publication, ..
1058                    } = &room.microphone_track
1059                    {
1060                        if track_publication.sid() == publication.sid() {
1061                            room.microphone_track = LocalTrack::None;
1062                        }
1063                    }
1064                    if let LocalTrack::Published {
1065                        track_publication, ..
1066                    } = &room.screen_track
1067                    {
1068                        if track_publication.sid() == publication.sid() {
1069                            room.screen_track = LocalTrack::None;
1070                        }
1071                    }
1072                }
1073            }
1074
1075            #[cfg(not(target_os = "windows"))]
1076            RoomEvent::LocalTrackPublished { publication, .. } => {
1077                log::info!("published track {:?}", publication.sid());
1078            }
1079
1080            #[cfg(not(target_os = "windows"))]
1081            RoomEvent::Disconnected { reason } => {
1082                log::info!("disconnected from room: {reason:?}");
1083                self.leave(cx).detach_and_log_err(cx);
1084            }
1085            _ => {}
1086        }
1087
1088        cx.notify();
1089        Ok(())
1090    }
1091
1092    fn check_invariants(&self) {
1093        #[cfg(any(test, feature = "test-support"))]
1094        {
1095            for participant in self.remote_participants.values() {
1096                assert!(self.participant_user_ids.contains(&participant.user.id));
1097                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1098            }
1099
1100            for participant in &self.pending_participants {
1101                assert!(self.participant_user_ids.contains(&participant.id));
1102                assert_ne!(participant.id, self.client.user_id().unwrap());
1103            }
1104
1105            assert_eq!(
1106                self.participant_user_ids.len(),
1107                self.remote_participants.len() + self.pending_participants.len()
1108            );
1109        }
1110    }
1111
1112    pub(crate) fn call(
1113        &mut self,
1114        called_user_id: u64,
1115        initial_project_id: Option<u64>,
1116        cx: &mut ModelContext<Self>,
1117    ) -> Task<Result<()>> {
1118        if self.status.is_offline() {
1119            return Task::ready(Err(anyhow!("room is offline")));
1120        }
1121
1122        cx.notify();
1123        let client = self.client.clone();
1124        let room_id = self.id;
1125        self.pending_call_count += 1;
1126        cx.spawn(move |this, mut cx| async move {
1127            let result = client
1128                .request(proto::Call {
1129                    room_id,
1130                    called_user_id,
1131                    initial_project_id,
1132                })
1133                .await;
1134            this.update(&mut cx, |this, cx| {
1135                this.pending_call_count -= 1;
1136                if this.should_leave() {
1137                    this.leave(cx).detach_and_log_err(cx);
1138                }
1139            })?;
1140            result?;
1141            Ok(())
1142        })
1143    }
1144
1145    pub fn join_project(
1146        &mut self,
1147        id: u64,
1148        language_registry: Arc<LanguageRegistry>,
1149        fs: Arc<dyn Fs>,
1150        cx: &mut ModelContext<Self>,
1151    ) -> Task<Result<Model<Project>>> {
1152        let client = self.client.clone();
1153        let user_store = self.user_store.clone();
1154        cx.emit(Event::RemoteProjectJoined { project_id: id });
1155        cx.spawn(move |this, mut cx| async move {
1156            let project =
1157                Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1158
1159            this.update(&mut cx, |this, cx| {
1160                this.joined_projects.retain(|project| {
1161                    if let Some(project) = project.upgrade() {
1162                        !project.read(cx).is_disconnected(cx)
1163                    } else {
1164                        false
1165                    }
1166                });
1167                this.joined_projects.insert(project.downgrade());
1168            })?;
1169            Ok(project)
1170        })
1171    }
1172
1173    pub fn share_project(
1174        &mut self,
1175        project: Model<Project>,
1176        cx: &mut ModelContext<Self>,
1177    ) -> Task<Result<u64>> {
1178        if let Some(project_id) = project.read(cx).remote_id() {
1179            return Task::ready(Ok(project_id));
1180        }
1181
1182        let request = self.client.request(proto::ShareProject {
1183            room_id: self.id(),
1184            worktrees: project.read(cx).worktree_metadata_protos(cx),
1185            is_ssh_project: project.read(cx).is_via_ssh(),
1186        });
1187
1188        cx.spawn(|this, mut cx| async move {
1189            let response = request.await?;
1190
1191            project.update(&mut cx, |project, cx| {
1192                project.shared(response.project_id, cx)
1193            })??;
1194
1195            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1196            this.update(&mut cx, |this, cx| {
1197                this.shared_projects.insert(project.downgrade());
1198                let active_project = this.local_participant.active_project.as_ref();
1199                if active_project.map_or(false, |location| *location == project) {
1200                    this.set_location(Some(&project), cx)
1201                } else {
1202                    Task::ready(Ok(()))
1203                }
1204            })?
1205            .await?;
1206
1207            Ok(response.project_id)
1208        })
1209    }
1210
1211    pub(crate) fn unshare_project(
1212        &mut self,
1213        project: Model<Project>,
1214        cx: &mut ModelContext<Self>,
1215    ) -> Result<()> {
1216        let project_id = match project.read(cx).remote_id() {
1217            Some(project_id) => project_id,
1218            None => return Ok(()),
1219        };
1220
1221        self.client.send(proto::UnshareProject { project_id })?;
1222        project.update(cx, |this, cx| this.unshare(cx))?;
1223
1224        if self.local_participant.active_project == Some(project.downgrade()) {
1225            self.set_location(Some(&project), cx).detach_and_log_err(cx);
1226        }
1227        Ok(())
1228    }
1229
1230    pub(crate) fn set_location(
1231        &mut self,
1232        project: Option<&Model<Project>>,
1233        cx: &mut ModelContext<Self>,
1234    ) -> Task<Result<()>> {
1235        if self.status.is_offline() {
1236            return Task::ready(Err(anyhow!("room is offline")));
1237        }
1238
1239        let client = self.client.clone();
1240        let room_id = self.id;
1241        let location = if let Some(project) = project {
1242            self.local_participant.active_project = Some(project.downgrade());
1243            if let Some(project_id) = project.read(cx).remote_id() {
1244                proto::participant_location::Variant::SharedProject(
1245                    proto::participant_location::SharedProject { id: project_id },
1246                )
1247            } else {
1248                proto::participant_location::Variant::UnsharedProject(
1249                    proto::participant_location::UnsharedProject {},
1250                )
1251            }
1252        } else {
1253            self.local_participant.active_project = None;
1254            proto::participant_location::Variant::External(proto::participant_location::External {})
1255        };
1256
1257        cx.notify();
1258        cx.background_executor().spawn(async move {
1259            client
1260                .request(proto::UpdateParticipantLocation {
1261                    room_id,
1262                    location: Some(proto::ParticipantLocation {
1263                        variant: Some(location),
1264                    }),
1265                })
1266                .await?;
1267            Ok(())
1268        })
1269    }
1270
1271    pub fn is_screen_sharing(&self) -> bool {
1272        self.live_kit.as_ref().map_or(false, |live_kit| {
1273            !matches!(live_kit.screen_track, LocalTrack::None)
1274        })
1275    }
1276
1277    pub fn is_sharing_mic(&self) -> bool {
1278        self.live_kit.as_ref().map_or(false, |live_kit| {
1279            !matches!(live_kit.microphone_track, LocalTrack::None)
1280        })
1281    }
1282
1283    pub fn is_muted(&self) -> bool {
1284        self.live_kit.as_ref().map_or(false, |live_kit| {
1285            matches!(live_kit.microphone_track, LocalTrack::None)
1286                || live_kit.muted_by_user
1287                || live_kit.deafened
1288        })
1289    }
1290
1291    pub fn is_speaking(&self) -> bool {
1292        self.live_kit
1293            .as_ref()
1294            .map_or(false, |live_kit| live_kit.speaking)
1295    }
1296
1297    pub fn is_deafened(&self) -> Option<bool> {
1298        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1299    }
1300
1301    pub fn can_use_microphone(&self, _cx: &AppContext) -> bool {
1302        use proto::ChannelRole::*;
1303
1304        #[cfg(not(any(test, feature = "test-support")))]
1305        {
1306            use feature_flags::FeatureFlagAppExt as _;
1307            if cfg!(target_os = "windows") || (cfg!(target_os = "linux") && !_cx.is_staff()) {
1308                return false;
1309            }
1310        }
1311
1312        match self.local_participant.role {
1313            Admin | Member | Talker => true,
1314            Guest | Banned => false,
1315        }
1316    }
1317
1318    pub fn can_share_projects(&self) -> bool {
1319        use proto::ChannelRole::*;
1320        match self.local_participant.role {
1321            Admin | Member => true,
1322            Guest | Banned | Talker => false,
1323        }
1324    }
1325
1326    #[cfg(target_os = "windows")]
1327    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1328        Task::ready(Err(anyhow!("Windows is not supported yet")))
1329    }
1330
1331    #[cfg(not(target_os = "windows"))]
1332    #[track_caller]
1333    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1334        if self.status.is_offline() {
1335            return Task::ready(Err(anyhow!("room is offline")));
1336        }
1337
1338        let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1339            let publish_id = post_inc(&mut live_kit.next_publish_id);
1340            live_kit.microphone_track = LocalTrack::Pending { publish_id };
1341            cx.notify();
1342            (live_kit.room.local_participant(), publish_id)
1343        } else {
1344            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1345        };
1346
1347        cx.spawn(move |this, mut cx| async move {
1348            let (track, stream) = capture_local_audio_track(cx.background_executor())?.await;
1349
1350            let publication = participant
1351                .publish_track(
1352                    livekit::track::LocalTrack::Audio(track),
1353                    TrackPublishOptions {
1354                        source: TrackSource::Microphone,
1355                        ..Default::default()
1356                    },
1357                )
1358                .await
1359                .map_err(|error| anyhow!("failed to publish track: {error}"));
1360            this.update(&mut cx, |this, cx| {
1361                let live_kit = this
1362                    .live_kit
1363                    .as_mut()
1364                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1365
1366                let canceled = if let LocalTrack::Pending {
1367                    publish_id: cur_publish_id,
1368                } = &live_kit.microphone_track
1369                {
1370                    *cur_publish_id != publish_id
1371                } else {
1372                    true
1373                };
1374
1375                match publication {
1376                    Ok(publication) => {
1377                        if canceled {
1378                            cx.background_executor()
1379                                .spawn(async move {
1380                                    participant.unpublish_track(&publication.sid()).await
1381                                })
1382                                .detach_and_log_err(cx)
1383                        } else {
1384                            if live_kit.muted_by_user || live_kit.deafened {
1385                                publication.mute();
1386                            }
1387                            live_kit.microphone_track = LocalTrack::Published {
1388                                track_publication: publication,
1389                                _stream: Box::new(stream),
1390                            };
1391                            cx.notify();
1392                        }
1393                        Ok(())
1394                    }
1395                    Err(error) => {
1396                        if canceled {
1397                            Ok(())
1398                        } else {
1399                            live_kit.microphone_track = LocalTrack::None;
1400                            cx.notify();
1401                            Err(error)
1402                        }
1403                    }
1404                }
1405            })?
1406        })
1407    }
1408
1409    #[cfg(target_os = "windows")]
1410    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1411        Task::ready(Err(anyhow!("Windows is not supported yet")))
1412    }
1413
1414    #[cfg(not(target_os = "windows"))]
1415    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1416        if self.status.is_offline() {
1417            return Task::ready(Err(anyhow!("room is offline")));
1418        }
1419        if self.is_screen_sharing() {
1420            return Task::ready(Err(anyhow!("screen was already shared")));
1421        }
1422
1423        let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1424            let publish_id = post_inc(&mut live_kit.next_publish_id);
1425            live_kit.screen_track = LocalTrack::Pending { publish_id };
1426            cx.notify();
1427            (live_kit.room.local_participant(), publish_id)
1428        } else {
1429            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1430        };
1431
1432        let sources = cx.screen_capture_sources();
1433
1434        cx.spawn(move |this, mut cx| async move {
1435            let sources = sources.await??;
1436            let source = sources.first().ok_or_else(|| anyhow!("no display found"))?;
1437
1438            let (track, stream) = capture_local_video_track(&**source).await?;
1439
1440            let publication = participant
1441                .publish_track(
1442                    livekit::track::LocalTrack::Video(track),
1443                    TrackPublishOptions {
1444                        source: TrackSource::Screenshare,
1445                        video_codec: VideoCodec::H264,
1446                        ..Default::default()
1447                    },
1448                )
1449                .await
1450                .map_err(|error| anyhow!("error publishing screen track {error:?}"));
1451
1452            this.update(&mut cx, |this, cx| {
1453                let live_kit = this
1454                    .live_kit
1455                    .as_mut()
1456                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1457
1458                let canceled = if let LocalTrack::Pending {
1459                    publish_id: cur_publish_id,
1460                } = &live_kit.screen_track
1461                {
1462                    *cur_publish_id != publish_id
1463                } else {
1464                    true
1465                };
1466
1467                match publication {
1468                    Ok(publication) => {
1469                        if canceled {
1470                            cx.background_executor()
1471                                .spawn(async move {
1472                                    participant.unpublish_track(&publication.sid()).await
1473                                })
1474                                .detach()
1475                        } else {
1476                            live_kit.screen_track = LocalTrack::Published {
1477                                track_publication: publication,
1478                                _stream: Box::new(stream),
1479                            };
1480                            cx.notify();
1481                        }
1482
1483                        Audio::play_sound(Sound::StartScreenshare, cx);
1484                        Ok(())
1485                    }
1486                    Err(error) => {
1487                        if canceled {
1488                            Ok(())
1489                        } else {
1490                            live_kit.screen_track = LocalTrack::None;
1491                            cx.notify();
1492                            Err(error)
1493                        }
1494                    }
1495                }
1496            })?
1497        })
1498    }
1499
1500    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) {
1501        if let Some(live_kit) = self.live_kit.as_mut() {
1502            // When unmuting, undeafen if the user was deafened before.
1503            let was_deafened = live_kit.deafened;
1504            if live_kit.muted_by_user
1505                || live_kit.deafened
1506                || matches!(live_kit.microphone_track, LocalTrack::None)
1507            {
1508                live_kit.muted_by_user = false;
1509                live_kit.deafened = false;
1510            } else {
1511                live_kit.muted_by_user = true;
1512            }
1513            let muted = live_kit.muted_by_user;
1514            let should_undeafen = was_deafened && !live_kit.deafened;
1515
1516            if let Some(task) = self.set_mute(muted, cx) {
1517                task.detach_and_log_err(cx);
1518            }
1519
1520            if should_undeafen {
1521                self.set_deafened(false, cx);
1522            }
1523        }
1524    }
1525
1526    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) {
1527        if let Some(live_kit) = self.live_kit.as_mut() {
1528            // When deafening, mute the microphone if it was not already muted.
1529            // When un-deafening, unmute the microphone, unless it was explicitly muted.
1530            let deafened = !live_kit.deafened;
1531            live_kit.deafened = deafened;
1532            let should_change_mute = !live_kit.muted_by_user;
1533
1534            self.set_deafened(deafened, cx);
1535
1536            if should_change_mute {
1537                if let Some(task) = self.set_mute(deafened, cx) {
1538                    task.detach_and_log_err(cx);
1539                }
1540            }
1541        }
1542    }
1543
1544    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1545        if self.status.is_offline() {
1546            return Err(anyhow!("room is offline"));
1547        }
1548
1549        let live_kit = self
1550            .live_kit
1551            .as_mut()
1552            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1553        match mem::take(&mut live_kit.screen_track) {
1554            LocalTrack::None => Err(anyhow!("screen was not shared")),
1555            LocalTrack::Pending { .. } => {
1556                cx.notify();
1557                Ok(())
1558            }
1559            LocalTrack::Published {
1560                track_publication, ..
1561            } => {
1562                #[cfg(not(target_os = "windows"))]
1563                {
1564                    let local_participant = live_kit.room.local_participant();
1565                    let sid = track_publication.sid();
1566                    cx.background_executor()
1567                        .spawn(async move { local_participant.unpublish_track(&sid).await })
1568                        .detach_and_log_err(cx);
1569                    cx.notify();
1570                }
1571                Audio::play_sound(Sound::StopScreenshare, cx);
1572                Ok(())
1573            }
1574        }
1575    }
1576
1577    fn set_deafened(&mut self, deafened: bool, cx: &mut ModelContext<Self>) -> Option<()> {
1578        #[cfg(not(target_os = "windows"))]
1579        {
1580            let live_kit = self.live_kit.as_mut()?;
1581            cx.notify();
1582            for (_, participant) in live_kit.room.remote_participants() {
1583                for (_, publication) in participant.track_publications() {
1584                    if publication.kind() == TrackKind::Audio {
1585                        publication.set_enabled(!deafened);
1586                    }
1587                }
1588            }
1589        }
1590
1591        None
1592    }
1593
1594    fn set_mute(
1595        &mut self,
1596        should_mute: bool,
1597        cx: &mut ModelContext<Room>,
1598    ) -> Option<Task<Result<()>>> {
1599        let live_kit = self.live_kit.as_mut()?;
1600        cx.notify();
1601
1602        if should_mute {
1603            Audio::play_sound(Sound::Mute, cx);
1604        } else {
1605            Audio::play_sound(Sound::Unmute, cx);
1606        }
1607
1608        match &mut live_kit.microphone_track {
1609            LocalTrack::None => {
1610                if should_mute {
1611                    None
1612                } else {
1613                    Some(self.share_microphone(cx))
1614                }
1615            }
1616            LocalTrack::Pending { .. } => None,
1617            LocalTrack::Published {
1618                track_publication, ..
1619            } => {
1620                #[cfg(not(target_os = "windows"))]
1621                {
1622                    if should_mute {
1623                        track_publication.mute()
1624                    } else {
1625                        track_publication.unmute()
1626                    }
1627                }
1628                None
1629            }
1630        }
1631    }
1632}
1633
1634#[cfg(target_os = "windows")]
1635fn spawn_room_connection(
1636    livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
1637    cx: &mut ModelContext<'_, Room>,
1638) {
1639}
1640
1641#[cfg(not(target_os = "windows"))]
1642fn spawn_room_connection(
1643    livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
1644    cx: &mut ModelContext<'_, Room>,
1645) {
1646    if let Some(connection_info) = livekit_connection_info {
1647        cx.spawn(|this, mut cx| async move {
1648            let (room, mut events) = livekit::Room::connect(
1649                &connection_info.server_url,
1650                &connection_info.token,
1651                RoomOptions::default(),
1652            )
1653            .await?;
1654
1655            this.update(&mut cx, |this, cx| {
1656                let _handle_updates = cx.spawn(|this, mut cx| async move {
1657                    while let Some(event) = events.recv().await {
1658                        if this
1659                            .update(&mut cx, |this, cx| {
1660                                this.livekit_room_updated(event, cx).warn_on_err();
1661                            })
1662                            .is_err()
1663                        {
1664                            break;
1665                        }
1666                    }
1667                });
1668
1669                let muted_by_user = Room::mute_on_join(cx);
1670                this.live_kit = Some(LiveKitRoom {
1671                    room: Arc::new(room),
1672                    screen_track: LocalTrack::None,
1673                    microphone_track: LocalTrack::None,
1674                    next_publish_id: 0,
1675                    muted_by_user,
1676                    deafened: false,
1677                    speaking: false,
1678                    _handle_updates,
1679                });
1680
1681                if !muted_by_user && this.can_use_microphone(cx) {
1682                    this.share_microphone(cx)
1683                } else {
1684                    Task::ready(Ok(()))
1685                }
1686            })?
1687            .await
1688        })
1689        .detach_and_log_err(cx);
1690    }
1691}
1692
1693struct LiveKitRoom {
1694    room: Arc<livekit::Room>,
1695    screen_track: LocalTrack,
1696    microphone_track: LocalTrack,
1697    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1698    muted_by_user: bool,
1699    deafened: bool,
1700    speaking: bool,
1701    next_publish_id: usize,
1702    _handle_updates: Task<()>,
1703}
1704
1705impl LiveKitRoom {
1706    #[cfg(target_os = "windows")]
1707    fn stop_publishing(&mut self, _cx: &mut ModelContext<Room>) {}
1708
1709    #[cfg(not(target_os = "windows"))]
1710    fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1711        let mut tracks_to_unpublish = Vec::new();
1712        if let LocalTrack::Published {
1713            track_publication, ..
1714        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1715        {
1716            tracks_to_unpublish.push(track_publication.sid());
1717            cx.notify();
1718        }
1719
1720        if let LocalTrack::Published {
1721            track_publication, ..
1722        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1723        {
1724            tracks_to_unpublish.push(track_publication.sid());
1725            cx.notify();
1726        }
1727
1728        let participant = self.room.local_participant();
1729        cx.background_executor()
1730            .spawn(async move {
1731                for sid in tracks_to_unpublish {
1732                    participant.unpublish_track(&sid).await.log_err();
1733                }
1734            })
1735            .detach();
1736    }
1737}
1738
1739enum LocalTrack {
1740    None,
1741    Pending {
1742        publish_id: usize,
1743    },
1744    Published {
1745        track_publication: LocalTrackPublication,
1746        _stream: Box<dyn Any>,
1747    },
1748}
1749
1750impl Default for LocalTrack {
1751    fn default() -> Self {
1752        Self::None
1753    }
1754}
1755
1756#[derive(Copy, Clone, PartialEq, Eq)]
1757pub enum RoomStatus {
1758    Online,
1759    Rejoining,
1760    Offline,
1761}
1762
1763impl RoomStatus {
1764    pub fn is_offline(&self) -> bool {
1765        matches!(self, RoomStatus::Offline)
1766    }
1767
1768    pub fn is_online(&self) -> bool {
1769        matches!(self, RoomStatus::Online)
1770    }
1771}