room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28#[derive(Clone, Debug, PartialEq, Eq)]
  29pub enum Event {
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    Left,
  48}
  49
  50pub struct Room {
  51    id: u64,
  52    channel_id: Option<u64>,
  53    live_kit: Option<LiveKitRoom>,
  54    status: RoomStatus,
  55    shared_projects: HashSet<WeakModelHandle<Project>>,
  56    joined_projects: HashSet<WeakModelHandle<Project>>,
  57    local_participant: LocalParticipant,
  58    remote_participants: BTreeMap<u64, RemoteParticipant>,
  59    pending_participants: Vec<Arc<User>>,
  60    participant_user_ids: HashSet<u64>,
  61    pending_call_count: usize,
  62    leave_when_empty: bool,
  63    client: Arc<Client>,
  64    user_store: ModelHandle<UserStore>,
  65    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  66    subscriptions: Vec<client::Subscription>,
  67    pending_room_update: Option<Task<()>>,
  68    maintain_connection: Option<Task<Option<()>>>,
  69}
  70
  71impl Entity for Room {
  72    type Event = Event;
  73
  74    fn release(&mut self, cx: &mut AppContext) {
  75        if self.status.is_online() {
  76            self.leave_internal(cx).detach_and_log_err(cx);
  77        }
  78    }
  79
  80    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  81        if self.status.is_online() {
  82            let leave = self.leave_internal(cx);
  83            Some(
  84                cx.background()
  85                    .spawn(async move {
  86                        leave.await.log_err();
  87                    })
  88                    .boxed(),
  89            )
  90        } else {
  91            None
  92        }
  93    }
  94}
  95
  96impl Room {
  97    pub fn channel_id(&self) -> Option<u64> {
  98        self.channel_id
  99    }
 100
 101    #[cfg(any(test, feature = "test-support"))]
 102    pub fn is_connected(&self) -> bool {
 103        if let Some(live_kit) = self.live_kit.as_ref() {
 104            matches!(
 105                *live_kit.room.status().borrow(),
 106                live_kit_client::ConnectionState::Connected { .. }
 107            )
 108        } else {
 109            false
 110        }
 111    }
 112
 113    fn new(
 114        id: u64,
 115        channel_id: Option<u64>,
 116        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 117        client: Arc<Client>,
 118        user_store: ModelHandle<UserStore>,
 119        cx: &mut ModelContext<Self>,
 120    ) -> Self {
 121        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 122            let room = live_kit_client::Room::new();
 123            let mut status = room.status();
 124            // Consume the initial status of the room.
 125            let _ = status.try_recv();
 126            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 127                while let Some(status) = status.next().await {
 128                    let this = if let Some(this) = this.upgrade(&cx) {
 129                        this
 130                    } else {
 131                        break;
 132                    };
 133
 134                    if status == live_kit_client::ConnectionState::Disconnected {
 135                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 136                        break;
 137                    }
 138                }
 139            });
 140
 141            let mut track_video_changes = room.remote_video_track_updates();
 142            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 143                while let Some(track_change) = track_video_changes.next().await {
 144                    let this = if let Some(this) = this.upgrade(&cx) {
 145                        this
 146                    } else {
 147                        break;
 148                    };
 149
 150                    this.update(&mut cx, |this, cx| {
 151                        this.remote_video_track_updated(track_change, cx).log_err()
 152                    });
 153                }
 154            });
 155
 156            let mut track_audio_changes = room.remote_audio_track_updates();
 157            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 158                while let Some(track_change) = track_audio_changes.next().await {
 159                    let this = if let Some(this) = this.upgrade(&cx) {
 160                        this
 161                    } else {
 162                        break;
 163                    };
 164
 165                    this.update(&mut cx, |this, cx| {
 166                        this.remote_audio_track_updated(track_change, cx).log_err()
 167                    });
 168                }
 169            });
 170
 171            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 172            cx.spawn(|this, mut cx| async move {
 173                connect.await?;
 174
 175                if !cx.read(|cx| settings::get::<CallSettings>(cx).mute_on_join) {
 176                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
 177                        .await?;
 178                }
 179
 180                anyhow::Ok(())
 181            })
 182            .detach_and_log_err(cx);
 183
 184            Some(LiveKitRoom {
 185                room,
 186                screen_track: LocalTrack::None,
 187                microphone_track: LocalTrack::None,
 188                next_publish_id: 0,
 189                muted_by_user: false,
 190                deafened: false,
 191                speaking: false,
 192                _maintain_room,
 193                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 194            })
 195        } else {
 196            None
 197        };
 198
 199        let maintain_connection =
 200            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 201
 202        Audio::play_sound(Sound::Joined, cx);
 203
 204        Self {
 205            id,
 206            channel_id,
 207            live_kit: live_kit_room,
 208            status: RoomStatus::Online,
 209            shared_projects: Default::default(),
 210            joined_projects: Default::default(),
 211            participant_user_ids: Default::default(),
 212            local_participant: Default::default(),
 213            remote_participants: Default::default(),
 214            pending_participants: Default::default(),
 215            pending_call_count: 0,
 216            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 217            leave_when_empty: false,
 218            pending_room_update: None,
 219            client,
 220            user_store,
 221            follows_by_leader_id_project_id: Default::default(),
 222            maintain_connection: Some(maintain_connection),
 223        }
 224    }
 225
 226    pub(crate) fn create(
 227        called_user_id: u64,
 228        initial_project: Option<ModelHandle<Project>>,
 229        client: Arc<Client>,
 230        user_store: ModelHandle<UserStore>,
 231        cx: &mut AppContext,
 232    ) -> Task<Result<ModelHandle<Self>>> {
 233        cx.spawn(|mut cx| async move {
 234            let response = client.request(proto::CreateRoom {}).await?;
 235            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 236            let room = cx.add_model(|cx| {
 237                Self::new(
 238                    room_proto.id,
 239                    None,
 240                    response.live_kit_connection_info,
 241                    client,
 242                    user_store,
 243                    cx,
 244                )
 245            });
 246
 247            let initial_project_id = if let Some(initial_project) = initial_project {
 248                let initial_project_id = room
 249                    .update(&mut cx, |room, cx| {
 250                        room.share_project(initial_project.clone(), cx)
 251                    })
 252                    .await?;
 253                Some(initial_project_id)
 254            } else {
 255                None
 256            };
 257
 258            match room
 259                .update(&mut cx, |room, cx| {
 260                    room.leave_when_empty = true;
 261                    room.call(called_user_id, initial_project_id, cx)
 262                })
 263                .await
 264            {
 265                Ok(()) => Ok(room),
 266                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 267            }
 268        })
 269    }
 270
 271    pub(crate) fn join_channel(
 272        channel_id: u64,
 273        client: Arc<Client>,
 274        user_store: ModelHandle<UserStore>,
 275        cx: &mut AppContext,
 276    ) -> Task<Result<ModelHandle<Self>>> {
 277        cx.spawn(|mut cx| async move {
 278            let response = client.request(proto::JoinChannel { channel_id }).await?;
 279            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 280            let room = cx.add_model(|cx| {
 281                Self::new(
 282                    room_proto.id,
 283                    Some(channel_id),
 284                    response.live_kit_connection_info,
 285                    client,
 286                    user_store,
 287                    cx,
 288                )
 289            });
 290
 291            room.update(&mut cx, |room, cx| {
 292                room.apply_room_update(room_proto, cx)?;
 293                anyhow::Ok(())
 294            })?;
 295
 296            Ok(room)
 297        })
 298    }
 299
 300    pub(crate) fn join(
 301        call: &IncomingCall,
 302        client: Arc<Client>,
 303        user_store: ModelHandle<UserStore>,
 304        cx: &mut AppContext,
 305    ) -> Task<Result<ModelHandle<Self>>> {
 306        let room_id = call.room_id;
 307        cx.spawn(|mut cx| async move {
 308            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 309            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 310            let room = cx.add_model(|cx| {
 311                Self::new(
 312                    room_id,
 313                    None,
 314                    response.live_kit_connection_info,
 315                    client,
 316                    user_store,
 317                    cx,
 318                )
 319            });
 320            room.update(&mut cx, |room, cx| {
 321                room.leave_when_empty = true;
 322                room.apply_room_update(room_proto, cx)?;
 323                anyhow::Ok(())
 324            })?;
 325
 326            Ok(room)
 327        })
 328    }
 329
 330    fn should_leave(&self) -> bool {
 331        self.leave_when_empty
 332            && self.pending_room_update.is_none()
 333            && self.pending_participants.is_empty()
 334            && self.remote_participants.is_empty()
 335            && self.pending_call_count == 0
 336    }
 337
 338    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 339        cx.notify();
 340        cx.emit(Event::Left);
 341        self.leave_internal(cx)
 342    }
 343
 344    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 345        if self.status.is_offline() {
 346            return Task::ready(Err(anyhow!("room is offline")));
 347        }
 348
 349        log::info!("leaving room");
 350
 351        for project in self.shared_projects.drain() {
 352            if let Some(project) = project.upgrade(cx) {
 353                project.update(cx, |project, cx| {
 354                    project.unshare(cx).log_err();
 355                });
 356            }
 357        }
 358        for project in self.joined_projects.drain() {
 359            if let Some(project) = project.upgrade(cx) {
 360                project.update(cx, |project, cx| {
 361                    project.disconnected_from_host(cx);
 362                    project.close(cx);
 363                });
 364            }
 365        }
 366
 367        Audio::play_sound(Sound::Leave, cx);
 368
 369        self.status = RoomStatus::Offline;
 370        self.remote_participants.clear();
 371        self.pending_participants.clear();
 372        self.participant_user_ids.clear();
 373        self.subscriptions.clear();
 374        self.live_kit.take();
 375        self.pending_room_update.take();
 376        self.maintain_connection.take();
 377
 378        let leave_room = self.client.request(proto::LeaveRoom {});
 379        cx.background().spawn(async move {
 380            leave_room.await?;
 381            anyhow::Ok(())
 382        })
 383    }
 384
 385    async fn maintain_connection(
 386        this: WeakModelHandle<Self>,
 387        client: Arc<Client>,
 388        mut cx: AsyncAppContext,
 389    ) -> Result<()> {
 390        let mut client_status = client.status();
 391        loop {
 392            let _ = client_status.try_recv();
 393            let is_connected = client_status.borrow().is_connected();
 394            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 395            if !is_connected || client_status.next().await.is_some() {
 396                log::info!("detected client disconnection");
 397
 398                this.upgrade(&cx)
 399                    .ok_or_else(|| anyhow!("room was dropped"))?
 400                    .update(&mut cx, |this, cx| {
 401                        this.status = RoomStatus::Rejoining;
 402                        cx.notify();
 403                    });
 404
 405                // Wait for client to re-establish a connection to the server.
 406                {
 407                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 408                    let client_reconnection = async {
 409                        let mut remaining_attempts = 3;
 410                        while remaining_attempts > 0 {
 411                            if client_status.borrow().is_connected() {
 412                                log::info!("client reconnected, attempting to rejoin room");
 413
 414                                let Some(this) = this.upgrade(&cx) else { break };
 415                                if this
 416                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 417                                    .await
 418                                    .log_err()
 419                                    .is_some()
 420                                {
 421                                    return true;
 422                                } else {
 423                                    remaining_attempts -= 1;
 424                                }
 425                            } else if client_status.borrow().is_signed_out() {
 426                                return false;
 427                            }
 428
 429                            log::info!(
 430                                "waiting for client status change, remaining attempts {}",
 431                                remaining_attempts
 432                            );
 433                            client_status.next().await;
 434                        }
 435                        false
 436                    }
 437                    .fuse();
 438                    futures::pin_mut!(client_reconnection);
 439
 440                    futures::select_biased! {
 441                        reconnected = client_reconnection => {
 442                            if reconnected {
 443                                log::info!("successfully reconnected to room");
 444                                // If we successfully joined the room, go back around the loop
 445                                // waiting for future connection status changes.
 446                                continue;
 447                            }
 448                        }
 449                        _ = reconnection_timeout => {
 450                            log::info!("room reconnection timeout expired");
 451                        }
 452                    }
 453                }
 454
 455                break;
 456            }
 457        }
 458
 459        // The client failed to re-establish a connection to the server
 460        // or an error occurred while trying to re-join the room. Either way
 461        // we leave the room and return an error.
 462        if let Some(this) = this.upgrade(&cx) {
 463            log::info!("reconnection failed, leaving room");
 464            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 465        }
 466        Err(anyhow!(
 467            "can't reconnect to room: client failed to re-establish connection"
 468        ))
 469    }
 470
 471    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 472        let mut projects = HashMap::default();
 473        let mut reshared_projects = Vec::new();
 474        let mut rejoined_projects = Vec::new();
 475        self.shared_projects.retain(|project| {
 476            if let Some(handle) = project.upgrade(cx) {
 477                let project = handle.read(cx);
 478                if let Some(project_id) = project.remote_id() {
 479                    projects.insert(project_id, handle.clone());
 480                    reshared_projects.push(proto::UpdateProject {
 481                        project_id,
 482                        worktrees: project.worktree_metadata_protos(cx),
 483                    });
 484                    return true;
 485                }
 486            }
 487            false
 488        });
 489        self.joined_projects.retain(|project| {
 490            if let Some(handle) = project.upgrade(cx) {
 491                let project = handle.read(cx);
 492                if let Some(project_id) = project.remote_id() {
 493                    projects.insert(project_id, handle.clone());
 494                    rejoined_projects.push(proto::RejoinProject {
 495                        id: project_id,
 496                        worktrees: project
 497                            .worktrees(cx)
 498                            .map(|worktree| {
 499                                let worktree = worktree.read(cx);
 500                                proto::RejoinWorktree {
 501                                    id: worktree.id().to_proto(),
 502                                    scan_id: worktree.completed_scan_id() as u64,
 503                                }
 504                            })
 505                            .collect(),
 506                    });
 507                }
 508                return true;
 509            }
 510            false
 511        });
 512
 513        let response = self.client.request_envelope(proto::RejoinRoom {
 514            id: self.id,
 515            reshared_projects,
 516            rejoined_projects,
 517        });
 518
 519        cx.spawn(|this, mut cx| async move {
 520            let response = response.await?;
 521            let message_id = response.message_id;
 522            let response = response.payload;
 523            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 524            this.update(&mut cx, |this, cx| {
 525                this.status = RoomStatus::Online;
 526                this.apply_room_update(room_proto, cx)?;
 527
 528                for reshared_project in response.reshared_projects {
 529                    if let Some(project) = projects.get(&reshared_project.id) {
 530                        project.update(cx, |project, cx| {
 531                            project.reshared(reshared_project, cx).log_err();
 532                        });
 533                    }
 534                }
 535
 536                for rejoined_project in response.rejoined_projects {
 537                    if let Some(project) = projects.get(&rejoined_project.id) {
 538                        project.update(cx, |project, cx| {
 539                            project.rejoined(rejoined_project, message_id, cx).log_err();
 540                        });
 541                    }
 542                }
 543
 544                anyhow::Ok(())
 545            })
 546        })
 547    }
 548
 549    pub fn id(&self) -> u64 {
 550        self.id
 551    }
 552
 553    pub fn status(&self) -> RoomStatus {
 554        self.status
 555    }
 556
 557    pub fn local_participant(&self) -> &LocalParticipant {
 558        &self.local_participant
 559    }
 560
 561    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 562        &self.remote_participants
 563    }
 564
 565    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 566        self.remote_participants
 567            .values()
 568            .find(|p| p.peer_id == peer_id)
 569    }
 570
 571    pub fn pending_participants(&self) -> &[Arc<User>] {
 572        &self.pending_participants
 573    }
 574
 575    pub fn contains_participant(&self, user_id: u64) -> bool {
 576        self.participant_user_ids.contains(&user_id)
 577    }
 578
 579    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 580        self.follows_by_leader_id_project_id
 581            .get(&(leader_id, project_id))
 582            .map_or(&[], |v| v.as_slice())
 583    }
 584
 585    async fn handle_room_updated(
 586        this: ModelHandle<Self>,
 587        envelope: TypedEnvelope<proto::RoomUpdated>,
 588        _: Arc<Client>,
 589        mut cx: AsyncAppContext,
 590    ) -> Result<()> {
 591        let room = envelope
 592            .payload
 593            .room
 594            .ok_or_else(|| anyhow!("invalid room"))?;
 595        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 596    }
 597
 598    fn apply_room_update(
 599        &mut self,
 600        mut room: proto::Room,
 601        cx: &mut ModelContext<Self>,
 602    ) -> Result<()> {
 603        // Filter ourselves out from the room's participants.
 604        let local_participant_ix = room
 605            .participants
 606            .iter()
 607            .position(|participant| Some(participant.user_id) == self.client.user_id());
 608        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 609
 610        let pending_participant_user_ids = room
 611            .pending_participants
 612            .iter()
 613            .map(|p| p.user_id)
 614            .collect::<Vec<_>>();
 615
 616        let remote_participant_user_ids = room
 617            .participants
 618            .iter()
 619            .map(|p| p.user_id)
 620            .collect::<Vec<_>>();
 621
 622        let (remote_participants, pending_participants) =
 623            self.user_store.update(cx, move |user_store, cx| {
 624                (
 625                    user_store.get_users(remote_participant_user_ids, cx),
 626                    user_store.get_users(pending_participant_user_ids, cx),
 627                )
 628            });
 629
 630        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 631            let (remote_participants, pending_participants) =
 632                futures::join!(remote_participants, pending_participants);
 633
 634            this.update(&mut cx, |this, cx| {
 635                this.participant_user_ids.clear();
 636
 637                if let Some(participant) = local_participant {
 638                    this.local_participant.projects = participant.projects;
 639                } else {
 640                    this.local_participant.projects.clear();
 641                }
 642
 643                if let Some(participants) = remote_participants.log_err() {
 644                    for (participant, user) in room.participants.into_iter().zip(participants) {
 645                        let Some(peer_id) = participant.peer_id else { continue };
 646                        this.participant_user_ids.insert(participant.user_id);
 647
 648                        let old_projects = this
 649                            .remote_participants
 650                            .get(&participant.user_id)
 651                            .into_iter()
 652                            .flat_map(|existing| &existing.projects)
 653                            .map(|project| project.id)
 654                            .collect::<HashSet<_>>();
 655                        let new_projects = participant
 656                            .projects
 657                            .iter()
 658                            .map(|project| project.id)
 659                            .collect::<HashSet<_>>();
 660
 661                        for project in &participant.projects {
 662                            if !old_projects.contains(&project.id) {
 663                                cx.emit(Event::RemoteProjectShared {
 664                                    owner: user.clone(),
 665                                    project_id: project.id,
 666                                    worktree_root_names: project.worktree_root_names.clone(),
 667                                });
 668                            }
 669                        }
 670
 671                        for unshared_project_id in old_projects.difference(&new_projects) {
 672                            this.joined_projects.retain(|project| {
 673                                if let Some(project) = project.upgrade(cx) {
 674                                    project.update(cx, |project, cx| {
 675                                        if project.remote_id() == Some(*unshared_project_id) {
 676                                            project.disconnected_from_host(cx);
 677                                            false
 678                                        } else {
 679                                            true
 680                                        }
 681                                    })
 682                                } else {
 683                                    false
 684                                }
 685                            });
 686                            cx.emit(Event::RemoteProjectUnshared {
 687                                project_id: *unshared_project_id,
 688                            });
 689                        }
 690
 691                        let location = ParticipantLocation::from_proto(participant.location)
 692                            .unwrap_or(ParticipantLocation::External);
 693                        if let Some(remote_participant) =
 694                            this.remote_participants.get_mut(&participant.user_id)
 695                        {
 696                            remote_participant.projects = participant.projects;
 697                            remote_participant.peer_id = peer_id;
 698                            if location != remote_participant.location {
 699                                remote_participant.location = location;
 700                                cx.emit(Event::ParticipantLocationChanged {
 701                                    participant_id: peer_id,
 702                                });
 703                            }
 704                        } else {
 705                            this.remote_participants.insert(
 706                                participant.user_id,
 707                                RemoteParticipant {
 708                                    user: user.clone(),
 709                                    peer_id,
 710                                    projects: participant.projects,
 711                                    location,
 712                                    muted: true,
 713                                    speaking: false,
 714                                    video_tracks: Default::default(),
 715                                    audio_tracks: Default::default(),
 716                                },
 717                            );
 718
 719                            Audio::play_sound(Sound::Joined, cx);
 720
 721                            if let Some(live_kit) = this.live_kit.as_ref() {
 722                                let video_tracks =
 723                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 724                                let audio_tracks =
 725                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 726                                let publications = live_kit
 727                                    .room
 728                                    .remote_audio_track_publications(&user.id.to_string());
 729
 730                                for track in video_tracks {
 731                                    this.remote_video_track_updated(
 732                                        RemoteVideoTrackUpdate::Subscribed(track),
 733                                        cx,
 734                                    )
 735                                    .log_err();
 736                                }
 737
 738                                for (track, publication) in
 739                                    audio_tracks.iter().zip(publications.iter())
 740                                {
 741                                    this.remote_audio_track_updated(
 742                                        RemoteAudioTrackUpdate::Subscribed(
 743                                            track.clone(),
 744                                            publication.clone(),
 745                                        ),
 746                                        cx,
 747                                    )
 748                                    .log_err();
 749                                }
 750                            }
 751                        }
 752                    }
 753
 754                    this.remote_participants.retain(|user_id, participant| {
 755                        if this.participant_user_ids.contains(user_id) {
 756                            true
 757                        } else {
 758                            for project in &participant.projects {
 759                                cx.emit(Event::RemoteProjectUnshared {
 760                                    project_id: project.id,
 761                                });
 762                            }
 763                            false
 764                        }
 765                    });
 766                }
 767
 768                if let Some(pending_participants) = pending_participants.log_err() {
 769                    this.pending_participants = pending_participants;
 770                    for participant in &this.pending_participants {
 771                        this.participant_user_ids.insert(participant.id);
 772                    }
 773                }
 774
 775                this.follows_by_leader_id_project_id.clear();
 776                for follower in room.followers {
 777                    let project_id = follower.project_id;
 778                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 779                        (Some(leader), Some(follower)) => (leader, follower),
 780
 781                        _ => {
 782                            log::error!("Follower message {follower:?} missing some state");
 783                            continue;
 784                        }
 785                    };
 786
 787                    let list = this
 788                        .follows_by_leader_id_project_id
 789                        .entry((leader, project_id))
 790                        .or_insert(Vec::new());
 791                    if !list.contains(&follower) {
 792                        list.push(follower);
 793                    }
 794                }
 795
 796                this.pending_room_update.take();
 797                if this.should_leave() {
 798                    log::info!("room is empty, leaving");
 799                    let _ = this.leave(cx);
 800                }
 801
 802                this.check_invariants();
 803                cx.notify();
 804            });
 805        }));
 806
 807        cx.notify();
 808        Ok(())
 809    }
 810
 811    fn remote_video_track_updated(
 812        &mut self,
 813        change: RemoteVideoTrackUpdate,
 814        cx: &mut ModelContext<Self>,
 815    ) -> Result<()> {
 816        match change {
 817            RemoteVideoTrackUpdate::Subscribed(track) => {
 818                let user_id = track.publisher_id().parse()?;
 819                let track_id = track.sid().to_string();
 820                let participant = self
 821                    .remote_participants
 822                    .get_mut(&user_id)
 823                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 824                participant.video_tracks.insert(
 825                    track_id.clone(),
 826                    Arc::new(RemoteVideoTrack {
 827                        live_kit_track: track,
 828                    }),
 829                );
 830                cx.emit(Event::RemoteVideoTracksChanged {
 831                    participant_id: participant.peer_id,
 832                });
 833            }
 834            RemoteVideoTrackUpdate::Unsubscribed {
 835                publisher_id,
 836                track_id,
 837            } => {
 838                let user_id = publisher_id.parse()?;
 839                let participant = self
 840                    .remote_participants
 841                    .get_mut(&user_id)
 842                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 843                participant.video_tracks.remove(&track_id);
 844                cx.emit(Event::RemoteVideoTracksChanged {
 845                    participant_id: participant.peer_id,
 846                });
 847            }
 848        }
 849
 850        cx.notify();
 851        Ok(())
 852    }
 853
 854    fn remote_audio_track_updated(
 855        &mut self,
 856        change: RemoteAudioTrackUpdate,
 857        cx: &mut ModelContext<Self>,
 858    ) -> Result<()> {
 859        match change {
 860            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 861                let mut speaker_ids = speakers
 862                    .into_iter()
 863                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 864                    .collect::<Vec<u64>>();
 865                speaker_ids.sort_unstable();
 866                for (sid, participant) in &mut self.remote_participants {
 867                    if let Ok(_) = speaker_ids.binary_search(sid) {
 868                        participant.speaking = true;
 869                    } else {
 870                        participant.speaking = false;
 871                    }
 872                }
 873                if let Some(id) = self.client.user_id() {
 874                    if let Some(room) = &mut self.live_kit {
 875                        if let Ok(_) = speaker_ids.binary_search(&id) {
 876                            room.speaking = true;
 877                        } else {
 878                            room.speaking = false;
 879                        }
 880                    }
 881                }
 882                cx.notify();
 883            }
 884            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 885                let mut found = false;
 886                for participant in &mut self.remote_participants.values_mut() {
 887                    for track in participant.audio_tracks.values() {
 888                        if track.sid() == track_id {
 889                            found = true;
 890                            break;
 891                        }
 892                    }
 893                    if found {
 894                        participant.muted = muted;
 895                        break;
 896                    }
 897                }
 898
 899                cx.notify();
 900            }
 901            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 902                let user_id = track.publisher_id().parse()?;
 903                let track_id = track.sid().to_string();
 904                let participant = self
 905                    .remote_participants
 906                    .get_mut(&user_id)
 907                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 908
 909                participant.audio_tracks.insert(track_id.clone(), track);
 910                participant.muted = publication.is_muted();
 911
 912                cx.emit(Event::RemoteAudioTracksChanged {
 913                    participant_id: participant.peer_id,
 914                });
 915            }
 916            RemoteAudioTrackUpdate::Unsubscribed {
 917                publisher_id,
 918                track_id,
 919            } => {
 920                let user_id = publisher_id.parse()?;
 921                let participant = self
 922                    .remote_participants
 923                    .get_mut(&user_id)
 924                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 925                participant.audio_tracks.remove(&track_id);
 926                cx.emit(Event::RemoteAudioTracksChanged {
 927                    participant_id: participant.peer_id,
 928                });
 929            }
 930        }
 931
 932        cx.notify();
 933        Ok(())
 934    }
 935
 936    fn check_invariants(&self) {
 937        #[cfg(any(test, feature = "test-support"))]
 938        {
 939            for participant in self.remote_participants.values() {
 940                assert!(self.participant_user_ids.contains(&participant.user.id));
 941                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 942            }
 943
 944            for participant in &self.pending_participants {
 945                assert!(self.participant_user_ids.contains(&participant.id));
 946                assert_ne!(participant.id, self.client.user_id().unwrap());
 947            }
 948
 949            assert_eq!(
 950                self.participant_user_ids.len(),
 951                self.remote_participants.len() + self.pending_participants.len()
 952            );
 953        }
 954    }
 955
 956    pub(crate) fn call(
 957        &mut self,
 958        called_user_id: u64,
 959        initial_project_id: Option<u64>,
 960        cx: &mut ModelContext<Self>,
 961    ) -> Task<Result<()>> {
 962        if self.status.is_offline() {
 963            return Task::ready(Err(anyhow!("room is offline")));
 964        }
 965
 966        cx.notify();
 967        let client = self.client.clone();
 968        let room_id = self.id;
 969        self.pending_call_count += 1;
 970        cx.spawn(|this, mut cx| async move {
 971            let result = client
 972                .request(proto::Call {
 973                    room_id,
 974                    called_user_id,
 975                    initial_project_id,
 976                })
 977                .await;
 978            this.update(&mut cx, |this, cx| {
 979                this.pending_call_count -= 1;
 980                if this.should_leave() {
 981                    this.leave(cx).detach_and_log_err(cx);
 982                }
 983            });
 984            result?;
 985            Ok(())
 986        })
 987    }
 988
 989    pub fn join_project(
 990        &mut self,
 991        id: u64,
 992        language_registry: Arc<LanguageRegistry>,
 993        fs: Arc<dyn Fs>,
 994        cx: &mut ModelContext<Self>,
 995    ) -> Task<Result<ModelHandle<Project>>> {
 996        let client = self.client.clone();
 997        let user_store = self.user_store.clone();
 998        cx.spawn(|this, mut cx| async move {
 999            let project =
1000                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1001
1002            this.update(&mut cx, |this, cx| {
1003                this.joined_projects.retain(|project| {
1004                    if let Some(project) = project.upgrade(cx) {
1005                        !project.read(cx).is_read_only()
1006                    } else {
1007                        false
1008                    }
1009                });
1010                this.joined_projects.insert(project.downgrade());
1011            });
1012            Ok(project)
1013        })
1014    }
1015
1016    pub(crate) fn share_project(
1017        &mut self,
1018        project: ModelHandle<Project>,
1019        cx: &mut ModelContext<Self>,
1020    ) -> Task<Result<u64>> {
1021        if let Some(project_id) = project.read(cx).remote_id() {
1022            return Task::ready(Ok(project_id));
1023        }
1024
1025        let request = self.client.request(proto::ShareProject {
1026            room_id: self.id(),
1027            worktrees: project.read(cx).worktree_metadata_protos(cx),
1028        });
1029        cx.spawn(|this, mut cx| async move {
1030            let response = request.await?;
1031
1032            project.update(&mut cx, |project, cx| {
1033                project.shared(response.project_id, cx)
1034            })?;
1035
1036            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1037            this.update(&mut cx, |this, cx| {
1038                this.shared_projects.insert(project.downgrade());
1039                let active_project = this.local_participant.active_project.as_ref();
1040                if active_project.map_or(false, |location| *location == project) {
1041                    this.set_location(Some(&project), cx)
1042                } else {
1043                    Task::ready(Ok(()))
1044                }
1045            })
1046            .await?;
1047
1048            Ok(response.project_id)
1049        })
1050    }
1051
1052    pub(crate) fn unshare_project(
1053        &mut self,
1054        project: ModelHandle<Project>,
1055        cx: &mut ModelContext<Self>,
1056    ) -> Result<()> {
1057        let project_id = match project.read(cx).remote_id() {
1058            Some(project_id) => project_id,
1059            None => return Ok(()),
1060        };
1061
1062        self.client.send(proto::UnshareProject { project_id })?;
1063        project.update(cx, |this, cx| this.unshare(cx))
1064    }
1065
1066    pub(crate) fn set_location(
1067        &mut self,
1068        project: Option<&ModelHandle<Project>>,
1069        cx: &mut ModelContext<Self>,
1070    ) -> Task<Result<()>> {
1071        if self.status.is_offline() {
1072            return Task::ready(Err(anyhow!("room is offline")));
1073        }
1074
1075        let client = self.client.clone();
1076        let room_id = self.id;
1077        let location = if let Some(project) = project {
1078            self.local_participant.active_project = Some(project.downgrade());
1079            if let Some(project_id) = project.read(cx).remote_id() {
1080                proto::participant_location::Variant::SharedProject(
1081                    proto::participant_location::SharedProject { id: project_id },
1082                )
1083            } else {
1084                proto::participant_location::Variant::UnsharedProject(
1085                    proto::participant_location::UnsharedProject {},
1086                )
1087            }
1088        } else {
1089            self.local_participant.active_project = None;
1090            proto::participant_location::Variant::External(proto::participant_location::External {})
1091        };
1092
1093        cx.notify();
1094        cx.foreground().spawn(async move {
1095            client
1096                .request(proto::UpdateParticipantLocation {
1097                    room_id,
1098                    location: Some(proto::ParticipantLocation {
1099                        variant: Some(location),
1100                    }),
1101                })
1102                .await?;
1103            Ok(())
1104        })
1105    }
1106
1107    pub fn is_screen_sharing(&self) -> bool {
1108        self.live_kit.as_ref().map_or(false, |live_kit| {
1109            !matches!(live_kit.screen_track, LocalTrack::None)
1110        })
1111    }
1112
1113    pub fn is_sharing_mic(&self) -> bool {
1114        self.live_kit.as_ref().map_or(false, |live_kit| {
1115            !matches!(live_kit.microphone_track, LocalTrack::None)
1116        })
1117    }
1118
1119    pub fn is_muted(&self) -> bool {
1120        self.live_kit
1121            .as_ref()
1122            .and_then(|live_kit| match &live_kit.microphone_track {
1123                LocalTrack::None => Some(true),
1124                LocalTrack::Pending { muted, .. } => Some(*muted),
1125                LocalTrack::Published { muted, .. } => Some(*muted),
1126            })
1127            .unwrap_or(false)
1128    }
1129
1130    pub fn is_speaking(&self) -> bool {
1131        self.live_kit
1132            .as_ref()
1133            .map_or(false, |live_kit| live_kit.speaking)
1134    }
1135
1136    pub fn is_deafened(&self) -> Option<bool> {
1137        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1138    }
1139
1140    #[track_caller]
1141    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1142        if self.status.is_offline() {
1143            return Task::ready(Err(anyhow!("room is offline")));
1144        } else if self.is_sharing_mic() {
1145            return Task::ready(Err(anyhow!("microphone was already shared")));
1146        }
1147
1148        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1149            let publish_id = post_inc(&mut live_kit.next_publish_id);
1150            live_kit.microphone_track = LocalTrack::Pending {
1151                publish_id,
1152                muted: false,
1153            };
1154            cx.notify();
1155            publish_id
1156        } else {
1157            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1158        };
1159
1160        cx.spawn_weak(|this, mut cx| async move {
1161            let publish_track = async {
1162                let track = LocalAudioTrack::create();
1163                this.upgrade(&cx)
1164                    .ok_or_else(|| anyhow!("room was dropped"))?
1165                    .read_with(&cx, |this, _| {
1166                        this.live_kit
1167                            .as_ref()
1168                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1169                    })
1170                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1171                    .await
1172            };
1173
1174            let publication = publish_track.await;
1175            this.upgrade(&cx)
1176                .ok_or_else(|| anyhow!("room was dropped"))?
1177                .update(&mut cx, |this, cx| {
1178                    let live_kit = this
1179                        .live_kit
1180                        .as_mut()
1181                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1182
1183                    let (canceled, muted) = if let LocalTrack::Pending {
1184                        publish_id: cur_publish_id,
1185                        muted,
1186                    } = &live_kit.microphone_track
1187                    {
1188                        (*cur_publish_id != publish_id, *muted)
1189                    } else {
1190                        (true, false)
1191                    };
1192
1193                    match publication {
1194                        Ok(publication) => {
1195                            if canceled {
1196                                live_kit.room.unpublish_track(publication);
1197                            } else {
1198                                if muted {
1199                                    cx.background().spawn(publication.set_mute(muted)).detach();
1200                                }
1201                                live_kit.microphone_track = LocalTrack::Published {
1202                                    track_publication: publication,
1203                                    muted,
1204                                };
1205                                cx.notify();
1206                            }
1207                            Ok(())
1208                        }
1209                        Err(error) => {
1210                            if canceled {
1211                                Ok(())
1212                            } else {
1213                                live_kit.microphone_track = LocalTrack::None;
1214                                cx.notify();
1215                                Err(error)
1216                            }
1217                        }
1218                    }
1219                })
1220        })
1221    }
1222
1223    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1224        if self.status.is_offline() {
1225            return Task::ready(Err(anyhow!("room is offline")));
1226        } else if self.is_screen_sharing() {
1227            return Task::ready(Err(anyhow!("screen was already shared")));
1228        }
1229
1230        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1231            let publish_id = post_inc(&mut live_kit.next_publish_id);
1232            live_kit.screen_track = LocalTrack::Pending {
1233                publish_id,
1234                muted: false,
1235            };
1236            cx.notify();
1237            (live_kit.room.display_sources(), publish_id)
1238        } else {
1239            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1240        };
1241
1242        cx.spawn_weak(|this, mut cx| async move {
1243            let publish_track = async {
1244                let displays = displays.await?;
1245                let display = displays
1246                    .first()
1247                    .ok_or_else(|| anyhow!("no display found"))?;
1248                let track = LocalVideoTrack::screen_share_for_display(&display);
1249                this.upgrade(&cx)
1250                    .ok_or_else(|| anyhow!("room was dropped"))?
1251                    .read_with(&cx, |this, _| {
1252                        this.live_kit
1253                            .as_ref()
1254                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1255                    })
1256                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1257                    .await
1258            };
1259
1260            let publication = publish_track.await;
1261            this.upgrade(&cx)
1262                .ok_or_else(|| anyhow!("room was dropped"))?
1263                .update(&mut cx, |this, cx| {
1264                    let live_kit = this
1265                        .live_kit
1266                        .as_mut()
1267                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1268
1269                    let (canceled, muted) = if let LocalTrack::Pending {
1270                        publish_id: cur_publish_id,
1271                        muted,
1272                    } = &live_kit.screen_track
1273                    {
1274                        (*cur_publish_id != publish_id, *muted)
1275                    } else {
1276                        (true, false)
1277                    };
1278
1279                    match publication {
1280                        Ok(publication) => {
1281                            if canceled {
1282                                live_kit.room.unpublish_track(publication);
1283                            } else {
1284                                if muted {
1285                                    cx.background().spawn(publication.set_mute(muted)).detach();
1286                                }
1287                                live_kit.screen_track = LocalTrack::Published {
1288                                    track_publication: publication,
1289                                    muted,
1290                                };
1291                                cx.notify();
1292                            }
1293
1294                            Audio::play_sound(Sound::StartScreenshare, cx);
1295
1296                            Ok(())
1297                        }
1298                        Err(error) => {
1299                            if canceled {
1300                                Ok(())
1301                            } else {
1302                                live_kit.screen_track = LocalTrack::None;
1303                                cx.notify();
1304                                Err(error)
1305                            }
1306                        }
1307                    }
1308                })
1309        })
1310    }
1311
1312    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1313        let should_mute = !self.is_muted();
1314        if let Some(live_kit) = self.live_kit.as_mut() {
1315            if matches!(live_kit.microphone_track, LocalTrack::None) {
1316                return Ok(self.share_microphone(cx));
1317            }
1318
1319            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1320            live_kit.muted_by_user = should_mute;
1321
1322            if old_muted == true && live_kit.deafened == true {
1323                if let Some(task) = self.toggle_deafen(cx).ok() {
1324                    task.detach();
1325                }
1326            }
1327
1328            Ok(ret_task)
1329        } else {
1330            Err(anyhow!("LiveKit not started"))
1331        }
1332    }
1333
1334    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1335        if let Some(live_kit) = self.live_kit.as_mut() {
1336            (*live_kit).deafened = !live_kit.deafened;
1337
1338            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1339            // Context notification is sent within set_mute itself.
1340            let mut mute_task = None;
1341            // When deafening, mute user's mic as well.
1342            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1343            if live_kit.deafened || !live_kit.muted_by_user {
1344                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1345            };
1346            for participant in self.remote_participants.values() {
1347                for track in live_kit
1348                    .room
1349                    .remote_audio_track_publications(&participant.user.id.to_string())
1350                {
1351                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1352                }
1353            }
1354
1355            Ok(cx.foreground().spawn(async move {
1356                if let Some(mute_task) = mute_task {
1357                    mute_task.await?;
1358                }
1359                for task in tasks {
1360                    task.await?;
1361                }
1362                Ok(())
1363            }))
1364        } else {
1365            Err(anyhow!("LiveKit not started"))
1366        }
1367    }
1368
1369    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1370        if self.status.is_offline() {
1371            return Err(anyhow!("room is offline"));
1372        }
1373
1374        let live_kit = self
1375            .live_kit
1376            .as_mut()
1377            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1378        match mem::take(&mut live_kit.screen_track) {
1379            LocalTrack::None => Err(anyhow!("screen was not shared")),
1380            LocalTrack::Pending { .. } => {
1381                cx.notify();
1382                Ok(())
1383            }
1384            LocalTrack::Published {
1385                track_publication, ..
1386            } => {
1387                live_kit.room.unpublish_track(track_publication);
1388                cx.notify();
1389
1390                Audio::play_sound(Sound::StopScreenshare, cx);
1391                Ok(())
1392            }
1393        }
1394    }
1395
1396    #[cfg(any(test, feature = "test-support"))]
1397    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1398        self.live_kit
1399            .as_ref()
1400            .unwrap()
1401            .room
1402            .set_display_sources(sources);
1403    }
1404}
1405
1406struct LiveKitRoom {
1407    room: Arc<live_kit_client::Room>,
1408    screen_track: LocalTrack,
1409    microphone_track: LocalTrack,
1410    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1411    muted_by_user: bool,
1412    deafened: bool,
1413    speaking: bool,
1414    next_publish_id: usize,
1415    _maintain_room: Task<()>,
1416    _maintain_tracks: [Task<()>; 2],
1417}
1418
1419impl LiveKitRoom {
1420    fn set_mute(
1421        self: &mut LiveKitRoom,
1422        should_mute: bool,
1423        cx: &mut ModelContext<Room>,
1424    ) -> Result<(Task<Result<()>>, bool)> {
1425        if !should_mute {
1426            // clear user muting state.
1427            self.muted_by_user = false;
1428        }
1429
1430        let (result, old_muted) = match &mut self.microphone_track {
1431            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1432            LocalTrack::Pending { muted, .. } => {
1433                let old_muted = *muted;
1434                *muted = should_mute;
1435                cx.notify();
1436                Ok((Task::Ready(Some(Ok(()))), old_muted))
1437            }
1438            LocalTrack::Published {
1439                track_publication,
1440                muted,
1441            } => {
1442                let old_muted = *muted;
1443                *muted = should_mute;
1444                cx.notify();
1445                Ok((
1446                    cx.background().spawn(track_publication.set_mute(*muted)),
1447                    old_muted,
1448                ))
1449            }
1450        }?;
1451
1452        if old_muted != should_mute {
1453            if should_mute {
1454                Audio::play_sound(Sound::Mute, cx);
1455            } else {
1456                Audio::play_sound(Sound::Unmute, cx);
1457            }
1458        }
1459
1460        Ok((result, old_muted))
1461    }
1462}
1463
1464enum LocalTrack {
1465    None,
1466    Pending {
1467        publish_id: usize,
1468        muted: bool,
1469    },
1470    Published {
1471        track_publication: LocalTrackPublication,
1472        muted: bool,
1473    },
1474}
1475
1476impl Default for LocalTrack {
1477    fn default() -> Self {
1478        Self::None
1479    }
1480}
1481
1482#[derive(Copy, Clone, PartialEq, Eq)]
1483pub enum RoomStatus {
1484    Online,
1485    Rejoining,
1486    Offline,
1487}
1488
1489impl RoomStatus {
1490    pub fn is_offline(&self) -> bool {
1491        matches!(self, RoomStatus::Offline)
1492    }
1493
1494    pub fn is_online(&self) -> bool {
1495        matches!(self, RoomStatus::Online)
1496    }
1497}