1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
16};
17use language::LanguageRegistry;
18use live_kit_client::{
19 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
20 RemoteVideoTrackUpdate,
21};
22use postage::{sink::Sink, stream::Stream, watch};
23use project::Project;
24use settings::Settings as _;
25use std::{future::Future, mem, sync::Arc, time::Duration};
26use util::{post_inc, ResultExt, TryFutureExt};
27
28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
29
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub enum Event {
32 ParticipantLocationChanged {
33 participant_id: proto::PeerId,
34 },
35 RemoteVideoTracksChanged {
36 participant_id: proto::PeerId,
37 },
38 RemoteAudioTracksChanged {
39 participant_id: proto::PeerId,
40 },
41 RemoteProjectShared {
42 owner: Arc<User>,
43 project_id: u64,
44 worktree_root_names: Vec<String>,
45 },
46 RemoteProjectUnshared {
47 project_id: u64,
48 },
49 RemoteProjectJoined {
50 project_id: u64,
51 },
52 RemoteProjectInvitationDiscarded {
53 project_id: u64,
54 },
55 Left,
56}
57
58pub struct Room {
59 id: u64,
60 channel_id: Option<u64>,
61 live_kit: Option<LiveKitRoom>,
62 status: RoomStatus,
63 shared_projects: HashSet<WeakModel<Project>>,
64 joined_projects: HashSet<WeakModel<Project>>,
65 local_participant: LocalParticipant,
66 remote_participants: BTreeMap<u64, RemoteParticipant>,
67 pending_participants: Vec<Arc<User>>,
68 participant_user_ids: HashSet<u64>,
69 pending_call_count: usize,
70 leave_when_empty: bool,
71 client: Arc<Client>,
72 user_store: Model<UserStore>,
73 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
74 client_subscriptions: Vec<client::Subscription>,
75 _subscriptions: Vec<gpui::Subscription>,
76 room_update_completed_tx: watch::Sender<Option<()>>,
77 room_update_completed_rx: watch::Receiver<Option<()>>,
78 pending_room_update: Option<Task<()>>,
79 maintain_connection: Option<Task<Option<()>>>,
80}
81
82impl EventEmitter<Event> for Room {}
83
84impl Room {
85 pub fn channel_id(&self) -> Option<u64> {
86 self.channel_id
87 }
88
89 pub fn is_sharing_project(&self) -> bool {
90 !self.shared_projects.is_empty()
91 }
92
93 #[cfg(any(test, feature = "test-support"))]
94 pub fn is_connected(&self) -> bool {
95 if let Some(live_kit) = self.live_kit.as_ref() {
96 matches!(
97 *live_kit.room.status().borrow(),
98 live_kit_client::ConnectionState::Connected { .. }
99 )
100 } else {
101 false
102 }
103 }
104
105 fn new(
106 id: u64,
107 channel_id: Option<u64>,
108 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
109 client: Arc<Client>,
110 user_store: Model<UserStore>,
111 cx: &mut ModelContext<Self>,
112 ) -> Self {
113 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
114 let room = live_kit_client::Room::new();
115 let mut status = room.status();
116 // Consume the initial status of the room.
117 let _ = status.try_recv();
118 let _maintain_room = cx.spawn(|this, mut cx| async move {
119 while let Some(status) = status.next().await {
120 let this = if let Some(this) = this.upgrade() {
121 this
122 } else {
123 break;
124 };
125
126 if status == live_kit_client::ConnectionState::Disconnected {
127 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
128 .ok();
129 break;
130 }
131 }
132 });
133
134 let _maintain_video_tracks = cx.spawn({
135 let room = room.clone();
136 move |this, mut cx| async move {
137 let mut track_video_changes = room.remote_video_track_updates();
138 while let Some(track_change) = track_video_changes.next().await {
139 let this = if let Some(this) = this.upgrade() {
140 this
141 } else {
142 break;
143 };
144
145 this.update(&mut cx, |this, cx| {
146 this.remote_video_track_updated(track_change, cx).log_err()
147 })
148 .ok();
149 }
150 }
151 });
152
153 let _maintain_audio_tracks = cx.spawn({
154 let room = room.clone();
155 |this, mut cx| async move {
156 let mut track_audio_changes = room.remote_audio_track_updates();
157 while let Some(track_change) = track_audio_changes.next().await {
158 let this = if let Some(this) = this.upgrade() {
159 this
160 } else {
161 break;
162 };
163
164 this.update(&mut cx, |this, cx| {
165 this.remote_audio_track_updated(track_change, cx).log_err()
166 })
167 .ok();
168 }
169 }
170 });
171
172 let connect = room.connect(&connection_info.server_url, &connection_info.token);
173 cx.spawn(|this, mut cx| async move {
174 connect.await?;
175
176 if !cx.update(|cx| Self::mute_on_join(cx))? {
177 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
178 .await?;
179 }
180
181 anyhow::Ok(())
182 })
183 .detach_and_log_err(cx);
184
185 Some(LiveKitRoom {
186 room,
187 screen_track: LocalTrack::None,
188 microphone_track: LocalTrack::None,
189 next_publish_id: 0,
190 muted_by_user: false,
191 deafened: false,
192 speaking: false,
193 _maintain_room,
194 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
195 })
196 } else {
197 None
198 };
199
200 let maintain_connection = cx.spawn({
201 let client = client.clone();
202 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
203 });
204
205 Audio::play_sound(Sound::Joined, cx);
206
207 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
208
209 Self {
210 id,
211 channel_id,
212 live_kit: live_kit_room,
213 status: RoomStatus::Online,
214 shared_projects: Default::default(),
215 joined_projects: Default::default(),
216 participant_user_ids: Default::default(),
217 local_participant: Default::default(),
218 remote_participants: Default::default(),
219 pending_participants: Default::default(),
220 pending_call_count: 0,
221 client_subscriptions: vec![
222 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
223 ],
224 _subscriptions: vec![
225 cx.on_release(Self::released),
226 cx.on_app_quit(Self::app_will_quit),
227 ],
228 leave_when_empty: false,
229 pending_room_update: None,
230 client,
231 user_store,
232 follows_by_leader_id_project_id: Default::default(),
233 maintain_connection: Some(maintain_connection),
234 room_update_completed_tx,
235 room_update_completed_rx,
236 }
237 }
238
239 pub(crate) fn create(
240 called_user_id: u64,
241 initial_project: Option<Model<Project>>,
242 client: Arc<Client>,
243 user_store: Model<UserStore>,
244 cx: &mut AppContext,
245 ) -> Task<Result<Model<Self>>> {
246 cx.spawn(move |mut cx| async move {
247 let response = client.request(proto::CreateRoom {}).await?;
248 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
249 let room = cx.build_model(|cx| {
250 Self::new(
251 room_proto.id,
252 None,
253 response.live_kit_connection_info,
254 client,
255 user_store,
256 cx,
257 )
258 })?;
259
260 let initial_project_id = if let Some(initial_project) = initial_project {
261 let initial_project_id = room
262 .update(&mut cx, |room, cx| {
263 room.share_project(initial_project.clone(), cx)
264 })?
265 .await?;
266 Some(initial_project_id)
267 } else {
268 None
269 };
270
271 match room
272 .update(&mut cx, |room, cx| {
273 room.leave_when_empty = true;
274 room.call(called_user_id, initial_project_id, cx)
275 })?
276 .await
277 {
278 Ok(()) => Ok(room),
279 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
280 }
281 })
282 }
283
284 pub(crate) async fn join_channel(
285 channel_id: u64,
286 client: Arc<Client>,
287 user_store: Model<UserStore>,
288 cx: AsyncAppContext,
289 ) -> Result<Model<Self>> {
290 Self::from_join_response(
291 client.request(proto::JoinChannel { channel_id }).await?,
292 client,
293 user_store,
294 cx,
295 )
296 }
297
298 pub(crate) async fn join(
299 room_id: u64,
300 client: Arc<Client>,
301 user_store: Model<UserStore>,
302 cx: AsyncAppContext,
303 ) -> Result<Model<Self>> {
304 Self::from_join_response(
305 client.request(proto::JoinRoom { id: room_id }).await?,
306 client,
307 user_store,
308 cx,
309 )
310 }
311
312 fn released(&mut self, cx: &mut AppContext) {
313 if self.status.is_online() {
314 self.leave_internal(cx).detach_and_log_err(cx);
315 }
316 }
317
318 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
319 let task = if self.status.is_online() {
320 let leave = self.leave_internal(cx);
321 Some(cx.background_executor().spawn(async move {
322 leave.await.log_err();
323 }))
324 } else {
325 None
326 };
327
328 async move {
329 if let Some(task) = task {
330 task.await;
331 }
332 }
333 }
334
335 pub fn mute_on_join(cx: &AppContext) -> bool {
336 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
337 }
338
339 fn from_join_response(
340 response: proto::JoinRoomResponse,
341 client: Arc<Client>,
342 user_store: Model<UserStore>,
343 mut cx: AsyncAppContext,
344 ) -> Result<Model<Self>> {
345 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
346 let room = cx.build_model(|cx| {
347 Self::new(
348 room_proto.id,
349 response.channel_id,
350 response.live_kit_connection_info,
351 client,
352 user_store,
353 cx,
354 )
355 })?;
356 room.update(&mut cx, |room, cx| {
357 room.leave_when_empty = room.channel_id.is_none();
358 room.apply_room_update(room_proto, cx)?;
359 anyhow::Ok(())
360 })??;
361 Ok(room)
362 }
363
364 fn should_leave(&self) -> bool {
365 self.leave_when_empty
366 && self.pending_room_update.is_none()
367 && self.pending_participants.is_empty()
368 && self.remote_participants.is_empty()
369 && self.pending_call_count == 0
370 }
371
372 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
373 cx.notify();
374 cx.emit(Event::Left);
375 self.leave_internal(cx)
376 }
377
378 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
379 if self.status.is_offline() {
380 return Task::ready(Err(anyhow!("room is offline")));
381 }
382
383 log::info!("leaving room");
384 Audio::play_sound(Sound::Leave, cx);
385
386 self.clear_state(cx);
387
388 let leave_room = self.client.request(proto::LeaveRoom {});
389 cx.background_executor().spawn(async move {
390 leave_room.await?;
391 anyhow::Ok(())
392 })
393 }
394
395 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
396 for project in self.shared_projects.drain() {
397 if let Some(project) = project.upgrade() {
398 project.update(cx, |project, cx| {
399 project.unshare(cx).log_err();
400 });
401 }
402 }
403 for project in self.joined_projects.drain() {
404 if let Some(project) = project.upgrade() {
405 project.update(cx, |project, cx| {
406 project.disconnected_from_host(cx);
407 project.close(cx);
408 });
409 }
410 }
411
412 self.status = RoomStatus::Offline;
413 self.remote_participants.clear();
414 self.pending_participants.clear();
415 self.participant_user_ids.clear();
416 self.client_subscriptions.clear();
417 self.live_kit.take();
418 self.pending_room_update.take();
419 self.maintain_connection.take();
420 }
421
422 async fn maintain_connection(
423 this: WeakModel<Self>,
424 client: Arc<Client>,
425 mut cx: AsyncAppContext,
426 ) -> Result<()> {
427 let mut client_status = client.status();
428 loop {
429 let _ = client_status.try_recv();
430 let is_connected = client_status.borrow().is_connected();
431 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
432 if !is_connected || client_status.next().await.is_some() {
433 log::info!("detected client disconnection");
434
435 this.upgrade()
436 .ok_or_else(|| anyhow!("room was dropped"))?
437 .update(&mut cx, |this, cx| {
438 this.status = RoomStatus::Rejoining;
439 cx.notify();
440 })?;
441
442 // Wait for client to re-establish a connection to the server.
443 {
444 let mut reconnection_timeout =
445 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
446 let client_reconnection = async {
447 let mut remaining_attempts = 3;
448 while remaining_attempts > 0 {
449 if client_status.borrow().is_connected() {
450 log::info!("client reconnected, attempting to rejoin room");
451
452 let Some(this) = this.upgrade() else { break };
453 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
454 Ok(task) => {
455 if task.await.log_err().is_some() {
456 return true;
457 } else {
458 remaining_attempts -= 1;
459 }
460 }
461 Err(_app_dropped) => return false,
462 }
463 } else if client_status.borrow().is_signed_out() {
464 return false;
465 }
466
467 log::info!(
468 "waiting for client status change, remaining attempts {}",
469 remaining_attempts
470 );
471 client_status.next().await;
472 }
473 false
474 }
475 .fuse();
476 futures::pin_mut!(client_reconnection);
477
478 futures::select_biased! {
479 reconnected = client_reconnection => {
480 if reconnected {
481 log::info!("successfully reconnected to room");
482 // If we successfully joined the room, go back around the loop
483 // waiting for future connection status changes.
484 continue;
485 }
486 }
487 _ = reconnection_timeout => {
488 log::info!("room reconnection timeout expired");
489 }
490 }
491 }
492
493 break;
494 }
495 }
496
497 // The client failed to re-establish a connection to the server
498 // or an error occurred while trying to re-join the room. Either way
499 // we leave the room and return an error.
500 if let Some(this) = this.upgrade() {
501 log::info!("reconnection failed, leaving room");
502 let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
503 }
504 Err(anyhow!(
505 "can't reconnect to room: client failed to re-establish connection"
506 ))
507 }
508
509 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
510 let mut projects = HashMap::default();
511 let mut reshared_projects = Vec::new();
512 let mut rejoined_projects = Vec::new();
513 self.shared_projects.retain(|project| {
514 if let Some(handle) = project.upgrade() {
515 let project = handle.read(cx);
516 if let Some(project_id) = project.remote_id() {
517 projects.insert(project_id, handle.clone());
518 reshared_projects.push(proto::UpdateProject {
519 project_id,
520 worktrees: project.worktree_metadata_protos(cx),
521 });
522 return true;
523 }
524 }
525 false
526 });
527 self.joined_projects.retain(|project| {
528 if let Some(handle) = project.upgrade() {
529 let project = handle.read(cx);
530 if let Some(project_id) = project.remote_id() {
531 projects.insert(project_id, handle.clone());
532 rejoined_projects.push(proto::RejoinProject {
533 id: project_id,
534 worktrees: project
535 .worktrees()
536 .map(|worktree| {
537 let worktree = worktree.read(cx);
538 proto::RejoinWorktree {
539 id: worktree.id().to_proto(),
540 scan_id: worktree.completed_scan_id() as u64,
541 }
542 })
543 .collect(),
544 });
545 }
546 return true;
547 }
548 false
549 });
550
551 let response = self.client.request_envelope(proto::RejoinRoom {
552 id: self.id,
553 reshared_projects,
554 rejoined_projects,
555 });
556
557 cx.spawn(|this, mut cx| async move {
558 let response = response.await?;
559 let message_id = response.message_id;
560 let response = response.payload;
561 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
562 this.update(&mut cx, |this, cx| {
563 this.status = RoomStatus::Online;
564 this.apply_room_update(room_proto, cx)?;
565
566 for reshared_project in response.reshared_projects {
567 if let Some(project) = projects.get(&reshared_project.id) {
568 project.update(cx, |project, cx| {
569 project.reshared(reshared_project, cx).log_err();
570 });
571 }
572 }
573
574 for rejoined_project in response.rejoined_projects {
575 if let Some(project) = projects.get(&rejoined_project.id) {
576 project.update(cx, |project, cx| {
577 project.rejoined(rejoined_project, message_id, cx).log_err();
578 });
579 }
580 }
581
582 anyhow::Ok(())
583 })?
584 })
585 }
586
587 pub fn id(&self) -> u64 {
588 self.id
589 }
590
591 pub fn status(&self) -> RoomStatus {
592 self.status
593 }
594
595 pub fn local_participant(&self) -> &LocalParticipant {
596 &self.local_participant
597 }
598
599 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
600 &self.remote_participants
601 }
602
603 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
604 self.remote_participants
605 .values()
606 .find(|p| p.peer_id == peer_id)
607 }
608
609 pub fn pending_participants(&self) -> &[Arc<User>] {
610 &self.pending_participants
611 }
612
613 pub fn contains_participant(&self, user_id: u64) -> bool {
614 self.participant_user_ids.contains(&user_id)
615 }
616
617 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
618 self.follows_by_leader_id_project_id
619 .get(&(leader_id, project_id))
620 .map_or(&[], |v| v.as_slice())
621 }
622
623 /// Returns the most 'active' projects, defined as most people in the project
624 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
625 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
626 for participant in self.remote_participants.values() {
627 match participant.location {
628 ParticipantLocation::SharedProject { project_id } => {
629 project_hosts_and_guest_counts
630 .entry(project_id)
631 .or_default()
632 .1 += 1;
633 }
634 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
635 }
636 for project in &participant.projects {
637 project_hosts_and_guest_counts
638 .entry(project.id)
639 .or_default()
640 .0 = Some(participant.user.id);
641 }
642 }
643
644 if let Some(user) = self.user_store.read(cx).current_user() {
645 for project in &self.local_participant.projects {
646 project_hosts_and_guest_counts
647 .entry(project.id)
648 .or_default()
649 .0 = Some(user.id);
650 }
651 }
652
653 project_hosts_and_guest_counts
654 .into_iter()
655 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
656 .max_by_key(|(_, _, guest_count)| *guest_count)
657 .map(|(id, host, _)| (id, host))
658 }
659
660 async fn handle_room_updated(
661 this: Model<Self>,
662 envelope: TypedEnvelope<proto::RoomUpdated>,
663 _: Arc<Client>,
664 mut cx: AsyncAppContext,
665 ) -> Result<()> {
666 let room = envelope
667 .payload
668 .room
669 .ok_or_else(|| anyhow!("invalid room"))?;
670 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
671 }
672
673 fn apply_room_update(
674 &mut self,
675 mut room: proto::Room,
676 cx: &mut ModelContext<Self>,
677 ) -> Result<()> {
678 // Filter ourselves out from the room's participants.
679 let local_participant_ix = room
680 .participants
681 .iter()
682 .position(|participant| Some(participant.user_id) == self.client.user_id());
683 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
684
685 let pending_participant_user_ids = room
686 .pending_participants
687 .iter()
688 .map(|p| p.user_id)
689 .collect::<Vec<_>>();
690
691 let remote_participant_user_ids = room
692 .participants
693 .iter()
694 .map(|p| p.user_id)
695 .collect::<Vec<_>>();
696
697 let (remote_participants, pending_participants) =
698 self.user_store.update(cx, move |user_store, cx| {
699 (
700 user_store.get_users(remote_participant_user_ids, cx),
701 user_store.get_users(pending_participant_user_ids, cx),
702 )
703 });
704
705 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
706 let (remote_participants, pending_participants) =
707 futures::join!(remote_participants, pending_participants);
708
709 this.update(&mut cx, |this, cx| {
710 this.participant_user_ids.clear();
711
712 if let Some(participant) = local_participant {
713 this.local_participant.projects = participant.projects;
714 } else {
715 this.local_participant.projects.clear();
716 }
717
718 if let Some(participants) = remote_participants.log_err() {
719 for (participant, user) in room.participants.into_iter().zip(participants) {
720 let Some(peer_id) = participant.peer_id else {
721 continue;
722 };
723 let participant_index = ParticipantIndex(participant.participant_index);
724 this.participant_user_ids.insert(participant.user_id);
725
726 let old_projects = this
727 .remote_participants
728 .get(&participant.user_id)
729 .into_iter()
730 .flat_map(|existing| &existing.projects)
731 .map(|project| project.id)
732 .collect::<HashSet<_>>();
733 let new_projects = participant
734 .projects
735 .iter()
736 .map(|project| project.id)
737 .collect::<HashSet<_>>();
738
739 for project in &participant.projects {
740 if !old_projects.contains(&project.id) {
741 cx.emit(Event::RemoteProjectShared {
742 owner: user.clone(),
743 project_id: project.id,
744 worktree_root_names: project.worktree_root_names.clone(),
745 });
746 }
747 }
748
749 for unshared_project_id in old_projects.difference(&new_projects) {
750 this.joined_projects.retain(|project| {
751 if let Some(project) = project.upgrade() {
752 project.update(cx, |project, cx| {
753 if project.remote_id() == Some(*unshared_project_id) {
754 project.disconnected_from_host(cx);
755 false
756 } else {
757 true
758 }
759 })
760 } else {
761 false
762 }
763 });
764 cx.emit(Event::RemoteProjectUnshared {
765 project_id: *unshared_project_id,
766 });
767 }
768
769 let location = ParticipantLocation::from_proto(participant.location)
770 .unwrap_or(ParticipantLocation::External);
771 if let Some(remote_participant) =
772 this.remote_participants.get_mut(&participant.user_id)
773 {
774 remote_participant.peer_id = peer_id;
775 remote_participant.projects = participant.projects;
776 remote_participant.participant_index = participant_index;
777 if location != remote_participant.location {
778 remote_participant.location = location;
779 cx.emit(Event::ParticipantLocationChanged {
780 participant_id: peer_id,
781 });
782 }
783 } else {
784 this.remote_participants.insert(
785 participant.user_id,
786 RemoteParticipant {
787 user: user.clone(),
788 participant_index,
789 peer_id,
790 projects: participant.projects,
791 location,
792 muted: true,
793 speaking: false,
794 video_tracks: Default::default(),
795 audio_tracks: Default::default(),
796 },
797 );
798
799 Audio::play_sound(Sound::Joined, cx);
800
801 if let Some(live_kit) = this.live_kit.as_ref() {
802 let video_tracks =
803 live_kit.room.remote_video_tracks(&user.id.to_string());
804 let audio_tracks =
805 live_kit.room.remote_audio_tracks(&user.id.to_string());
806 let publications = live_kit
807 .room
808 .remote_audio_track_publications(&user.id.to_string());
809
810 for track in video_tracks {
811 this.remote_video_track_updated(
812 RemoteVideoTrackUpdate::Subscribed(track),
813 cx,
814 )
815 .log_err();
816 }
817
818 for (track, publication) in
819 audio_tracks.iter().zip(publications.iter())
820 {
821 this.remote_audio_track_updated(
822 RemoteAudioTrackUpdate::Subscribed(
823 track.clone(),
824 publication.clone(),
825 ),
826 cx,
827 )
828 .log_err();
829 }
830 }
831 }
832 }
833
834 this.remote_participants.retain(|user_id, participant| {
835 if this.participant_user_ids.contains(user_id) {
836 true
837 } else {
838 for project in &participant.projects {
839 cx.emit(Event::RemoteProjectUnshared {
840 project_id: project.id,
841 });
842 }
843 false
844 }
845 });
846 }
847
848 if let Some(pending_participants) = pending_participants.log_err() {
849 this.pending_participants = pending_participants;
850 for participant in &this.pending_participants {
851 this.participant_user_ids.insert(participant.id);
852 }
853 }
854
855 this.follows_by_leader_id_project_id.clear();
856 for follower in room.followers {
857 let project_id = follower.project_id;
858 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
859 (Some(leader), Some(follower)) => (leader, follower),
860
861 _ => {
862 log::error!("Follower message {follower:?} missing some state");
863 continue;
864 }
865 };
866
867 let list = this
868 .follows_by_leader_id_project_id
869 .entry((leader, project_id))
870 .or_insert(Vec::new());
871 if !list.contains(&follower) {
872 list.push(follower);
873 }
874 }
875
876 this.pending_room_update.take();
877 if this.should_leave() {
878 log::info!("room is empty, leaving");
879 let _ = this.leave(cx);
880 }
881
882 this.user_store.update(cx, |user_store, cx| {
883 let participant_indices_by_user_id = this
884 .remote_participants
885 .iter()
886 .map(|(user_id, participant)| (*user_id, participant.participant_index))
887 .collect();
888 user_store.set_participant_indices(participant_indices_by_user_id, cx);
889 });
890
891 this.check_invariants();
892 this.room_update_completed_tx.try_send(Some(())).ok();
893 cx.notify();
894 })
895 .ok();
896 }));
897
898 cx.notify();
899 Ok(())
900 }
901
902 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
903 let mut done_rx = self.room_update_completed_rx.clone();
904 async move {
905 while let Some(result) = done_rx.next().await {
906 if result.is_some() {
907 break;
908 }
909 }
910 }
911 }
912
913 fn remote_video_track_updated(
914 &mut self,
915 change: RemoteVideoTrackUpdate,
916 cx: &mut ModelContext<Self>,
917 ) -> Result<()> {
918 match change {
919 RemoteVideoTrackUpdate::Subscribed(track) => {
920 let user_id = track.publisher_id().parse()?;
921 let track_id = track.sid().to_string();
922 let participant = self
923 .remote_participants
924 .get_mut(&user_id)
925 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
926 participant.video_tracks.insert(track_id.clone(), track);
927 cx.emit(Event::RemoteVideoTracksChanged {
928 participant_id: participant.peer_id,
929 });
930 }
931 RemoteVideoTrackUpdate::Unsubscribed {
932 publisher_id,
933 track_id,
934 } => {
935 let user_id = publisher_id.parse()?;
936 let participant = self
937 .remote_participants
938 .get_mut(&user_id)
939 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
940 participant.video_tracks.remove(&track_id);
941 cx.emit(Event::RemoteVideoTracksChanged {
942 participant_id: participant.peer_id,
943 });
944 }
945 }
946
947 cx.notify();
948 Ok(())
949 }
950
951 fn remote_audio_track_updated(
952 &mut self,
953 change: RemoteAudioTrackUpdate,
954 cx: &mut ModelContext<Self>,
955 ) -> Result<()> {
956 match change {
957 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
958 let mut speaker_ids = speakers
959 .into_iter()
960 .filter_map(|speaker_sid| speaker_sid.parse().ok())
961 .collect::<Vec<u64>>();
962 speaker_ids.sort_unstable();
963 for (sid, participant) in &mut self.remote_participants {
964 if let Ok(_) = speaker_ids.binary_search(sid) {
965 participant.speaking = true;
966 } else {
967 participant.speaking = false;
968 }
969 }
970 if let Some(id) = self.client.user_id() {
971 if let Some(room) = &mut self.live_kit {
972 if let Ok(_) = speaker_ids.binary_search(&id) {
973 room.speaking = true;
974 } else {
975 room.speaking = false;
976 }
977 }
978 }
979 cx.notify();
980 }
981 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
982 let mut found = false;
983 for participant in &mut self.remote_participants.values_mut() {
984 for track in participant.audio_tracks.values() {
985 if track.sid() == track_id {
986 found = true;
987 break;
988 }
989 }
990 if found {
991 participant.muted = muted;
992 break;
993 }
994 }
995
996 cx.notify();
997 }
998 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
999 let user_id = track.publisher_id().parse()?;
1000 let track_id = track.sid().to_string();
1001 let participant = self
1002 .remote_participants
1003 .get_mut(&user_id)
1004 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1005 participant.audio_tracks.insert(track_id.clone(), track);
1006 participant.muted = publication.is_muted();
1007
1008 cx.emit(Event::RemoteAudioTracksChanged {
1009 participant_id: participant.peer_id,
1010 });
1011 }
1012 RemoteAudioTrackUpdate::Unsubscribed {
1013 publisher_id,
1014 track_id,
1015 } => {
1016 let user_id = publisher_id.parse()?;
1017 let participant = self
1018 .remote_participants
1019 .get_mut(&user_id)
1020 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1021 participant.audio_tracks.remove(&track_id);
1022 cx.emit(Event::RemoteAudioTracksChanged {
1023 participant_id: participant.peer_id,
1024 });
1025 }
1026 }
1027
1028 cx.notify();
1029 Ok(())
1030 }
1031
1032 fn check_invariants(&self) {
1033 #[cfg(any(test, feature = "test-support"))]
1034 {
1035 for participant in self.remote_participants.values() {
1036 assert!(self.participant_user_ids.contains(&participant.user.id));
1037 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1038 }
1039
1040 for participant in &self.pending_participants {
1041 assert!(self.participant_user_ids.contains(&participant.id));
1042 assert_ne!(participant.id, self.client.user_id().unwrap());
1043 }
1044
1045 assert_eq!(
1046 self.participant_user_ids.len(),
1047 self.remote_participants.len() + self.pending_participants.len()
1048 );
1049 }
1050 }
1051
1052 pub(crate) fn call(
1053 &mut self,
1054 called_user_id: u64,
1055 initial_project_id: Option<u64>,
1056 cx: &mut ModelContext<Self>,
1057 ) -> Task<Result<()>> {
1058 if self.status.is_offline() {
1059 return Task::ready(Err(anyhow!("room is offline")));
1060 }
1061
1062 cx.notify();
1063 let client = self.client.clone();
1064 let room_id = self.id;
1065 self.pending_call_count += 1;
1066 cx.spawn(move |this, mut cx| async move {
1067 let result = client
1068 .request(proto::Call {
1069 room_id,
1070 called_user_id,
1071 initial_project_id,
1072 })
1073 .await;
1074 this.update(&mut cx, |this, cx| {
1075 this.pending_call_count -= 1;
1076 if this.should_leave() {
1077 this.leave(cx).detach_and_log_err(cx);
1078 }
1079 })?;
1080 result?;
1081 Ok(())
1082 })
1083 }
1084
1085 pub fn join_project(
1086 &mut self,
1087 id: u64,
1088 language_registry: Arc<LanguageRegistry>,
1089 fs: Arc<dyn Fs>,
1090 cx: &mut ModelContext<Self>,
1091 ) -> Task<Result<Model<Project>>> {
1092 let client = self.client.clone();
1093 let user_store = self.user_store.clone();
1094 cx.emit(Event::RemoteProjectJoined { project_id: id });
1095 cx.spawn(move |this, mut cx| async move {
1096 let project =
1097 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1098
1099 this.update(&mut cx, |this, cx| {
1100 this.joined_projects.retain(|project| {
1101 if let Some(project) = project.upgrade() {
1102 !project.read(cx).is_read_only()
1103 } else {
1104 false
1105 }
1106 });
1107 this.joined_projects.insert(project.downgrade());
1108 })?;
1109 Ok(project)
1110 })
1111 }
1112
1113 pub(crate) fn share_project(
1114 &mut self,
1115 project: Model<Project>,
1116 cx: &mut ModelContext<Self>,
1117 ) -> Task<Result<u64>> {
1118 if let Some(project_id) = project.read(cx).remote_id() {
1119 return Task::ready(Ok(project_id));
1120 }
1121
1122 let request = self.client.request(proto::ShareProject {
1123 room_id: self.id(),
1124 worktrees: project.read(cx).worktree_metadata_protos(cx),
1125 });
1126 cx.spawn(|this, mut cx| async move {
1127 let response = request.await?;
1128
1129 project.update(&mut cx, |project, cx| {
1130 project.shared(response.project_id, cx)
1131 })??;
1132
1133 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1134 this.update(&mut cx, |this, cx| {
1135 this.shared_projects.insert(project.downgrade());
1136 let active_project = this.local_participant.active_project.as_ref();
1137 if active_project.map_or(false, |location| *location == project) {
1138 this.set_location(Some(&project), cx)
1139 } else {
1140 Task::ready(Ok(()))
1141 }
1142 })?
1143 .await?;
1144
1145 Ok(response.project_id)
1146 })
1147 }
1148
1149 pub(crate) fn unshare_project(
1150 &mut self,
1151 project: Model<Project>,
1152 cx: &mut ModelContext<Self>,
1153 ) -> Result<()> {
1154 let project_id = match project.read(cx).remote_id() {
1155 Some(project_id) => project_id,
1156 None => return Ok(()),
1157 };
1158
1159 self.client.send(proto::UnshareProject { project_id })?;
1160 project.update(cx, |this, cx| this.unshare(cx))
1161 }
1162
1163 pub(crate) fn set_location(
1164 &mut self,
1165 project: Option<&Model<Project>>,
1166 cx: &mut ModelContext<Self>,
1167 ) -> Task<Result<()>> {
1168 if self.status.is_offline() {
1169 return Task::ready(Err(anyhow!("room is offline")));
1170 }
1171
1172 let client = self.client.clone();
1173 let room_id = self.id;
1174 let location = if let Some(project) = project {
1175 self.local_participant.active_project = Some(project.downgrade());
1176 if let Some(project_id) = project.read(cx).remote_id() {
1177 proto::participant_location::Variant::SharedProject(
1178 proto::participant_location::SharedProject { id: project_id },
1179 )
1180 } else {
1181 proto::participant_location::Variant::UnsharedProject(
1182 proto::participant_location::UnsharedProject {},
1183 )
1184 }
1185 } else {
1186 self.local_participant.active_project = None;
1187 proto::participant_location::Variant::External(proto::participant_location::External {})
1188 };
1189
1190 cx.notify();
1191 cx.background_executor().spawn(async move {
1192 client
1193 .request(proto::UpdateParticipantLocation {
1194 room_id,
1195 location: Some(proto::ParticipantLocation {
1196 variant: Some(location),
1197 }),
1198 })
1199 .await?;
1200 Ok(())
1201 })
1202 }
1203
1204 pub fn is_screen_sharing(&self) -> bool {
1205 self.live_kit.as_ref().map_or(false, |live_kit| {
1206 !matches!(live_kit.screen_track, LocalTrack::None)
1207 })
1208 }
1209
1210 pub fn is_sharing_mic(&self) -> bool {
1211 self.live_kit.as_ref().map_or(false, |live_kit| {
1212 !matches!(live_kit.microphone_track, LocalTrack::None)
1213 })
1214 }
1215
1216 pub fn is_muted(&self, cx: &AppContext) -> bool {
1217 self.live_kit
1218 .as_ref()
1219 .and_then(|live_kit| match &live_kit.microphone_track {
1220 LocalTrack::None => Some(Self::mute_on_join(cx)),
1221 LocalTrack::Pending { muted, .. } => Some(*muted),
1222 LocalTrack::Published { muted, .. } => Some(*muted),
1223 })
1224 .unwrap_or(false)
1225 }
1226
1227 pub fn is_speaking(&self) -> bool {
1228 self.live_kit
1229 .as_ref()
1230 .map_or(false, |live_kit| live_kit.speaking)
1231 }
1232
1233 pub fn is_deafened(&self) -> Option<bool> {
1234 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1235 }
1236
1237 #[track_caller]
1238 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1239 if self.status.is_offline() {
1240 return Task::ready(Err(anyhow!("room is offline")));
1241 } else if self.is_sharing_mic() {
1242 return Task::ready(Err(anyhow!("microphone was already shared")));
1243 }
1244
1245 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1246 let publish_id = post_inc(&mut live_kit.next_publish_id);
1247 live_kit.microphone_track = LocalTrack::Pending {
1248 publish_id,
1249 muted: false,
1250 };
1251 cx.notify();
1252 publish_id
1253 } else {
1254 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1255 };
1256
1257 cx.spawn(move |this, mut cx| async move {
1258 let publish_track = async {
1259 let track = LocalAudioTrack::create();
1260 this.upgrade()
1261 .ok_or_else(|| anyhow!("room was dropped"))?
1262 .update(&mut cx, |this, _| {
1263 this.live_kit
1264 .as_ref()
1265 .map(|live_kit| live_kit.room.publish_audio_track(track))
1266 })?
1267 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1268 .await
1269 };
1270 let publication = publish_track.await;
1271 this.upgrade()
1272 .ok_or_else(|| anyhow!("room was dropped"))?
1273 .update(&mut cx, |this, cx| {
1274 let live_kit = this
1275 .live_kit
1276 .as_mut()
1277 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1278
1279 let (canceled, muted) = if let LocalTrack::Pending {
1280 publish_id: cur_publish_id,
1281 muted,
1282 } = &live_kit.microphone_track
1283 {
1284 (*cur_publish_id != publish_id, *muted)
1285 } else {
1286 (true, false)
1287 };
1288
1289 match publication {
1290 Ok(publication) => {
1291 if canceled {
1292 live_kit.room.unpublish_track(publication);
1293 } else {
1294 if muted {
1295 cx.background_executor()
1296 .spawn(publication.set_mute(muted))
1297 .detach();
1298 }
1299 live_kit.microphone_track = LocalTrack::Published {
1300 track_publication: publication,
1301 muted,
1302 };
1303 cx.notify();
1304 }
1305 Ok(())
1306 }
1307 Err(error) => {
1308 if canceled {
1309 Ok(())
1310 } else {
1311 live_kit.microphone_track = LocalTrack::None;
1312 cx.notify();
1313 Err(error)
1314 }
1315 }
1316 }
1317 })?
1318 })
1319 }
1320
1321 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1322 if self.status.is_offline() {
1323 return Task::ready(Err(anyhow!("room is offline")));
1324 } else if self.is_screen_sharing() {
1325 return Task::ready(Err(anyhow!("screen was already shared")));
1326 }
1327
1328 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1329 let publish_id = post_inc(&mut live_kit.next_publish_id);
1330 live_kit.screen_track = LocalTrack::Pending {
1331 publish_id,
1332 muted: false,
1333 };
1334 cx.notify();
1335 (live_kit.room.display_sources(), publish_id)
1336 } else {
1337 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1338 };
1339
1340 cx.spawn(move |this, mut cx| async move {
1341 let publish_track = async {
1342 let displays = displays.await?;
1343 let display = displays
1344 .first()
1345 .ok_or_else(|| anyhow!("no display found"))?;
1346 let track = LocalVideoTrack::screen_share_for_display(&display);
1347 this.upgrade()
1348 .ok_or_else(|| anyhow!("room was dropped"))?
1349 .update(&mut cx, |this, _| {
1350 this.live_kit
1351 .as_ref()
1352 .map(|live_kit| live_kit.room.publish_video_track(track))
1353 })?
1354 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1355 .await
1356 };
1357
1358 let publication = publish_track.await;
1359 this.upgrade()
1360 .ok_or_else(|| anyhow!("room was dropped"))?
1361 .update(&mut cx, |this, cx| {
1362 let live_kit = this
1363 .live_kit
1364 .as_mut()
1365 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1366
1367 let (canceled, muted) = if let LocalTrack::Pending {
1368 publish_id: cur_publish_id,
1369 muted,
1370 } = &live_kit.screen_track
1371 {
1372 (*cur_publish_id != publish_id, *muted)
1373 } else {
1374 (true, false)
1375 };
1376
1377 match publication {
1378 Ok(publication) => {
1379 if canceled {
1380 live_kit.room.unpublish_track(publication);
1381 } else {
1382 if muted {
1383 cx.background_executor()
1384 .spawn(publication.set_mute(muted))
1385 .detach();
1386 }
1387 live_kit.screen_track = LocalTrack::Published {
1388 track_publication: publication,
1389 muted,
1390 };
1391 cx.notify();
1392 }
1393
1394 Audio::play_sound(Sound::StartScreenshare, cx);
1395
1396 Ok(())
1397 }
1398 Err(error) => {
1399 if canceled {
1400 Ok(())
1401 } else {
1402 live_kit.screen_track = LocalTrack::None;
1403 cx.notify();
1404 Err(error)
1405 }
1406 }
1407 }
1408 })?
1409 })
1410 }
1411
1412 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1413 let should_mute = !self.is_muted(cx);
1414 if let Some(live_kit) = self.live_kit.as_mut() {
1415 if matches!(live_kit.microphone_track, LocalTrack::None) {
1416 return Ok(self.share_microphone(cx));
1417 }
1418
1419 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1420 live_kit.muted_by_user = should_mute;
1421
1422 if old_muted == true && live_kit.deafened == true {
1423 if let Some(task) = self.toggle_deafen(cx).ok() {
1424 task.detach();
1425 }
1426 }
1427
1428 Ok(ret_task)
1429 } else {
1430 Err(anyhow!("LiveKit not started"))
1431 }
1432 }
1433
1434 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1435 if let Some(live_kit) = self.live_kit.as_mut() {
1436 (*live_kit).deafened = !live_kit.deafened;
1437
1438 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1439 // Context notification is sent within set_mute itself.
1440 let mut mute_task = None;
1441 // When deafening, mute user's mic as well.
1442 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1443 if live_kit.deafened || !live_kit.muted_by_user {
1444 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1445 };
1446 for participant in self.remote_participants.values() {
1447 for track in live_kit
1448 .room
1449 .remote_audio_track_publications(&participant.user.id.to_string())
1450 {
1451 let deafened = live_kit.deafened;
1452 tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1453 }
1454 }
1455
1456 Ok(cx.foreground_executor().spawn(async move {
1457 if let Some(mute_task) = mute_task {
1458 mute_task.await?;
1459 }
1460 for task in tasks {
1461 task.await?;
1462 }
1463 Ok(())
1464 }))
1465 } else {
1466 Err(anyhow!("LiveKit not started"))
1467 }
1468 }
1469
1470 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1471 if self.status.is_offline() {
1472 return Err(anyhow!("room is offline"));
1473 }
1474
1475 let live_kit = self
1476 .live_kit
1477 .as_mut()
1478 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1479 match mem::take(&mut live_kit.screen_track) {
1480 LocalTrack::None => Err(anyhow!("screen was not shared")),
1481 LocalTrack::Pending { .. } => {
1482 cx.notify();
1483 Ok(())
1484 }
1485 LocalTrack::Published {
1486 track_publication, ..
1487 } => {
1488 live_kit.room.unpublish_track(track_publication);
1489 cx.notify();
1490
1491 Audio::play_sound(Sound::StopScreenshare, cx);
1492 Ok(())
1493 }
1494 }
1495 }
1496
1497 #[cfg(any(test, feature = "test-support"))]
1498 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1499 self.live_kit
1500 .as_ref()
1501 .unwrap()
1502 .room
1503 .set_display_sources(sources);
1504 }
1505}
1506
1507struct LiveKitRoom {
1508 room: Arc<live_kit_client::Room>,
1509 screen_track: LocalTrack,
1510 microphone_track: LocalTrack,
1511 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1512 muted_by_user: bool,
1513 deafened: bool,
1514 speaking: bool,
1515 next_publish_id: usize,
1516 _maintain_room: Task<()>,
1517 _maintain_tracks: [Task<()>; 2],
1518}
1519
1520impl LiveKitRoom {
1521 fn set_mute(
1522 self: &mut LiveKitRoom,
1523 should_mute: bool,
1524 cx: &mut ModelContext<Room>,
1525 ) -> Result<(Task<Result<()>>, bool)> {
1526 if !should_mute {
1527 // clear user muting state.
1528 self.muted_by_user = false;
1529 }
1530
1531 let (result, old_muted) = match &mut self.microphone_track {
1532 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1533 LocalTrack::Pending { muted, .. } => {
1534 let old_muted = *muted;
1535 *muted = should_mute;
1536 cx.notify();
1537 Ok((Task::Ready(Some(Ok(()))), old_muted))
1538 }
1539 LocalTrack::Published {
1540 track_publication,
1541 muted,
1542 } => {
1543 let old_muted = *muted;
1544 *muted = should_mute;
1545 cx.notify();
1546 Ok((
1547 cx.background_executor()
1548 .spawn(track_publication.set_mute(*muted)),
1549 old_muted,
1550 ))
1551 }
1552 }?;
1553
1554 if old_muted != should_mute {
1555 if should_mute {
1556 Audio::play_sound(Sound::Mute, cx);
1557 } else {
1558 Audio::play_sound(Sound::Unmute, cx);
1559 }
1560 }
1561
1562 Ok((result, old_muted))
1563 }
1564}
1565
1566enum LocalTrack {
1567 None,
1568 Pending {
1569 publish_id: usize,
1570 muted: bool,
1571 },
1572 Published {
1573 track_publication: LocalTrackPublication,
1574 muted: bool,
1575 },
1576}
1577
1578impl Default for LocalTrack {
1579 fn default() -> Self {
1580 Self::None
1581 }
1582}
1583
1584#[derive(Copy, Clone, PartialEq, Eq)]
1585pub enum RoomStatus {
1586 Online,
1587 Rejoining,
1588 Offline,
1589}
1590
1591impl RoomStatus {
1592 pub fn is_offline(&self) -> bool {
1593 matches!(self, RoomStatus::Offline)
1594 }
1595
1596 pub fn is_online(&self) -> bool {
1597 matches!(self, RoomStatus::Online)
1598 }
1599}