1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
16};
17use language::LanguageRegistry;
18use live_kit_client::{
19 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
20 RemoteVideoTrackUpdate,
21};
22use postage::{sink::Sink, stream::Stream, watch};
23use project::Project;
24use settings::Settings;
25use std::{future::Future, mem, sync::Arc, time::Duration};
26use util::{post_inc, ResultExt, TryFutureExt};
27
28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
29
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub enum Event {
32 ParticipantLocationChanged {
33 participant_id: proto::PeerId,
34 },
35 RemoteVideoTracksChanged {
36 participant_id: proto::PeerId,
37 },
38 RemoteAudioTracksChanged {
39 participant_id: proto::PeerId,
40 },
41 RemoteProjectShared {
42 owner: Arc<User>,
43 project_id: u64,
44 worktree_root_names: Vec<String>,
45 },
46 RemoteProjectUnshared {
47 project_id: u64,
48 },
49 RemoteProjectJoined {
50 project_id: u64,
51 },
52 RemoteProjectInvitationDiscarded {
53 project_id: u64,
54 },
55 Left,
56}
57
58pub struct Room {
59 id: u64,
60 channel_id: Option<u64>,
61 live_kit: Option<LiveKitRoom>,
62 status: RoomStatus,
63 shared_projects: HashSet<WeakModel<Project>>,
64 joined_projects: HashSet<WeakModel<Project>>,
65 local_participant: LocalParticipant,
66 remote_participants: BTreeMap<u64, RemoteParticipant>,
67 pending_participants: Vec<Arc<User>>,
68 participant_user_ids: HashSet<u64>,
69 pending_call_count: usize,
70 leave_when_empty: bool,
71 client: Arc<Client>,
72 user_store: Model<UserStore>,
73 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
74 client_subscriptions: Vec<client::Subscription>,
75 _subscriptions: Vec<gpui::Subscription>,
76 room_update_completed_tx: watch::Sender<Option<()>>,
77 room_update_completed_rx: watch::Receiver<Option<()>>,
78 pending_room_update: Option<Task<()>>,
79 maintain_connection: Option<Task<Option<()>>>,
80}
81
82impl EventEmitter<Event> for Room {}
83
84impl Room {
85 pub fn channel_id(&self) -> Option<u64> {
86 self.channel_id
87 }
88
89 pub fn is_sharing_project(&self) -> bool {
90 !self.shared_projects.is_empty()
91 }
92
93 #[cfg(any(test, feature = "test-support"))]
94 pub fn is_connected(&self) -> bool {
95 if let Some(live_kit) = self.live_kit.as_ref() {
96 matches!(
97 *live_kit.room.status().borrow(),
98 live_kit_client::ConnectionState::Connected { .. }
99 )
100 } else {
101 false
102 }
103 }
104
105 fn new(
106 id: u64,
107 channel_id: Option<u64>,
108 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
109 client: Arc<Client>,
110 user_store: Model<UserStore>,
111 cx: &mut ModelContext<Self>,
112 ) -> Self {
113 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
114 let room = live_kit_client::Room::new();
115 let mut status = room.status();
116 // Consume the initial status of the room.
117 let _ = status.try_recv();
118 let _maintain_room = cx.spawn(|this, mut cx| async move {
119 while let Some(status) = status.next().await {
120 let this = if let Some(this) = this.upgrade() {
121 this
122 } else {
123 break;
124 };
125
126 if status == live_kit_client::ConnectionState::Disconnected {
127 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
128 .ok();
129 break;
130 }
131 }
132 });
133
134 let _maintain_video_tracks = cx.spawn({
135 let room = room.clone();
136 move |this, mut cx| async move {
137 let mut track_video_changes = room.remote_video_track_updates();
138 while let Some(track_change) = track_video_changes.next().await {
139 let this = if let Some(this) = this.upgrade() {
140 this
141 } else {
142 break;
143 };
144
145 this.update(&mut cx, |this, cx| {
146 this.remote_video_track_updated(track_change, cx).log_err()
147 })
148 .ok();
149 }
150 }
151 });
152
153 let _maintain_audio_tracks = cx.spawn({
154 let room = room.clone();
155 |this, mut cx| async move {
156 let mut track_audio_changes = room.remote_audio_track_updates();
157 while let Some(track_change) = track_audio_changes.next().await {
158 let this = if let Some(this) = this.upgrade() {
159 this
160 } else {
161 break;
162 };
163
164 this.update(&mut cx, |this, cx| {
165 this.remote_audio_track_updated(track_change, cx).log_err()
166 })
167 .ok();
168 }
169 }
170 });
171
172 let connect = room.connect(&connection_info.server_url, &connection_info.token);
173 cx.spawn(|this, mut cx| async move {
174 connect.await?;
175
176 if !cx.update(|cx| Self::mute_on_join(cx))? {
177 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
178 .await?;
179 }
180
181 anyhow::Ok(())
182 })
183 .detach_and_log_err(cx);
184
185 Some(LiveKitRoom {
186 room,
187 screen_track: LocalTrack::None,
188 microphone_track: LocalTrack::None,
189 next_publish_id: 0,
190 muted_by_user: false,
191 deafened: false,
192 speaking: false,
193 _maintain_room,
194 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
195 })
196 } else {
197 None
198 };
199
200 let maintain_connection = cx.spawn({
201 let client = client.clone();
202 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
203 });
204
205 Audio::play_sound(Sound::Joined, cx);
206
207 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
208
209 Self {
210 id,
211 channel_id,
212 live_kit: live_kit_room,
213 status: RoomStatus::Online,
214 shared_projects: Default::default(),
215 joined_projects: Default::default(),
216 participant_user_ids: Default::default(),
217 local_participant: Default::default(),
218 remote_participants: Default::default(),
219 pending_participants: Default::default(),
220 pending_call_count: 0,
221 client_subscriptions: vec![
222 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
223 ],
224 _subscriptions: vec![
225 cx.on_release(Self::released),
226 cx.on_app_quit(Self::app_will_quit),
227 ],
228 leave_when_empty: false,
229 pending_room_update: None,
230 client,
231 user_store,
232 follows_by_leader_id_project_id: Default::default(),
233 maintain_connection: Some(maintain_connection),
234 room_update_completed_tx,
235 room_update_completed_rx,
236 }
237 }
238
239 pub(crate) fn create(
240 called_user_id: u64,
241 initial_project: Option<Model<Project>>,
242 client: Arc<Client>,
243 user_store: Model<UserStore>,
244 cx: &mut AppContext,
245 ) -> Task<Result<Model<Self>>> {
246 cx.spawn(move |mut cx| async move {
247 let response = client.request(proto::CreateRoom {}).await?;
248 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
249 let room = cx.build_model(|cx| {
250 Self::new(
251 room_proto.id,
252 None,
253 response.live_kit_connection_info,
254 client,
255 user_store,
256 cx,
257 )
258 })?;
259
260 let initial_project_id = if let Some(initial_project) = initial_project {
261 let initial_project_id = room
262 .update(&mut cx, |room, cx| {
263 room.share_project(initial_project.clone(), cx)
264 })?
265 .await?;
266 Some(initial_project_id)
267 } else {
268 None
269 };
270
271 match room
272 .update(&mut cx, |room, cx| {
273 room.leave_when_empty = true;
274 room.call(called_user_id, initial_project_id, cx)
275 })?
276 .await
277 {
278 Ok(()) => Ok(room),
279 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
280 }
281 })
282 }
283
284 pub(crate) async fn join_channel(
285 channel_id: u64,
286 client: Arc<Client>,
287 user_store: Model<UserStore>,
288 cx: AsyncAppContext,
289 ) -> Result<Model<Self>> {
290 Self::from_join_response(
291 client.request(proto::JoinChannel { channel_id }).await?,
292 client,
293 user_store,
294 cx,
295 )
296 }
297
298 pub(crate) async fn join(
299 room_id: u64,
300 client: Arc<Client>,
301 user_store: Model<UserStore>,
302 cx: AsyncAppContext,
303 ) -> Result<Model<Self>> {
304 Self::from_join_response(
305 client.request(proto::JoinRoom { id: room_id }).await?,
306 client,
307 user_store,
308 cx,
309 )
310 }
311
312 fn released(&mut self, cx: &mut AppContext) {
313 if self.status.is_online() {
314 self.leave_internal(cx).detach_and_log_err(cx);
315 }
316 }
317
318 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
319 let task = if self.status.is_online() {
320 let leave = self.leave_internal(cx);
321 Some(cx.background_executor().spawn(async move {
322 leave.await.log_err();
323 }))
324 } else {
325 None
326 };
327
328 async move {
329 if let Some(task) = task {
330 task.await;
331 }
332 }
333 }
334
335 pub fn mute_on_join(cx: &AppContext) -> bool {
336 false
337 //CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
338 }
339
340 fn from_join_response(
341 response: proto::JoinRoomResponse,
342 client: Arc<Client>,
343 user_store: Model<UserStore>,
344 mut cx: AsyncAppContext,
345 ) -> Result<Model<Self>> {
346 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
347 let room = cx.build_model(|cx| {
348 Self::new(
349 room_proto.id,
350 response.channel_id,
351 response.live_kit_connection_info,
352 client,
353 user_store,
354 cx,
355 )
356 })?;
357 room.update(&mut cx, |room, cx| {
358 room.leave_when_empty = room.channel_id.is_none();
359 room.apply_room_update(room_proto, cx)?;
360 anyhow::Ok(())
361 })??;
362 Ok(room)
363 }
364
365 fn should_leave(&self) -> bool {
366 self.leave_when_empty
367 && self.pending_room_update.is_none()
368 && self.pending_participants.is_empty()
369 && self.remote_participants.is_empty()
370 && self.pending_call_count == 0
371 }
372
373 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
374 cx.notify();
375 cx.emit(Event::Left);
376 self.leave_internal(cx)
377 }
378
379 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
380 if self.status.is_offline() {
381 return Task::ready(Err(anyhow!("room is offline")));
382 }
383
384 log::info!("leaving room");
385 Audio::play_sound(Sound::Leave, cx);
386
387 self.clear_state(cx);
388
389 let leave_room = self.client.request(proto::LeaveRoom {});
390 cx.background_executor().spawn(async move {
391 leave_room.await?;
392 anyhow::Ok(())
393 })
394 }
395
396 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
397 for project in self.shared_projects.drain() {
398 if let Some(project) = project.upgrade() {
399 project.update(cx, |project, cx| {
400 project.unshare(cx).log_err();
401 });
402 }
403 }
404 for project in self.joined_projects.drain() {
405 if let Some(project) = project.upgrade() {
406 project.update(cx, |project, cx| {
407 project.disconnected_from_host(cx);
408 project.close(cx);
409 });
410 }
411 }
412
413 self.status = RoomStatus::Offline;
414 self.remote_participants.clear();
415 self.pending_participants.clear();
416 self.participant_user_ids.clear();
417 self.client_subscriptions.clear();
418 self.live_kit.take();
419 self.pending_room_update.take();
420 self.maintain_connection.take();
421 }
422
423 async fn maintain_connection(
424 this: WeakModel<Self>,
425 client: Arc<Client>,
426 mut cx: AsyncAppContext,
427 ) -> Result<()> {
428 let mut client_status = client.status();
429 loop {
430 let _ = client_status.try_recv();
431 let is_connected = client_status.borrow().is_connected();
432 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
433 if !is_connected || client_status.next().await.is_some() {
434 log::info!("detected client disconnection");
435
436 this.upgrade()
437 .ok_or_else(|| anyhow!("room was dropped"))?
438 .update(&mut cx, |this, cx| {
439 this.status = RoomStatus::Rejoining;
440 cx.notify();
441 })?;
442
443 // Wait for client to re-establish a connection to the server.
444 {
445 let mut reconnection_timeout =
446 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
447 let client_reconnection = async {
448 let mut remaining_attempts = 3;
449 while remaining_attempts > 0 {
450 if client_status.borrow().is_connected() {
451 log::info!("client reconnected, attempting to rejoin room");
452
453 let Some(this) = this.upgrade() else { break };
454 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
455 Ok(task) => {
456 if task.await.log_err().is_some() {
457 return true;
458 } else {
459 remaining_attempts -= 1;
460 }
461 }
462 Err(_app_dropped) => return false,
463 }
464 } else if client_status.borrow().is_signed_out() {
465 return false;
466 }
467
468 log::info!(
469 "waiting for client status change, remaining attempts {}",
470 remaining_attempts
471 );
472 client_status.next().await;
473 }
474 false
475 }
476 .fuse();
477 futures::pin_mut!(client_reconnection);
478
479 futures::select_biased! {
480 reconnected = client_reconnection => {
481 if reconnected {
482 log::info!("successfully reconnected to room");
483 // If we successfully joined the room, go back around the loop
484 // waiting for future connection status changes.
485 continue;
486 }
487 }
488 _ = reconnection_timeout => {
489 log::info!("room reconnection timeout expired");
490 }
491 }
492 }
493
494 break;
495 }
496 }
497
498 // The client failed to re-establish a connection to the server
499 // or an error occurred while trying to re-join the room. Either way
500 // we leave the room and return an error.
501 if let Some(this) = this.upgrade() {
502 log::info!("reconnection failed, leaving room");
503 let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
504 }
505 Err(anyhow!(
506 "can't reconnect to room: client failed to re-establish connection"
507 ))
508 }
509
510 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
511 let mut projects = HashMap::default();
512 let mut reshared_projects = Vec::new();
513 let mut rejoined_projects = Vec::new();
514 self.shared_projects.retain(|project| {
515 if let Some(handle) = project.upgrade() {
516 let project = handle.read(cx);
517 if let Some(project_id) = project.remote_id() {
518 projects.insert(project_id, handle.clone());
519 reshared_projects.push(proto::UpdateProject {
520 project_id,
521 worktrees: project.worktree_metadata_protos(cx),
522 });
523 return true;
524 }
525 }
526 false
527 });
528 self.joined_projects.retain(|project| {
529 if let Some(handle) = project.upgrade() {
530 let project = handle.read(cx);
531 if let Some(project_id) = project.remote_id() {
532 projects.insert(project_id, handle.clone());
533 rejoined_projects.push(proto::RejoinProject {
534 id: project_id,
535 worktrees: project
536 .worktrees()
537 .map(|worktree| {
538 let worktree = worktree.read(cx);
539 proto::RejoinWorktree {
540 id: worktree.id().to_proto(),
541 scan_id: worktree.completed_scan_id() as u64,
542 }
543 })
544 .collect(),
545 });
546 }
547 return true;
548 }
549 false
550 });
551
552 let response = self.client.request_envelope(proto::RejoinRoom {
553 id: self.id,
554 reshared_projects,
555 rejoined_projects,
556 });
557
558 cx.spawn(|this, mut cx| async move {
559 let response = response.await?;
560 let message_id = response.message_id;
561 let response = response.payload;
562 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
563 this.update(&mut cx, |this, cx| {
564 this.status = RoomStatus::Online;
565 this.apply_room_update(room_proto, cx)?;
566
567 for reshared_project in response.reshared_projects {
568 if let Some(project) = projects.get(&reshared_project.id) {
569 project.update(cx, |project, cx| {
570 project.reshared(reshared_project, cx).log_err();
571 });
572 }
573 }
574
575 for rejoined_project in response.rejoined_projects {
576 if let Some(project) = projects.get(&rejoined_project.id) {
577 project.update(cx, |project, cx| {
578 project.rejoined(rejoined_project, message_id, cx).log_err();
579 });
580 }
581 }
582
583 anyhow::Ok(())
584 })?
585 })
586 }
587
588 pub fn id(&self) -> u64 {
589 self.id
590 }
591
592 pub fn status(&self) -> RoomStatus {
593 self.status
594 }
595
596 pub fn local_participant(&self) -> &LocalParticipant {
597 &self.local_participant
598 }
599
600 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
601 &self.remote_participants
602 }
603
604 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
605 self.remote_participants
606 .values()
607 .find(|p| p.peer_id == peer_id)
608 }
609
610 pub fn pending_participants(&self) -> &[Arc<User>] {
611 &self.pending_participants
612 }
613
614 pub fn contains_participant(&self, user_id: u64) -> bool {
615 self.participant_user_ids.contains(&user_id)
616 }
617
618 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
619 self.follows_by_leader_id_project_id
620 .get(&(leader_id, project_id))
621 .map_or(&[], |v| v.as_slice())
622 }
623
624 /// Returns the most 'active' projects, defined as most people in the project
625 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
626 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
627 for participant in self.remote_participants.values() {
628 match participant.location {
629 ParticipantLocation::SharedProject { project_id } => {
630 project_hosts_and_guest_counts
631 .entry(project_id)
632 .or_default()
633 .1 += 1;
634 }
635 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
636 }
637 for project in &participant.projects {
638 project_hosts_and_guest_counts
639 .entry(project.id)
640 .or_default()
641 .0 = Some(participant.user.id);
642 }
643 }
644
645 if let Some(user) = self.user_store.read(cx).current_user() {
646 for project in &self.local_participant.projects {
647 project_hosts_and_guest_counts
648 .entry(project.id)
649 .or_default()
650 .0 = Some(user.id);
651 }
652 }
653
654 project_hosts_and_guest_counts
655 .into_iter()
656 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
657 .max_by_key(|(_, _, guest_count)| *guest_count)
658 .map(|(id, host, _)| (id, host))
659 }
660
661 async fn handle_room_updated(
662 this: Model<Self>,
663 envelope: TypedEnvelope<proto::RoomUpdated>,
664 _: Arc<Client>,
665 mut cx: AsyncAppContext,
666 ) -> Result<()> {
667 let room = envelope
668 .payload
669 .room
670 .ok_or_else(|| anyhow!("invalid room"))?;
671 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
672 }
673
674 fn apply_room_update(
675 &mut self,
676 mut room: proto::Room,
677 cx: &mut ModelContext<Self>,
678 ) -> Result<()> {
679 // Filter ourselves out from the room's participants.
680 let local_participant_ix = room
681 .participants
682 .iter()
683 .position(|participant| Some(participant.user_id) == self.client.user_id());
684 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
685
686 let pending_participant_user_ids = room
687 .pending_participants
688 .iter()
689 .map(|p| p.user_id)
690 .collect::<Vec<_>>();
691
692 let remote_participant_user_ids = room
693 .participants
694 .iter()
695 .map(|p| p.user_id)
696 .collect::<Vec<_>>();
697
698 let (remote_participants, pending_participants) =
699 self.user_store.update(cx, move |user_store, cx| {
700 (
701 user_store.get_users(remote_participant_user_ids, cx),
702 user_store.get_users(pending_participant_user_ids, cx),
703 )
704 });
705
706 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
707 let (remote_participants, pending_participants) =
708 futures::join!(remote_participants, pending_participants);
709
710 this.update(&mut cx, |this, cx| {
711 this.participant_user_ids.clear();
712
713 if let Some(participant) = local_participant {
714 this.local_participant.projects = participant.projects;
715 } else {
716 this.local_participant.projects.clear();
717 }
718
719 if let Some(participants) = remote_participants.log_err() {
720 for (participant, user) in room.participants.into_iter().zip(participants) {
721 let Some(peer_id) = participant.peer_id else {
722 continue;
723 };
724 let participant_index = ParticipantIndex(participant.participant_index);
725 this.participant_user_ids.insert(participant.user_id);
726
727 let old_projects = this
728 .remote_participants
729 .get(&participant.user_id)
730 .into_iter()
731 .flat_map(|existing| &existing.projects)
732 .map(|project| project.id)
733 .collect::<HashSet<_>>();
734 let new_projects = participant
735 .projects
736 .iter()
737 .map(|project| project.id)
738 .collect::<HashSet<_>>();
739
740 for project in &participant.projects {
741 if !old_projects.contains(&project.id) {
742 cx.emit(Event::RemoteProjectShared {
743 owner: user.clone(),
744 project_id: project.id,
745 worktree_root_names: project.worktree_root_names.clone(),
746 });
747 }
748 }
749
750 for unshared_project_id in old_projects.difference(&new_projects) {
751 this.joined_projects.retain(|project| {
752 if let Some(project) = project.upgrade() {
753 project.update(cx, |project, cx| {
754 if project.remote_id() == Some(*unshared_project_id) {
755 project.disconnected_from_host(cx);
756 false
757 } else {
758 true
759 }
760 })
761 } else {
762 false
763 }
764 });
765 cx.emit(Event::RemoteProjectUnshared {
766 project_id: *unshared_project_id,
767 });
768 }
769
770 let location = ParticipantLocation::from_proto(participant.location)
771 .unwrap_or(ParticipantLocation::External);
772 if let Some(remote_participant) =
773 this.remote_participants.get_mut(&participant.user_id)
774 {
775 remote_participant.peer_id = peer_id;
776 remote_participant.projects = participant.projects;
777 remote_participant.participant_index = participant_index;
778 if location != remote_participant.location {
779 remote_participant.location = location;
780 cx.emit(Event::ParticipantLocationChanged {
781 participant_id: peer_id,
782 });
783 }
784 } else {
785 this.remote_participants.insert(
786 participant.user_id,
787 RemoteParticipant {
788 user: user.clone(),
789 participant_index,
790 peer_id,
791 projects: participant.projects,
792 location,
793 muted: true,
794 speaking: false,
795 video_tracks: Default::default(),
796 audio_tracks: Default::default(),
797 },
798 );
799
800 Audio::play_sound(Sound::Joined, cx);
801
802 if let Some(live_kit) = this.live_kit.as_ref() {
803 let video_tracks =
804 live_kit.room.remote_video_tracks(&user.id.to_string());
805 let audio_tracks =
806 live_kit.room.remote_audio_tracks(&user.id.to_string());
807 let publications = live_kit
808 .room
809 .remote_audio_track_publications(&user.id.to_string());
810
811 for track in video_tracks {
812 this.remote_video_track_updated(
813 RemoteVideoTrackUpdate::Subscribed(track),
814 cx,
815 )
816 .log_err();
817 }
818
819 for (track, publication) in
820 audio_tracks.iter().zip(publications.iter())
821 {
822 this.remote_audio_track_updated(
823 RemoteAudioTrackUpdate::Subscribed(
824 track.clone(),
825 publication.clone(),
826 ),
827 cx,
828 )
829 .log_err();
830 }
831 }
832 }
833 }
834
835 this.remote_participants.retain(|user_id, participant| {
836 if this.participant_user_ids.contains(user_id) {
837 true
838 } else {
839 for project in &participant.projects {
840 cx.emit(Event::RemoteProjectUnshared {
841 project_id: project.id,
842 });
843 }
844 false
845 }
846 });
847 }
848
849 if let Some(pending_participants) = pending_participants.log_err() {
850 this.pending_participants = pending_participants;
851 for participant in &this.pending_participants {
852 this.participant_user_ids.insert(participant.id);
853 }
854 }
855
856 this.follows_by_leader_id_project_id.clear();
857 for follower in room.followers {
858 let project_id = follower.project_id;
859 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
860 (Some(leader), Some(follower)) => (leader, follower),
861
862 _ => {
863 log::error!("Follower message {follower:?} missing some state");
864 continue;
865 }
866 };
867
868 let list = this
869 .follows_by_leader_id_project_id
870 .entry((leader, project_id))
871 .or_insert(Vec::new());
872 if !list.contains(&follower) {
873 list.push(follower);
874 }
875 }
876
877 this.pending_room_update.take();
878 if this.should_leave() {
879 log::info!("room is empty, leaving");
880 let _ = this.leave(cx);
881 }
882
883 this.user_store.update(cx, |user_store, cx| {
884 let participant_indices_by_user_id = this
885 .remote_participants
886 .iter()
887 .map(|(user_id, participant)| (*user_id, participant.participant_index))
888 .collect();
889 user_store.set_participant_indices(participant_indices_by_user_id, cx);
890 });
891
892 this.check_invariants();
893 this.room_update_completed_tx.try_send(Some(())).ok();
894 cx.notify();
895 })
896 .ok();
897 }));
898
899 cx.notify();
900 Ok(())
901 }
902
903 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
904 let mut done_rx = self.room_update_completed_rx.clone();
905 async move {
906 while let Some(result) = done_rx.next().await {
907 if result.is_some() {
908 break;
909 }
910 }
911 }
912 }
913
914 fn remote_video_track_updated(
915 &mut self,
916 change: RemoteVideoTrackUpdate,
917 cx: &mut ModelContext<Self>,
918 ) -> Result<()> {
919 match change {
920 RemoteVideoTrackUpdate::Subscribed(track) => {
921 let user_id = track.publisher_id().parse()?;
922 let track_id = track.sid().to_string();
923 let participant = self
924 .remote_participants
925 .get_mut(&user_id)
926 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
927 participant.video_tracks.insert(track_id.clone(), track);
928 cx.emit(Event::RemoteVideoTracksChanged {
929 participant_id: participant.peer_id,
930 });
931 }
932 RemoteVideoTrackUpdate::Unsubscribed {
933 publisher_id,
934 track_id,
935 } => {
936 let user_id = publisher_id.parse()?;
937 let participant = self
938 .remote_participants
939 .get_mut(&user_id)
940 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
941 participant.video_tracks.remove(&track_id);
942 cx.emit(Event::RemoteVideoTracksChanged {
943 participant_id: participant.peer_id,
944 });
945 }
946 }
947
948 cx.notify();
949 Ok(())
950 }
951
952 fn remote_audio_track_updated(
953 &mut self,
954 change: RemoteAudioTrackUpdate,
955 cx: &mut ModelContext<Self>,
956 ) -> Result<()> {
957 match change {
958 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
959 let mut speaker_ids = speakers
960 .into_iter()
961 .filter_map(|speaker_sid| speaker_sid.parse().ok())
962 .collect::<Vec<u64>>();
963 speaker_ids.sort_unstable();
964 for (sid, participant) in &mut self.remote_participants {
965 if let Ok(_) = speaker_ids.binary_search(sid) {
966 participant.speaking = true;
967 } else {
968 participant.speaking = false;
969 }
970 }
971 if let Some(id) = self.client.user_id() {
972 if let Some(room) = &mut self.live_kit {
973 if let Ok(_) = speaker_ids.binary_search(&id) {
974 room.speaking = true;
975 } else {
976 room.speaking = false;
977 }
978 }
979 }
980 cx.notify();
981 }
982 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
983 let mut found = false;
984 for participant in &mut self.remote_participants.values_mut() {
985 for track in participant.audio_tracks.values() {
986 if track.sid() == track_id {
987 found = true;
988 break;
989 }
990 }
991 if found {
992 participant.muted = muted;
993 break;
994 }
995 }
996
997 cx.notify();
998 }
999 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1000 let user_id = track.publisher_id().parse()?;
1001 let track_id = track.sid().to_string();
1002 let participant = self
1003 .remote_participants
1004 .get_mut(&user_id)
1005 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1006 participant.audio_tracks.insert(track_id.clone(), track);
1007 participant.muted = publication.is_muted();
1008
1009 cx.emit(Event::RemoteAudioTracksChanged {
1010 participant_id: participant.peer_id,
1011 });
1012 }
1013 RemoteAudioTrackUpdate::Unsubscribed {
1014 publisher_id,
1015 track_id,
1016 } => {
1017 let user_id = publisher_id.parse()?;
1018 let participant = self
1019 .remote_participants
1020 .get_mut(&user_id)
1021 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1022 participant.audio_tracks.remove(&track_id);
1023 cx.emit(Event::RemoteAudioTracksChanged {
1024 participant_id: participant.peer_id,
1025 });
1026 }
1027 }
1028
1029 cx.notify();
1030 Ok(())
1031 }
1032
1033 fn check_invariants(&self) {
1034 #[cfg(any(test, feature = "test-support"))]
1035 {
1036 for participant in self.remote_participants.values() {
1037 assert!(self.participant_user_ids.contains(&participant.user.id));
1038 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1039 }
1040
1041 for participant in &self.pending_participants {
1042 assert!(self.participant_user_ids.contains(&participant.id));
1043 assert_ne!(participant.id, self.client.user_id().unwrap());
1044 }
1045
1046 assert_eq!(
1047 self.participant_user_ids.len(),
1048 self.remote_participants.len() + self.pending_participants.len()
1049 );
1050 }
1051 }
1052
1053 pub(crate) fn call(
1054 &mut self,
1055 called_user_id: u64,
1056 initial_project_id: Option<u64>,
1057 cx: &mut ModelContext<Self>,
1058 ) -> Task<Result<()>> {
1059 if self.status.is_offline() {
1060 return Task::ready(Err(anyhow!("room is offline")));
1061 }
1062
1063 cx.notify();
1064 let client = self.client.clone();
1065 let room_id = self.id;
1066 self.pending_call_count += 1;
1067 cx.spawn(move |this, mut cx| async move {
1068 let result = client
1069 .request(proto::Call {
1070 room_id,
1071 called_user_id,
1072 initial_project_id,
1073 })
1074 .await;
1075 this.update(&mut cx, |this, cx| {
1076 this.pending_call_count -= 1;
1077 if this.should_leave() {
1078 this.leave(cx).detach_and_log_err(cx);
1079 }
1080 })?;
1081 result?;
1082 Ok(())
1083 })
1084 }
1085
1086 pub fn join_project(
1087 &mut self,
1088 id: u64,
1089 language_registry: Arc<LanguageRegistry>,
1090 fs: Arc<dyn Fs>,
1091 cx: &mut ModelContext<Self>,
1092 ) -> Task<Result<Model<Project>>> {
1093 let client = self.client.clone();
1094 let user_store = self.user_store.clone();
1095 cx.emit(Event::RemoteProjectJoined { project_id: id });
1096 cx.spawn(move |this, mut cx| async move {
1097 let project =
1098 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1099
1100 this.update(&mut cx, |this, cx| {
1101 this.joined_projects.retain(|project| {
1102 if let Some(project) = project.upgrade() {
1103 !project.read(cx).is_read_only()
1104 } else {
1105 false
1106 }
1107 });
1108 this.joined_projects.insert(project.downgrade());
1109 })?;
1110 Ok(project)
1111 })
1112 }
1113
1114 pub(crate) fn share_project(
1115 &mut self,
1116 project: Model<Project>,
1117 cx: &mut ModelContext<Self>,
1118 ) -> Task<Result<u64>> {
1119 if let Some(project_id) = project.read(cx).remote_id() {
1120 return Task::ready(Ok(project_id));
1121 }
1122
1123 let request = self.client.request(proto::ShareProject {
1124 room_id: self.id(),
1125 worktrees: project.read(cx).worktree_metadata_protos(cx),
1126 });
1127 cx.spawn(|this, mut cx| async move {
1128 let response = request.await?;
1129
1130 project.update(&mut cx, |project, cx| {
1131 project.shared(response.project_id, cx)
1132 })??;
1133
1134 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1135 this.update(&mut cx, |this, cx| {
1136 this.shared_projects.insert(project.downgrade());
1137 let active_project = this.local_participant.active_project.as_ref();
1138 if active_project.map_or(false, |location| *location == project) {
1139 this.set_location(Some(&project), cx)
1140 } else {
1141 Task::ready(Ok(()))
1142 }
1143 })?
1144 .await?;
1145
1146 Ok(response.project_id)
1147 })
1148 }
1149
1150 pub(crate) fn unshare_project(
1151 &mut self,
1152 project: Model<Project>,
1153 cx: &mut ModelContext<Self>,
1154 ) -> Result<()> {
1155 let project_id = match project.read(cx).remote_id() {
1156 Some(project_id) => project_id,
1157 None => return Ok(()),
1158 };
1159
1160 self.client.send(proto::UnshareProject { project_id })?;
1161 project.update(cx, |this, cx| this.unshare(cx))
1162 }
1163
1164 pub(crate) fn set_location(
1165 &mut self,
1166 project: Option<&Model<Project>>,
1167 cx: &mut ModelContext<Self>,
1168 ) -> Task<Result<()>> {
1169 if self.status.is_offline() {
1170 return Task::ready(Err(anyhow!("room is offline")));
1171 }
1172
1173 let client = self.client.clone();
1174 let room_id = self.id;
1175 let location = if let Some(project) = project {
1176 self.local_participant.active_project = Some(project.downgrade());
1177 if let Some(project_id) = project.read(cx).remote_id() {
1178 proto::participant_location::Variant::SharedProject(
1179 proto::participant_location::SharedProject { id: project_id },
1180 )
1181 } else {
1182 proto::participant_location::Variant::UnsharedProject(
1183 proto::participant_location::UnsharedProject {},
1184 )
1185 }
1186 } else {
1187 self.local_participant.active_project = None;
1188 proto::participant_location::Variant::External(proto::participant_location::External {})
1189 };
1190
1191 cx.notify();
1192 cx.background_executor().spawn(async move {
1193 client
1194 .request(proto::UpdateParticipantLocation {
1195 room_id,
1196 location: Some(proto::ParticipantLocation {
1197 variant: Some(location),
1198 }),
1199 })
1200 .await?;
1201 Ok(())
1202 })
1203 }
1204
1205 pub fn is_screen_sharing(&self) -> bool {
1206 self.live_kit.as_ref().map_or(false, |live_kit| {
1207 !matches!(live_kit.screen_track, LocalTrack::None)
1208 })
1209 }
1210
1211 pub fn is_sharing_mic(&self) -> bool {
1212 self.live_kit.as_ref().map_or(false, |live_kit| {
1213 !matches!(live_kit.microphone_track, LocalTrack::None)
1214 })
1215 }
1216
1217 pub fn is_muted(&self, cx: &AppContext) -> bool {
1218 self.live_kit
1219 .as_ref()
1220 .and_then(|live_kit| match &live_kit.microphone_track {
1221 LocalTrack::None => Some(Self::mute_on_join(cx)),
1222 LocalTrack::Pending { muted, .. } => Some(*muted),
1223 LocalTrack::Published { muted, .. } => Some(*muted),
1224 })
1225 .unwrap_or(false)
1226 }
1227
1228 pub fn is_speaking(&self) -> bool {
1229 self.live_kit
1230 .as_ref()
1231 .map_or(false, |live_kit| live_kit.speaking)
1232 }
1233
1234 pub fn is_deafened(&self) -> Option<bool> {
1235 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1236 }
1237
1238 #[track_caller]
1239 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1240 if self.status.is_offline() {
1241 return Task::ready(Err(anyhow!("room is offline")));
1242 } else if self.is_sharing_mic() {
1243 return Task::ready(Err(anyhow!("microphone was already shared")));
1244 }
1245
1246 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1247 let publish_id = post_inc(&mut live_kit.next_publish_id);
1248 live_kit.microphone_track = LocalTrack::Pending {
1249 publish_id,
1250 muted: false,
1251 };
1252 cx.notify();
1253 publish_id
1254 } else {
1255 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1256 };
1257
1258 cx.spawn(move |this, mut cx| async move {
1259 let publish_track = async {
1260 let track = LocalAudioTrack::create();
1261 this.upgrade()
1262 .ok_or_else(|| anyhow!("room was dropped"))?
1263 .update(&mut cx, |this, _| {
1264 this.live_kit
1265 .as_ref()
1266 .map(|live_kit| live_kit.room.publish_audio_track(track))
1267 })?
1268 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1269 .await
1270 };
1271
1272 let publication = publish_track.await;
1273 this.upgrade()
1274 .ok_or_else(|| anyhow!("room was dropped"))?
1275 .update(&mut cx, |this, cx| {
1276 let live_kit = this
1277 .live_kit
1278 .as_mut()
1279 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1280
1281 let (canceled, muted) = if let LocalTrack::Pending {
1282 publish_id: cur_publish_id,
1283 muted,
1284 } = &live_kit.microphone_track
1285 {
1286 (*cur_publish_id != publish_id, *muted)
1287 } else {
1288 (true, false)
1289 };
1290
1291 match publication {
1292 Ok(publication) => {
1293 if canceled {
1294 live_kit.room.unpublish_track(publication);
1295 } else {
1296 if muted {
1297 cx.background_executor()
1298 .spawn(publication.set_mute(muted))
1299 .detach();
1300 }
1301 live_kit.microphone_track = LocalTrack::Published {
1302 track_publication: publication,
1303 muted,
1304 };
1305 cx.notify();
1306 }
1307 Ok(())
1308 }
1309 Err(error) => {
1310 if canceled {
1311 Ok(())
1312 } else {
1313 live_kit.microphone_track = LocalTrack::None;
1314 cx.notify();
1315 Err(error)
1316 }
1317 }
1318 }
1319 })?
1320 })
1321 }
1322
1323 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1324 if self.status.is_offline() {
1325 return Task::ready(Err(anyhow!("room is offline")));
1326 } else if self.is_screen_sharing() {
1327 return Task::ready(Err(anyhow!("screen was already shared")));
1328 }
1329
1330 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1331 let publish_id = post_inc(&mut live_kit.next_publish_id);
1332 live_kit.screen_track = LocalTrack::Pending {
1333 publish_id,
1334 muted: false,
1335 };
1336 cx.notify();
1337 (live_kit.room.display_sources(), publish_id)
1338 } else {
1339 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1340 };
1341
1342 cx.spawn(move |this, mut cx| async move {
1343 let publish_track = async {
1344 let displays = displays.await?;
1345 let display = displays
1346 .first()
1347 .ok_or_else(|| anyhow!("no display found"))?;
1348 let track = LocalVideoTrack::screen_share_for_display(&display);
1349 this.upgrade()
1350 .ok_or_else(|| anyhow!("room was dropped"))?
1351 .update(&mut cx, |this, _| {
1352 this.live_kit
1353 .as_ref()
1354 .map(|live_kit| live_kit.room.publish_video_track(track))
1355 })?
1356 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1357 .await
1358 };
1359
1360 let publication = publish_track.await;
1361 this.upgrade()
1362 .ok_or_else(|| anyhow!("room was dropped"))?
1363 .update(&mut cx, |this, cx| {
1364 let live_kit = this
1365 .live_kit
1366 .as_mut()
1367 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1368
1369 let (canceled, muted) = if let LocalTrack::Pending {
1370 publish_id: cur_publish_id,
1371 muted,
1372 } = &live_kit.screen_track
1373 {
1374 (*cur_publish_id != publish_id, *muted)
1375 } else {
1376 (true, false)
1377 };
1378
1379 match publication {
1380 Ok(publication) => {
1381 if canceled {
1382 live_kit.room.unpublish_track(publication);
1383 } else {
1384 if muted {
1385 cx.background_executor()
1386 .spawn(publication.set_mute(muted))
1387 .detach();
1388 }
1389 live_kit.screen_track = LocalTrack::Published {
1390 track_publication: publication,
1391 muted,
1392 };
1393 cx.notify();
1394 }
1395
1396 Audio::play_sound(Sound::StartScreenshare, cx);
1397
1398 Ok(())
1399 }
1400 Err(error) => {
1401 if canceled {
1402 Ok(())
1403 } else {
1404 live_kit.screen_track = LocalTrack::None;
1405 cx.notify();
1406 Err(error)
1407 }
1408 }
1409 }
1410 })?
1411 })
1412 }
1413
1414 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1415 let should_mute = !self.is_muted(cx);
1416 if let Some(live_kit) = self.live_kit.as_mut() {
1417 if matches!(live_kit.microphone_track, LocalTrack::None) {
1418 return Ok(self.share_microphone(cx));
1419 }
1420
1421 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1422 live_kit.muted_by_user = should_mute;
1423
1424 if old_muted == true && live_kit.deafened == true {
1425 if let Some(task) = self.toggle_deafen(cx).ok() {
1426 task.detach();
1427 }
1428 }
1429
1430 Ok(ret_task)
1431 } else {
1432 Err(anyhow!("LiveKit not started"))
1433 }
1434 }
1435
1436 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1437 if let Some(live_kit) = self.live_kit.as_mut() {
1438 (*live_kit).deafened = !live_kit.deafened;
1439
1440 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1441 // Context notification is sent within set_mute itself.
1442 let mut mute_task = None;
1443 // When deafening, mute user's mic as well.
1444 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1445 if live_kit.deafened || !live_kit.muted_by_user {
1446 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1447 };
1448 for participant in self.remote_participants.values() {
1449 for track in live_kit
1450 .room
1451 .remote_audio_track_publications(&participant.user.id.to_string())
1452 {
1453 let deafened = live_kit.deafened;
1454 tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1455 }
1456 }
1457
1458 Ok(cx.foreground_executor().spawn(async move {
1459 if let Some(mute_task) = mute_task {
1460 mute_task.await?;
1461 }
1462 for task in tasks {
1463 task.await?;
1464 }
1465 Ok(())
1466 }))
1467 } else {
1468 Err(anyhow!("LiveKit not started"))
1469 }
1470 }
1471
1472 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1473 if self.status.is_offline() {
1474 return Err(anyhow!("room is offline"));
1475 }
1476
1477 let live_kit = self
1478 .live_kit
1479 .as_mut()
1480 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1481 match mem::take(&mut live_kit.screen_track) {
1482 LocalTrack::None => Err(anyhow!("screen was not shared")),
1483 LocalTrack::Pending { .. } => {
1484 cx.notify();
1485 Ok(())
1486 }
1487 LocalTrack::Published {
1488 track_publication, ..
1489 } => {
1490 live_kit.room.unpublish_track(track_publication);
1491 cx.notify();
1492
1493 Audio::play_sound(Sound::StopScreenshare, cx);
1494 Ok(())
1495 }
1496 }
1497 }
1498
1499 #[cfg(any(test, feature = "test-support"))]
1500 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1501 self.live_kit
1502 .as_ref()
1503 .unwrap()
1504 .room
1505 .set_display_sources(sources);
1506 }
1507}
1508
1509struct LiveKitRoom {
1510 room: Arc<live_kit_client::Room>,
1511 screen_track: LocalTrack,
1512 microphone_track: LocalTrack,
1513 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1514 muted_by_user: bool,
1515 deafened: bool,
1516 speaking: bool,
1517 next_publish_id: usize,
1518 _maintain_room: Task<()>,
1519 _maintain_tracks: [Task<()>; 2],
1520}
1521
1522impl LiveKitRoom {
1523 fn set_mute(
1524 self: &mut LiveKitRoom,
1525 should_mute: bool,
1526 cx: &mut ModelContext<Room>,
1527 ) -> Result<(Task<Result<()>>, bool)> {
1528 if !should_mute {
1529 // clear user muting state.
1530 self.muted_by_user = false;
1531 }
1532
1533 let (result, old_muted) = match &mut self.microphone_track {
1534 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1535 LocalTrack::Pending { muted, .. } => {
1536 let old_muted = *muted;
1537 *muted = should_mute;
1538 cx.notify();
1539 Ok((Task::Ready(Some(Ok(()))), old_muted))
1540 }
1541 LocalTrack::Published {
1542 track_publication,
1543 muted,
1544 } => {
1545 let old_muted = *muted;
1546 *muted = should_mute;
1547 cx.notify();
1548 Ok((
1549 cx.background_executor()
1550 .spawn(track_publication.set_mute(*muted)),
1551 old_muted,
1552 ))
1553 }
1554 }?;
1555
1556 if old_muted != should_mute {
1557 if should_mute {
1558 Audio::play_sound(Sound::Mute, cx);
1559 } else {
1560 Audio::play_sound(Sound::Unmute, cx);
1561 }
1562 }
1563
1564 Ok((result, old_muted))
1565 }
1566}
1567
1568enum LocalTrack {
1569 None,
1570 Pending {
1571 publish_id: usize,
1572 muted: bool,
1573 },
1574 Published {
1575 track_publication: LocalTrackPublication,
1576 muted: bool,
1577 },
1578}
1579
1580impl Default for LocalTrack {
1581 fn default() -> Self {
1582 Self::None
1583 }
1584}
1585
1586#[derive(Copy, Clone, PartialEq, Eq)]
1587pub enum RoomStatus {
1588 Online,
1589 Rejoining,
1590 Offline,
1591}
1592
1593impl RoomStatus {
1594 pub fn is_offline(&self) -> bool {
1595 matches!(self, RoomStatus::Offline)
1596 }
1597
1598 pub fn is_online(&self) -> bool {
1599 matches!(self, RoomStatus::Online)
1600 }
1601}