1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio2::{Audio, Sound};
8use client2::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs2::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui2::{
16 AppContext, AsyncAppContext, Context, EventEmitter, Handle, ModelContext, Task, WeakHandle,
17};
18use language2::LanguageRegistry;
19use live_kit_client::{
20 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
21 RemoteVideoTrackUpdate,
22};
23use postage::{sink::Sink, stream::Stream, watch};
24use project2::Project;
25use settings2::Settings;
26use std::{future::Future, mem, sync::Arc, time::Duration};
27use util::{post_inc, ResultExt, TryFutureExt};
28
/// Upper bound on how long `Room::maintain_connection` waits for the client to come back online
/// and rejoin the room after a disconnection before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
30
31#[derive(Clone, Debug, PartialEq, Eq)]
32pub enum Event {
33 ParticipantLocationChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteVideoTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteAudioTracksChanged {
40 participant_id: proto::PeerId,
41 },
42 RemoteProjectShared {
43 owner: Arc<User>,
44 project_id: u64,
45 worktree_root_names: Vec<String>,
46 },
47 RemoteProjectUnshared {
48 project_id: u64,
49 },
50 RemoteProjectJoined {
51 project_id: u64,
52 },
53 RemoteProjectInvitationDiscarded {
54 project_id: u64,
55 },
56 Left,
57}
58
/// Client-side model of a call room: its participants, the projects shared in it, and the
/// background tasks that keep it in sync with the server.
pub struct Room {
    /// Server-assigned room id.
    id: u64,
    /// Channel this room belongs to, if it was joined through a channel.
    channel_id: Option<u64>,
    /// LiveKit (audio/video) state; `None` when the server supplied no connection info or after
    /// the room state has been cleared.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Local projects we have shared into this room.
    shared_projects: HashSet<WeakHandle<Project>>,
    /// Remote projects we have joined through this room.
    joined_projects: HashSet<WeakHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed by the server as pending participants.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of outgoing `Call` requests that have not completed yet.
    pending_call_count: usize,
    /// When set, the room is left automatically once `should_leave` finds it empty.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Handle<UserStore>,
    /// Followers of each `(leader, project id)` pair, rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    client_subscriptions: Vec<client2::Subscription>,
    _subscriptions: Vec<gpui2::Subscription>,
    /// Signalled with `Some(())` every time a room update has been fully applied.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// The in-flight task applying the most recent room update, if any.
    pending_room_update: Option<Task<()>>,
    /// Background task running `Room::maintain_connection`.
    maintain_connection: Option<Task<Option<()>>>,
}
82
/// Lets a `Room` emit [`Event`]s through its model context.
impl EventEmitter for Room {
    type Event = Event;
}
86
87impl Room {
/// Returns the id of the channel this room belongs to, if any.
pub fn channel_id(&self) -> Option<u64> {
    self.channel_id
}
91
/// Returns `true` while at least one local project is tracked as shared into this room.
pub fn is_sharing_project(&self) -> bool {
    !self.shared_projects.is_empty()
}
95
/// Test helper: reports whether the LiveKit room currently has a `Connected` status.
/// Returns `false` when the room has no LiveKit state at all.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    self.live_kit.as_ref().map_or(false, |live_kit| {
        matches!(
            *live_kit.room.status().borrow(),
            live_kit_client::ConnectionState::Connected { .. }
        )
    })
}
107
108 fn new(
109 id: u64,
110 channel_id: Option<u64>,
111 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
112 client: Arc<Client>,
113 user_store: Handle<UserStore>,
114 cx: &mut ModelContext<Self>,
115 ) -> Self {
116 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
117 let room = live_kit_client::Room::new();
118 let mut status = room.status();
119 // Consume the initial status of the room.
120 let _ = status.try_recv();
121 let _maintain_room = cx.spawn(|this, mut cx| async move {
122 while let Some(status) = status.next().await {
123 let this = if let Some(this) = this.upgrade() {
124 this
125 } else {
126 break;
127 };
128
129 if status == live_kit_client::ConnectionState::Disconnected {
130 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
131 .ok();
132 break;
133 }
134 }
135 });
136
137 let mut track_video_changes = room.remote_video_track_updates();
138 let _maintain_video_tracks = cx.spawn(|this, mut cx| async move {
139 while let Some(track_change) = track_video_changes.next().await {
140 let this = if let Some(this) = this.upgrade() {
141 this
142 } else {
143 break;
144 };
145
146 this.update(&mut cx, |this, cx| {
147 this.remote_video_track_updated(track_change, cx).log_err()
148 })
149 .ok();
150 }
151 });
152
153 let mut track_audio_changes = room.remote_audio_track_updates();
154 let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move {
155 while let Some(track_change) = track_audio_changes.next().await {
156 let this = if let Some(this) = this.upgrade() {
157 this
158 } else {
159 break;
160 };
161
162 this.update(&mut cx, |this, cx| {
163 this.remote_audio_track_updated(track_change, cx).log_err()
164 })
165 .ok();
166 }
167 });
168
169 let connect = room.connect(&connection_info.server_url, &connection_info.token);
170 cx.spawn(|this, mut cx| async move {
171 connect.await?;
172
173 if !cx.update(|cx| Self::mute_on_join(cx))? {
174 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
175 .await?;
176 }
177
178 anyhow::Ok(())
179 })
180 .detach_and_log_err(cx);
181
182 Some(LiveKitRoom {
183 room,
184 screen_track: LocalTrack::None,
185 microphone_track: LocalTrack::None,
186 next_publish_id: 0,
187 muted_by_user: false,
188 deafened: false,
189 speaking: false,
190 _maintain_room,
191 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
192 })
193 } else {
194 None
195 };
196
197 let maintain_connection = cx.spawn({
198 let client = client.clone();
199 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
200 });
201
202 Audio::play_sound(Sound::Joined, cx);
203
204 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
205
206 Self {
207 id,
208 channel_id,
209 live_kit: live_kit_room,
210 status: RoomStatus::Online,
211 shared_projects: Default::default(),
212 joined_projects: Default::default(),
213 participant_user_ids: Default::default(),
214 local_participant: Default::default(),
215 remote_participants: Default::default(),
216 pending_participants: Default::default(),
217 pending_call_count: 0,
218 client_subscriptions: vec![
219 client.add_message_handler(cx.weak_handle(), Self::handle_room_updated)
220 ],
221 _subscriptions: vec![
222 cx.on_release(Self::released),
223 cx.on_app_quit(Self::app_will_quit),
224 ],
225 leave_when_empty: false,
226 pending_room_update: None,
227 client,
228 user_store,
229 follows_by_leader_id_project_id: Default::default(),
230 maintain_connection: Some(maintain_connection),
231 room_update_completed_tx,
232 room_update_completed_rx,
233 }
234 }
235
236 pub(crate) fn create(
237 called_user_id: u64,
238 initial_project: Option<Handle<Project>>,
239 client: Arc<Client>,
240 user_store: Handle<UserStore>,
241 cx: &mut AppContext,
242 ) -> Task<Result<Handle<Self>>> {
243 cx.spawn(move |mut cx| async move {
244 let response = client.request(proto::CreateRoom {}).await?;
245 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
246 let room = cx.entity(|cx| {
247 Self::new(
248 room_proto.id,
249 None,
250 response.live_kit_connection_info,
251 client,
252 user_store,
253 cx,
254 )
255 })?;
256
257 let initial_project_id = if let Some(initial_project) = initial_project {
258 let initial_project_id = room
259 .update(&mut cx, |room, cx| {
260 room.share_project(initial_project.clone(), cx)
261 })?
262 .await?;
263 Some(initial_project_id)
264 } else {
265 None
266 };
267
268 match room
269 .update(&mut cx, |room, cx| {
270 room.leave_when_empty = true;
271 room.call(called_user_id, initial_project_id, cx)
272 })?
273 .await
274 {
275 Ok(()) => Ok(room),
276 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
277 }
278 })
279 }
280
281 pub(crate) fn join_channel(
282 channel_id: u64,
283 client: Arc<Client>,
284 user_store: Handle<UserStore>,
285 cx: &mut AppContext,
286 ) -> Task<Result<Handle<Self>>> {
287 cx.spawn(move |cx| async move {
288 Self::from_join_response(
289 client.request(proto::JoinChannel { channel_id }).await?,
290 client,
291 user_store,
292 cx,
293 )
294 })
295 }
296
297 pub(crate) fn join(
298 call: &IncomingCall,
299 client: Arc<Client>,
300 user_store: Handle<UserStore>,
301 cx: &mut AppContext,
302 ) -> Task<Result<Handle<Self>>> {
303 let id = call.room_id;
304 cx.spawn(move |cx| async move {
305 Self::from_join_response(
306 client.request(proto::JoinRoom { id }).await?,
307 client,
308 user_store,
309 cx,
310 )
311 })
312 }
313
314 fn released(&mut self, cx: &mut AppContext) {
315 if self.status.is_online() {
316 self.leave_internal(cx).detach_and_log_err(cx);
317 }
318 }
319
320 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
321 let task = if self.status.is_online() {
322 let leave = self.leave_internal(cx);
323 Some(cx.executor().spawn(async move {
324 leave.await.log_err();
325 }))
326 } else {
327 None
328 };
329
330 async move {
331 if let Some(task) = task {
332 task.await;
333 }
334 }
335 }
336
/// Returns `true` when the microphone should not be shared automatically on joining: either the
/// `mute_on_join` call setting is enabled, or `client2::IMPERSONATE_LOGIN` is set.
pub fn mute_on_join(cx: &AppContext) -> bool {
    CallSettings::get_global(cx).mute_on_join || client2::IMPERSONATE_LOGIN.is_some()
}
340
341 fn from_join_response(
342 response: proto::JoinRoomResponse,
343 client: Arc<Client>,
344 user_store: Handle<UserStore>,
345 mut cx: AsyncAppContext,
346 ) -> Result<Handle<Self>> {
347 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
348 let room = cx.entity(|cx| {
349 Self::new(
350 room_proto.id,
351 response.channel_id,
352 response.live_kit_connection_info,
353 client,
354 user_store,
355 cx,
356 )
357 })?;
358 room.update(&mut cx, |room, cx| {
359 room.leave_when_empty = room.channel_id.is_none();
360 room.apply_room_update(room_proto, cx)?;
361 anyhow::Ok(())
362 })??;
363 Ok(room)
364 }
365
366 fn should_leave(&self) -> bool {
367 self.leave_when_empty
368 && self.pending_room_update.is_none()
369 && self.pending_participants.is_empty()
370 && self.remote_participants.is_empty()
371 && self.pending_call_count == 0
372 }
373
/// Notifies observers, emits [`Event::Left`], and then leaves the room via `leave_internal`.
///
/// The notification and the event are emitted before the offline check in `leave_internal`, so
/// they fire even when the returned task resolves to a "room is offline" error.
pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    cx.notify();
    cx.emit(Event::Left);
    self.leave_internal(cx)
}
379
380 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
381 if self.status.is_offline() {
382 return Task::ready(Err(anyhow!("room is offline")));
383 }
384
385 log::info!("leaving room");
386 Audio::play_sound(Sound::Leave, cx);
387
388 self.clear_state(cx);
389
390 let leave_room = self.client.request(proto::LeaveRoom {});
391 cx.executor().spawn(async move {
392 leave_room.await?;
393 anyhow::Ok(())
394 })
395 }
396
397 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
398 for project in self.shared_projects.drain() {
399 if let Some(project) = project.upgrade() {
400 project.update(cx, |project, cx| {
401 project.unshare(cx).log_err();
402 });
403 }
404 }
405 for project in self.joined_projects.drain() {
406 if let Some(project) = project.upgrade() {
407 project.update(cx, |project, cx| {
408 project.disconnected_from_host(cx);
409 project.close(cx);
410 });
411 }
412 }
413
414 self.status = RoomStatus::Offline;
415 self.remote_participants.clear();
416 self.pending_participants.clear();
417 self.participant_user_ids.clear();
418 self.client_subscriptions.clear();
419 self.live_kit.take();
420 self.pending_room_update.take();
421 self.maintain_connection.take();
422 }
423
/// Background loop that watches the client's connection status and tries to rejoin the room after
/// a disconnection.
///
/// On every detected disconnection the room is marked `Rejoining`, then the loop waits (for at
/// most `RECONNECT_TIMEOUT`) for the client to reconnect and calls `rejoin`, allowing up to three
/// failed rejoin attempts. If rejoining succeeds the loop keeps watching; otherwise the room is
/// left and an error is returned. The function only ever returns `Err`: either the room was
/// dropped, the app context was gone, or reconnection failed.
async fn maintain_connection(
    this: WeakHandle<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // Drain the currently buffered status so that `next()` below only yields later changes.
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade()
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                })?;

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.executor().timer(RECONNECT_TIMEOUT).fuse();
                let client_reconnection = async {
                    // Only failed rejoin attempts consume this budget; status changes while
                    // still disconnected do not.
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        if client_status.borrow().is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade() else { break };
                            match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                Ok(task) => {
                                    if task.await.log_err().is_some() {
                                        return true;
                                    } else {
                                        remaining_attempts -= 1;
                                    }
                                }
                                Err(_app_dropped) => return false,
                            }
                        } else if client_status.borrow().is_signed_out() {
                            // Signing out ends the reconnection attempt immediately.
                            return false;
                        }

                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        client_status.next().await;
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // Biased select: the reconnection future is polled before the timeout.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            break;
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade() {
        log::info!("reconnection failed, leaving room");
        let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
    }
    Err(anyhow!(
        "can't reconnect to room: client failed to re-establish connection"
    ))
}
509
510 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
511 let mut projects = HashMap::default();
512 let mut reshared_projects = Vec::new();
513 let mut rejoined_projects = Vec::new();
514 self.shared_projects.retain(|project| {
515 if let Some(handle) = project.upgrade() {
516 let project = handle.read(cx);
517 if let Some(project_id) = project.remote_id() {
518 projects.insert(project_id, handle.clone());
519 reshared_projects.push(proto::UpdateProject {
520 project_id,
521 worktrees: project.worktree_metadata_protos(cx),
522 });
523 return true;
524 }
525 }
526 false
527 });
528 self.joined_projects.retain(|project| {
529 if let Some(handle) = project.upgrade() {
530 let project = handle.read(cx);
531 if let Some(project_id) = project.remote_id() {
532 projects.insert(project_id, handle.clone());
533 rejoined_projects.push(proto::RejoinProject {
534 id: project_id,
535 worktrees: project
536 .worktrees()
537 .map(|worktree| {
538 let worktree = worktree.read(cx);
539 proto::RejoinWorktree {
540 id: worktree.id().to_proto(),
541 scan_id: worktree.completed_scan_id() as u64,
542 }
543 })
544 .collect(),
545 });
546 }
547 return true;
548 }
549 false
550 });
551
552 let response = self.client.request_envelope(proto::RejoinRoom {
553 id: self.id,
554 reshared_projects,
555 rejoined_projects,
556 });
557
558 cx.spawn(|this, mut cx| async move {
559 let response = response.await?;
560 let message_id = response.message_id;
561 let response = response.payload;
562 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
563 this.update(&mut cx, |this, cx| {
564 this.status = RoomStatus::Online;
565 this.apply_room_update(room_proto, cx)?;
566
567 for reshared_project in response.reshared_projects {
568 if let Some(project) = projects.get(&reshared_project.id) {
569 project.update(cx, |project, cx| {
570 project.reshared(reshared_project, cx).log_err();
571 });
572 }
573 }
574
575 for rejoined_project in response.rejoined_projects {
576 if let Some(project) = projects.get(&rejoined_project.id) {
577 project.update(cx, |project, cx| {
578 project.rejoined(rejoined_project, message_id, cx).log_err();
579 });
580 }
581 }
582
583 anyhow::Ok(())
584 })?
585 })
586 }
587
/// Returns the room's id.
pub fn id(&self) -> u64 {
    self.id
}
591
/// Returns the room's current status.
pub fn status(&self) -> RoomStatus {
    self.status
}
595
/// Returns the state tracked for the local user in this room.
pub fn local_participant(&self) -> &LocalParticipant {
    &self.local_participant
}
599
/// Returns the remote participants currently in the room, keyed by user id.
pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
    &self.remote_participants
}
603
/// Finds the remote participant with the given peer id by scanning all remote participants.
pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
    self.remote_participants
        .values()
        .find(|p| p.peer_id == peer_id)
}
609
/// Returns the users the server currently lists as pending participants of this room.
pub fn pending_participants(&self) -> &[Arc<User>] {
    &self.pending_participants
}
613
/// Returns `true` if `user_id` belongs to a remote or pending participant of this room.
pub fn contains_participant(&self, user_id: u64) -> bool {
    self.participant_user_ids.contains(&user_id)
}
617
618 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
619 self.follows_by_leader_id_project_id
620 .get(&(leader_id, project_id))
621 .map_or(&[], |v| v.as_slice())
622 }
623
624 /// Returns the most 'active' projects, defined as most people in the project
625 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
626 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
627 for participant in self.remote_participants.values() {
628 match participant.location {
629 ParticipantLocation::SharedProject { project_id } => {
630 project_hosts_and_guest_counts
631 .entry(project_id)
632 .or_default()
633 .1 += 1;
634 }
635 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
636 }
637 for project in &participant.projects {
638 project_hosts_and_guest_counts
639 .entry(project.id)
640 .or_default()
641 .0 = Some(participant.user.id);
642 }
643 }
644
645 if let Some(user) = self.user_store.read(cx).current_user() {
646 for project in &self.local_participant.projects {
647 project_hosts_and_guest_counts
648 .entry(project.id)
649 .or_default()
650 .0 = Some(user.id);
651 }
652 }
653
654 project_hosts_and_guest_counts
655 .into_iter()
656 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
657 .max_by_key(|(_, _, guest_count)| *guest_count)
658 .map(|(id, host, _)| (id, host))
659 }
660
661 async fn handle_room_updated(
662 this: Handle<Self>,
663 envelope: TypedEnvelope<proto::RoomUpdated>,
664 _: Arc<Client>,
665 mut cx: AsyncAppContext,
666 ) -> Result<()> {
667 let room = envelope
668 .payload
669 .room
670 .ok_or_else(|| anyhow!("invalid room"))?;
671 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
672 }
673
674 fn apply_room_update(
675 &mut self,
676 mut room: proto::Room,
677 cx: &mut ModelContext<Self>,
678 ) -> Result<()> {
679 // Filter ourselves out from the room's participants.
680 let local_participant_ix = room
681 .participants
682 .iter()
683 .position(|participant| Some(participant.user_id) == self.client.user_id());
684 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
685
686 let pending_participant_user_ids = room
687 .pending_participants
688 .iter()
689 .map(|p| p.user_id)
690 .collect::<Vec<_>>();
691
692 let remote_participant_user_ids = room
693 .participants
694 .iter()
695 .map(|p| p.user_id)
696 .collect::<Vec<_>>();
697
698 let (remote_participants, pending_participants) =
699 self.user_store.update(cx, move |user_store, cx| {
700 (
701 user_store.get_users(remote_participant_user_ids, cx),
702 user_store.get_users(pending_participant_user_ids, cx),
703 )
704 });
705
706 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
707 let (remote_participants, pending_participants) =
708 futures::join!(remote_participants, pending_participants);
709
710 this.update(&mut cx, |this, cx| {
711 this.participant_user_ids.clear();
712
713 if let Some(participant) = local_participant {
714 this.local_participant.projects = participant.projects;
715 } else {
716 this.local_participant.projects.clear();
717 }
718
719 if let Some(participants) = remote_participants.log_err() {
720 for (participant, user) in room.participants.into_iter().zip(participants) {
721 let Some(peer_id) = participant.peer_id else {
722 continue;
723 };
724 let participant_index = ParticipantIndex(participant.participant_index);
725 this.participant_user_ids.insert(participant.user_id);
726
727 let old_projects = this
728 .remote_participants
729 .get(&participant.user_id)
730 .into_iter()
731 .flat_map(|existing| &existing.projects)
732 .map(|project| project.id)
733 .collect::<HashSet<_>>();
734 let new_projects = participant
735 .projects
736 .iter()
737 .map(|project| project.id)
738 .collect::<HashSet<_>>();
739
740 for project in &participant.projects {
741 if !old_projects.contains(&project.id) {
742 cx.emit(Event::RemoteProjectShared {
743 owner: user.clone(),
744 project_id: project.id,
745 worktree_root_names: project.worktree_root_names.clone(),
746 });
747 }
748 }
749
750 for unshared_project_id in old_projects.difference(&new_projects) {
751 this.joined_projects.retain(|project| {
752 if let Some(project) = project.upgrade() {
753 project.update(cx, |project, cx| {
754 if project.remote_id() == Some(*unshared_project_id) {
755 project.disconnected_from_host(cx);
756 false
757 } else {
758 true
759 }
760 })
761 } else {
762 false
763 }
764 });
765 cx.emit(Event::RemoteProjectUnshared {
766 project_id: *unshared_project_id,
767 });
768 }
769
770 let location = ParticipantLocation::from_proto(participant.location)
771 .unwrap_or(ParticipantLocation::External);
772 if let Some(remote_participant) =
773 this.remote_participants.get_mut(&participant.user_id)
774 {
775 remote_participant.peer_id = peer_id;
776 remote_participant.projects = participant.projects;
777 remote_participant.participant_index = participant_index;
778 if location != remote_participant.location {
779 remote_participant.location = location;
780 cx.emit(Event::ParticipantLocationChanged {
781 participant_id: peer_id,
782 });
783 }
784 } else {
785 this.remote_participants.insert(
786 participant.user_id,
787 RemoteParticipant {
788 user: user.clone(),
789 participant_index,
790 peer_id,
791 projects: participant.projects,
792 location,
793 muted: true,
794 speaking: false,
795 video_tracks: Default::default(),
796 audio_tracks: Default::default(),
797 },
798 );
799
800 Audio::play_sound(Sound::Joined, cx);
801
802 if let Some(live_kit) = this.live_kit.as_ref() {
803 let video_tracks =
804 live_kit.room.remote_video_tracks(&user.id.to_string());
805 let audio_tracks =
806 live_kit.room.remote_audio_tracks(&user.id.to_string());
807 let publications = live_kit
808 .room
809 .remote_audio_track_publications(&user.id.to_string());
810
811 for track in video_tracks {
812 this.remote_video_track_updated(
813 RemoteVideoTrackUpdate::Subscribed(track),
814 cx,
815 )
816 .log_err();
817 }
818
819 for (track, publication) in
820 audio_tracks.iter().zip(publications.iter())
821 {
822 this.remote_audio_track_updated(
823 RemoteAudioTrackUpdate::Subscribed(
824 track.clone(),
825 publication.clone(),
826 ),
827 cx,
828 )
829 .log_err();
830 }
831 }
832 }
833 }
834
835 this.remote_participants.retain(|user_id, participant| {
836 if this.participant_user_ids.contains(user_id) {
837 true
838 } else {
839 for project in &participant.projects {
840 cx.emit(Event::RemoteProjectUnshared {
841 project_id: project.id,
842 });
843 }
844 false
845 }
846 });
847 }
848
849 if let Some(pending_participants) = pending_participants.log_err() {
850 this.pending_participants = pending_participants;
851 for participant in &this.pending_participants {
852 this.participant_user_ids.insert(participant.id);
853 }
854 }
855
856 this.follows_by_leader_id_project_id.clear();
857 for follower in room.followers {
858 let project_id = follower.project_id;
859 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
860 (Some(leader), Some(follower)) => (leader, follower),
861
862 _ => {
863 log::error!("Follower message {follower:?} missing some state");
864 continue;
865 }
866 };
867
868 let list = this
869 .follows_by_leader_id_project_id
870 .entry((leader, project_id))
871 .or_insert(Vec::new());
872 if !list.contains(&follower) {
873 list.push(follower);
874 }
875 }
876
877 this.pending_room_update.take();
878 if this.should_leave() {
879 log::info!("room is empty, leaving");
880 let _ = this.leave(cx);
881 }
882
883 this.user_store.update(cx, |user_store, cx| {
884 let participant_indices_by_user_id = this
885 .remote_participants
886 .iter()
887 .map(|(user_id, participant)| (*user_id, participant.participant_index))
888 .collect();
889 user_store.set_participant_indices(participant_indices_by_user_id, cx);
890 });
891
892 this.check_invariants();
893 this.room_update_completed_tx.try_send(Some(())).ok();
894 cx.notify();
895 })
896 .ok();
897 }));
898
899 cx.notify();
900 Ok(())
901 }
902
903 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
904 let mut done_rx = self.room_update_completed_rx.clone();
905 async move {
906 while let Some(result) = done_rx.next().await {
907 if result.is_some() {
908 break;
909 }
910 }
911 }
912 }
913
914 fn remote_video_track_updated(
915 &mut self,
916 change: RemoteVideoTrackUpdate,
917 cx: &mut ModelContext<Self>,
918 ) -> Result<()> {
919 match change {
920 RemoteVideoTrackUpdate::Subscribed(track) => {
921 let user_id = track.publisher_id().parse()?;
922 let track_id = track.sid().to_string();
923 let participant = self
924 .remote_participants
925 .get_mut(&user_id)
926 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
927 participant.video_tracks.insert(
928 track_id.clone(),
929 Arc::new(RemoteVideoTrack {
930 live_kit_track: track,
931 }),
932 );
933 cx.emit(Event::RemoteVideoTracksChanged {
934 participant_id: participant.peer_id,
935 });
936 }
937 RemoteVideoTrackUpdate::Unsubscribed {
938 publisher_id,
939 track_id,
940 } => {
941 let user_id = publisher_id.parse()?;
942 let participant = self
943 .remote_participants
944 .get_mut(&user_id)
945 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
946 participant.video_tracks.remove(&track_id);
947 cx.emit(Event::RemoteVideoTracksChanged {
948 participant_id: participant.peer_id,
949 });
950 }
951 }
952
953 cx.notify();
954 Ok(())
955 }
956
957 fn remote_audio_track_updated(
958 &mut self,
959 change: RemoteAudioTrackUpdate,
960 cx: &mut ModelContext<Self>,
961 ) -> Result<()> {
962 match change {
963 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
964 let mut speaker_ids = speakers
965 .into_iter()
966 .filter_map(|speaker_sid| speaker_sid.parse().ok())
967 .collect::<Vec<u64>>();
968 speaker_ids.sort_unstable();
969 for (sid, participant) in &mut self.remote_participants {
970 if let Ok(_) = speaker_ids.binary_search(sid) {
971 participant.speaking = true;
972 } else {
973 participant.speaking = false;
974 }
975 }
976 if let Some(id) = self.client.user_id() {
977 if let Some(room) = &mut self.live_kit {
978 if let Ok(_) = speaker_ids.binary_search(&id) {
979 room.speaking = true;
980 } else {
981 room.speaking = false;
982 }
983 }
984 }
985 cx.notify();
986 }
987 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
988 let mut found = false;
989 for participant in &mut self.remote_participants.values_mut() {
990 for track in participant.audio_tracks.values() {
991 if track.sid() == track_id {
992 found = true;
993 break;
994 }
995 }
996 if found {
997 participant.muted = muted;
998 break;
999 }
1000 }
1001
1002 cx.notify();
1003 }
1004 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1005 let user_id = track.publisher_id().parse()?;
1006 let track_id = track.sid().to_string();
1007 let participant = self
1008 .remote_participants
1009 .get_mut(&user_id)
1010 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1011
1012 participant.audio_tracks.insert(track_id.clone(), track);
1013 participant.muted = publication.is_muted();
1014
1015 cx.emit(Event::RemoteAudioTracksChanged {
1016 participant_id: participant.peer_id,
1017 });
1018 }
1019 RemoteAudioTrackUpdate::Unsubscribed {
1020 publisher_id,
1021 track_id,
1022 } => {
1023 let user_id = publisher_id.parse()?;
1024 let participant = self
1025 .remote_participants
1026 .get_mut(&user_id)
1027 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1028 participant.audio_tracks.remove(&track_id);
1029 cx.emit(Event::RemoteAudioTracksChanged {
1030 participant_id: participant.peer_id,
1031 });
1032 }
1033 }
1034
1035 cx.notify();
1036 Ok(())
1037 }
1038
/// Asserts the room's participant bookkeeping is consistent. The checks are only compiled in test
/// builds (or with the `test-support` feature); otherwise this is a no-op.
///
/// Invariants: every remote and pending participant is recorded in `participant_user_ids`, none
/// of them is the local user, and `participant_user_ids` contains nobody else.
fn check_invariants(&self) {
    #[cfg(any(test, feature = "test-support"))]
    {
        for participant in self.remote_participants.values() {
            assert!(self.participant_user_ids.contains(&participant.user.id));
            assert_ne!(participant.user.id, self.client.user_id().unwrap());
        }

        for participant in &self.pending_participants {
            assert!(self.participant_user_ids.contains(&participant.id));
            assert_ne!(participant.id, self.client.user_id().unwrap());
        }

        // Equal sizes rule out stray ids and overlap between the two groups.
        assert_eq!(
            self.participant_user_ids.len(),
            self.remote_participants.len() + self.pending_participants.len()
        );
    }
}
1058
1059 pub(crate) fn call(
1060 &mut self,
1061 called_user_id: u64,
1062 initial_project_id: Option<u64>,
1063 cx: &mut ModelContext<Self>,
1064 ) -> Task<Result<()>> {
1065 if self.status.is_offline() {
1066 return Task::ready(Err(anyhow!("room is offline")));
1067 }
1068
1069 cx.notify();
1070 let client = self.client.clone();
1071 let room_id = self.id;
1072 self.pending_call_count += 1;
1073 cx.spawn(move |this, mut cx| async move {
1074 let result = client
1075 .request(proto::Call {
1076 room_id,
1077 called_user_id,
1078 initial_project_id,
1079 })
1080 .await;
1081 this.update(&mut cx, |this, cx| {
1082 this.pending_call_count -= 1;
1083 if this.should_leave() {
1084 this.leave(cx).detach_and_log_err(cx);
1085 }
1086 })?;
1087 result?;
1088 Ok(())
1089 })
1090 }
1091
1092 pub fn join_project(
1093 &mut self,
1094 id: u64,
1095 language_registry: Arc<LanguageRegistry>,
1096 fs: Arc<dyn Fs>,
1097 cx: &mut ModelContext<Self>,
1098 ) -> Task<Result<Handle<Project>>> {
1099 let client = self.client.clone();
1100 let user_store = self.user_store.clone();
1101 cx.emit(Event::RemoteProjectJoined { project_id: id });
1102 cx.spawn(move |this, mut cx| async move {
1103 let project =
1104 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1105
1106 this.update(&mut cx, |this, cx| {
1107 this.joined_projects.retain(|project| {
1108 if let Some(project) = project.upgrade() {
1109 !project.read(cx).is_read_only()
1110 } else {
1111 false
1112 }
1113 });
1114 this.joined_projects.insert(project.downgrade());
1115 })?;
1116 Ok(project)
1117 })
1118 }
1119
1120 pub(crate) fn share_project(
1121 &mut self,
1122 project: Handle<Project>,
1123 cx: &mut ModelContext<Self>,
1124 ) -> Task<Result<u64>> {
1125 if let Some(project_id) = project.read(cx).remote_id() {
1126 return Task::ready(Ok(project_id));
1127 }
1128
1129 let request = self.client.request(proto::ShareProject {
1130 room_id: self.id(),
1131 worktrees: project.read(cx).worktree_metadata_protos(cx),
1132 });
1133 cx.spawn(|this, mut cx| async move {
1134 let response = request.await?;
1135
1136 project.update(&mut cx, |project, cx| {
1137 project.shared(response.project_id, cx)
1138 })??;
1139
1140 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1141 this.update(&mut cx, |this, cx| {
1142 this.shared_projects.insert(project.downgrade());
1143 let active_project = this.local_participant.active_project.as_ref();
1144 if active_project.map_or(false, |location| *location == project) {
1145 this.set_location(Some(&project), cx)
1146 } else {
1147 Task::ready(Ok(()))
1148 }
1149 })?
1150 .await?;
1151
1152 Ok(response.project_id)
1153 })
1154 }
1155
1156 pub(crate) fn unshare_project(
1157 &mut self,
1158 project: Handle<Project>,
1159 cx: &mut ModelContext<Self>,
1160 ) -> Result<()> {
1161 let project_id = match project.read(cx).remote_id() {
1162 Some(project_id) => project_id,
1163 None => return Ok(()),
1164 };
1165
1166 self.client.send(proto::UnshareProject { project_id })?;
1167 project.update(cx, |this, cx| this.unshare(cx))
1168 }
1169
1170 pub(crate) fn set_location(
1171 &mut self,
1172 project: Option<&Handle<Project>>,
1173 cx: &mut ModelContext<Self>,
1174 ) -> Task<Result<()>> {
1175 if self.status.is_offline() {
1176 return Task::ready(Err(anyhow!("room is offline")));
1177 }
1178
1179 let client = self.client.clone();
1180 let room_id = self.id;
1181 let location = if let Some(project) = project {
1182 self.local_participant.active_project = Some(project.downgrade());
1183 if let Some(project_id) = project.read(cx).remote_id() {
1184 proto::participant_location::Variant::SharedProject(
1185 proto::participant_location::SharedProject { id: project_id },
1186 )
1187 } else {
1188 proto::participant_location::Variant::UnsharedProject(
1189 proto::participant_location::UnsharedProject {},
1190 )
1191 }
1192 } else {
1193 self.local_participant.active_project = None;
1194 proto::participant_location::Variant::External(proto::participant_location::External {})
1195 };
1196
1197 cx.notify();
1198 cx.executor().spawn_on_main(move || async move {
1199 client
1200 .request(proto::UpdateParticipantLocation {
1201 room_id,
1202 location: Some(proto::ParticipantLocation {
1203 variant: Some(location),
1204 }),
1205 })
1206 .await?;
1207 Ok(())
1208 })
1209 }
1210
1211 pub fn is_screen_sharing(&self) -> bool {
1212 self.live_kit.as_ref().map_or(false, |live_kit| {
1213 !matches!(live_kit.screen_track, LocalTrack::None)
1214 })
1215 }
1216
1217 pub fn is_sharing_mic(&self) -> bool {
1218 self.live_kit.as_ref().map_or(false, |live_kit| {
1219 !matches!(live_kit.microphone_track, LocalTrack::None)
1220 })
1221 }
1222
1223 pub fn is_muted(&self, cx: &AppContext) -> bool {
1224 self.live_kit
1225 .as_ref()
1226 .and_then(|live_kit| match &live_kit.microphone_track {
1227 LocalTrack::None => Some(Self::mute_on_join(cx)),
1228 LocalTrack::Pending { muted, .. } => Some(*muted),
1229 LocalTrack::Published { muted, .. } => Some(*muted),
1230 })
1231 .unwrap_or(false)
1232 }
1233
1234 pub fn is_speaking(&self) -> bool {
1235 self.live_kit
1236 .as_ref()
1237 .map_or(false, |live_kit| live_kit.speaking)
1238 }
1239
1240 pub fn is_deafened(&self) -> Option<bool> {
1241 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1242 }
1243
1244 #[track_caller]
1245 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1246 if self.status.is_offline() {
1247 return Task::ready(Err(anyhow!("room is offline")));
1248 } else if self.is_sharing_mic() {
1249 return Task::ready(Err(anyhow!("microphone was already shared")));
1250 }
1251
1252 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1253 let publish_id = post_inc(&mut live_kit.next_publish_id);
1254 live_kit.microphone_track = LocalTrack::Pending {
1255 publish_id,
1256 muted: false,
1257 };
1258 cx.notify();
1259 publish_id
1260 } else {
1261 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1262 };
1263
1264 cx.spawn(move |this, mut cx| async move {
1265 let publish_track = async {
1266 let track = LocalAudioTrack::create();
1267 this.upgrade()
1268 .ok_or_else(|| anyhow!("room was dropped"))?
1269 .update(&mut cx, |this, _| {
1270 this.live_kit
1271 .as_ref()
1272 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1273 })?
1274 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1275 .await
1276 };
1277
1278 let publication = publish_track.await;
1279 this.upgrade()
1280 .ok_or_else(|| anyhow!("room was dropped"))?
1281 .update(&mut cx, |this, cx| {
1282 let live_kit = this
1283 .live_kit
1284 .as_mut()
1285 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1286
1287 let (canceled, muted) = if let LocalTrack::Pending {
1288 publish_id: cur_publish_id,
1289 muted,
1290 } = &live_kit.microphone_track
1291 {
1292 (*cur_publish_id != publish_id, *muted)
1293 } else {
1294 (true, false)
1295 };
1296
1297 match publication {
1298 Ok(publication) => {
1299 if canceled {
1300 live_kit.room.unpublish_track(publication);
1301 } else {
1302 if muted {
1303 cx.executor().spawn(publication.set_mute(muted)).detach();
1304 }
1305 live_kit.microphone_track = LocalTrack::Published {
1306 track_publication: publication,
1307 muted,
1308 };
1309 cx.notify();
1310 }
1311 Ok(())
1312 }
1313 Err(error) => {
1314 if canceled {
1315 Ok(())
1316 } else {
1317 live_kit.microphone_track = LocalTrack::None;
1318 cx.notify();
1319 Err(error)
1320 }
1321 }
1322 }
1323 })?
1324 })
1325 }
1326
1327 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1328 if self.status.is_offline() {
1329 return Task::ready(Err(anyhow!("room is offline")));
1330 } else if self.is_screen_sharing() {
1331 return Task::ready(Err(anyhow!("screen was already shared")));
1332 }
1333
1334 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1335 let publish_id = post_inc(&mut live_kit.next_publish_id);
1336 live_kit.screen_track = LocalTrack::Pending {
1337 publish_id,
1338 muted: false,
1339 };
1340 cx.notify();
1341 (live_kit.room.display_sources(), publish_id)
1342 } else {
1343 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1344 };
1345
1346 cx.spawn(move |this, mut cx| async move {
1347 let publish_track = async {
1348 let displays = displays.await?;
1349 let display = displays
1350 .first()
1351 .ok_or_else(|| anyhow!("no display found"))?;
1352 let track = LocalVideoTrack::screen_share_for_display(&display);
1353 this.upgrade()
1354 .ok_or_else(|| anyhow!("room was dropped"))?
1355 .update(&mut cx, |this, _| {
1356 this.live_kit
1357 .as_ref()
1358 .map(|live_kit| live_kit.room.publish_video_track(&track))
1359 })?
1360 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1361 .await
1362 };
1363
1364 let publication = publish_track.await;
1365 this.upgrade()
1366 .ok_or_else(|| anyhow!("room was dropped"))?
1367 .update(&mut cx, |this, cx| {
1368 let live_kit = this
1369 .live_kit
1370 .as_mut()
1371 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1372
1373 let (canceled, muted) = if let LocalTrack::Pending {
1374 publish_id: cur_publish_id,
1375 muted,
1376 } = &live_kit.screen_track
1377 {
1378 (*cur_publish_id != publish_id, *muted)
1379 } else {
1380 (true, false)
1381 };
1382
1383 match publication {
1384 Ok(publication) => {
1385 if canceled {
1386 live_kit.room.unpublish_track(publication);
1387 } else {
1388 if muted {
1389 cx.executor().spawn(publication.set_mute(muted)).detach();
1390 }
1391 live_kit.screen_track = LocalTrack::Published {
1392 track_publication: publication,
1393 muted,
1394 };
1395 cx.notify();
1396 }
1397
1398 Audio::play_sound(Sound::StartScreenshare, cx);
1399
1400 Ok(())
1401 }
1402 Err(error) => {
1403 if canceled {
1404 Ok(())
1405 } else {
1406 live_kit.screen_track = LocalTrack::None;
1407 cx.notify();
1408 Err(error)
1409 }
1410 }
1411 }
1412 })?
1413 })
1414 }
1415
1416 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1417 let should_mute = !self.is_muted(cx);
1418 if let Some(live_kit) = self.live_kit.as_mut() {
1419 if matches!(live_kit.microphone_track, LocalTrack::None) {
1420 return Ok(self.share_microphone(cx));
1421 }
1422
1423 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1424 live_kit.muted_by_user = should_mute;
1425
1426 if old_muted == true && live_kit.deafened == true {
1427 if let Some(task) = self.toggle_deafen(cx).ok() {
1428 task.detach();
1429 }
1430 }
1431
1432 Ok(ret_task)
1433 } else {
1434 Err(anyhow!("LiveKit not started"))
1435 }
1436 }
1437
1438 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1439 if let Some(live_kit) = self.live_kit.as_mut() {
1440 (*live_kit).deafened = !live_kit.deafened;
1441
1442 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1443 // Context notification is sent within set_mute itself.
1444 let mut mute_task = None;
1445 // When deafening, mute user's mic as well.
1446 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1447 if live_kit.deafened || !live_kit.muted_by_user {
1448 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1449 };
1450 for participant in self.remote_participants.values() {
1451 for track in live_kit
1452 .room
1453 .remote_audio_track_publications(&participant.user.id.to_string())
1454 {
1455 let deafened = live_kit.deafened;
1456 tasks.push(
1457 cx.executor()
1458 .spawn_on_main(move || track.set_enabled(!deafened)),
1459 );
1460 }
1461 }
1462
1463 Ok(cx.executor().spawn_on_main(|| async {
1464 if let Some(mute_task) = mute_task {
1465 mute_task.await?;
1466 }
1467 for task in tasks {
1468 task.await?;
1469 }
1470 Ok(())
1471 }))
1472 } else {
1473 Err(anyhow!("LiveKit not started"))
1474 }
1475 }
1476
1477 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1478 if self.status.is_offline() {
1479 return Err(anyhow!("room is offline"));
1480 }
1481
1482 let live_kit = self
1483 .live_kit
1484 .as_mut()
1485 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1486 match mem::take(&mut live_kit.screen_track) {
1487 LocalTrack::None => Err(anyhow!("screen was not shared")),
1488 LocalTrack::Pending { .. } => {
1489 cx.notify();
1490 Ok(())
1491 }
1492 LocalTrack::Published {
1493 track_publication, ..
1494 } => {
1495 live_kit.room.unpublish_track(track_publication);
1496 cx.notify();
1497
1498 Audio::play_sound(Sound::StopScreenshare, cx);
1499 Ok(())
1500 }
1501 }
1502 }
1503
/// Test-only helper that forwards `sources` to the LiveKit room's display sources.
///
/// # Panics
///
/// Panics if LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1512}
1513
/// LiveKit-related state held by a `Room`: the LiveKit room handle, the local
/// screen and microphone tracks, and the local audio flags.
struct LiveKitRoom {
    /// Shared handle to the LiveKit client room, used to publish and unpublish tracks.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the current mute was requested manually by the user, as opposed to the
    /// automatic mute applied while deafening. Cleared whenever the microphone is unmuted.
    muted_by_user: bool,
    /// Whether the local user is deafened; flipped by `Room::toggle_deafen`.
    deafened: bool,
    /// Value reported by `Room::is_speaking`.
    speaking: bool,
    /// Counter that is post-incremented to tag each new pending track publish.
    next_publish_id: usize,
    // NOTE(review): presumably tasks kept alive for as long as this struct exists;
    // their bodies are not visible here — confirm at the construction site.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1526
1527impl LiveKitRoom {
1528 fn set_mute(
1529 self: &mut LiveKitRoom,
1530 should_mute: bool,
1531 cx: &mut ModelContext<Room>,
1532 ) -> Result<(Task<Result<()>>, bool)> {
1533 if !should_mute {
1534 // clear user muting state.
1535 self.muted_by_user = false;
1536 }
1537
1538 let (result, old_muted) = match &mut self.microphone_track {
1539 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1540 LocalTrack::Pending { muted, .. } => {
1541 let old_muted = *muted;
1542 *muted = should_mute;
1543 cx.notify();
1544 Ok((Task::Ready(Some(Ok(()))), old_muted))
1545 }
1546 LocalTrack::Published {
1547 track_publication,
1548 muted,
1549 } => {
1550 let old_muted = *muted;
1551 *muted = should_mute;
1552 cx.notify();
1553 Ok((
1554 cx.executor().spawn(track_publication.set_mute(*muted)),
1555 old_muted,
1556 ))
1557 }
1558 }?;
1559
1560 if old_muted != should_mute {
1561 if should_mute {
1562 Audio::play_sound(Sound::Mute, cx);
1563 } else {
1564 Audio::play_sound(Sound::Unmute, cx);
1565 }
1566 }
1567
1568 Ok((result, old_muted))
1569 }
1570}
1571
/// Publication state of a local track (microphone or screen share).
enum LocalTrack {
    /// No track is being shared.
    None,
    /// A publish attempt has been started but has not completed yet.
    Pending {
        /// Identifier of the publish attempt; compared on completion to detect
        /// that the attempt was canceled or superseded.
        publish_id: usize,
        /// Mute state to apply once the track is published.
        muted: bool,
    },
    /// The track has been published to the room.
    Published {
        /// Handle to the published track.
        track_publication: LocalTrackPublication,
        /// Current mute state recorded for the published track.
        muted: bool,
    },
}
1583
1584impl Default for LocalTrack {
1585 fn default() -> Self {
1586 Self::None
1587 }
1588}
1589
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected.
    Online,
    /// The room is neither online nor offline; see `RECONNECT_TIMEOUT`.
    // NOTE(review): presumably a reconnect is in progress — confirm against the reconnect logic.
    Rejoining,
    /// The room is offline; operations such as sharing are rejected in this state.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, Self::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` is not online.
    pub fn is_online(&self) -> bool {
        matches!(self, Self::Online)
    }
}