1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
16};
17use language::LanguageRegistry;
18use live_kit_client::{
19 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
20 RemoteVideoTrackUpdate,
21};
22use postage::{sink::Sink, stream::Stream, watch};
23use project::Project;
24use settings::Settings;
25use std::{future::Future, mem, sync::Arc, time::Duration};
26use util::{post_inc, ResultExt, TryFutureExt};
27
28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
29
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub enum Event {
32 ParticipantLocationChanged {
33 participant_id: proto::PeerId,
34 },
35 RemoteVideoTracksChanged {
36 participant_id: proto::PeerId,
37 },
38 RemoteAudioTracksChanged {
39 participant_id: proto::PeerId,
40 },
41 RemoteProjectShared {
42 owner: Arc<User>,
43 project_id: u64,
44 worktree_root_names: Vec<String>,
45 },
46 RemoteProjectUnshared {
47 project_id: u64,
48 },
49 RemoteProjectJoined {
50 project_id: u64,
51 },
52 RemoteProjectInvitationDiscarded {
53 project_id: u64,
54 },
55 Left,
56}
57
58pub struct Room {
59 id: u64,
60 channel_id: Option<u64>,
61 live_kit: Option<LiveKitRoom>,
62 status: RoomStatus,
63 shared_projects: HashSet<WeakModel<Project>>,
64 joined_projects: HashSet<WeakModel<Project>>,
65 local_participant: LocalParticipant,
66 remote_participants: BTreeMap<u64, RemoteParticipant>,
67 pending_participants: Vec<Arc<User>>,
68 participant_user_ids: HashSet<u64>,
69 pending_call_count: usize,
70 leave_when_empty: bool,
71 client: Arc<Client>,
72 user_store: Model<UserStore>,
73 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
74 client_subscriptions: Vec<client::Subscription>,
75 _subscriptions: Vec<gpui::Subscription>,
76 room_update_completed_tx: watch::Sender<Option<()>>,
77 room_update_completed_rx: watch::Receiver<Option<()>>,
78 pending_room_update: Option<Task<()>>,
79 maintain_connection: Option<Task<Option<()>>>,
80}
81
82impl EventEmitter for Room {
83 type Event = Event;
84}
85
86impl Room {
87 pub fn channel_id(&self) -> Option<u64> {
88 self.channel_id
89 }
90
91 pub fn is_sharing_project(&self) -> bool {
92 !self.shared_projects.is_empty()
93 }
94
95 #[cfg(any(test, feature = "test-support"))]
96 pub fn is_connected(&self) -> bool {
97 if let Some(live_kit) = self.live_kit.as_ref() {
98 matches!(
99 *live_kit.room.status().borrow(),
100 live_kit_client::ConnectionState::Connected { .. }
101 )
102 } else {
103 false
104 }
105 }
106
107 fn new(
108 id: u64,
109 channel_id: Option<u64>,
110 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
111 client: Arc<Client>,
112 user_store: Model<UserStore>,
113 cx: &mut ModelContext<Self>,
114 ) -> Self {
115 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
116 let room = live_kit_client::Room::new();
117 let mut status = room.status();
118 // Consume the initial status of the room.
119 let _ = status.try_recv();
120 let _maintain_room = cx.spawn(|this, mut cx| async move {
121 while let Some(status) = status.next().await {
122 let this = if let Some(this) = this.upgrade() {
123 this
124 } else {
125 break;
126 };
127
128 if status == live_kit_client::ConnectionState::Disconnected {
129 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
130 .ok();
131 break;
132 }
133 }
134 });
135
136 let _maintain_video_tracks = cx.spawn({
137 let room = room.clone();
138 move |this, mut cx| async move {
139 let mut track_video_changes = room.remote_video_track_updates();
140 while let Some(track_change) = track_video_changes.next().await {
141 let this = if let Some(this) = this.upgrade() {
142 this
143 } else {
144 break;
145 };
146
147 this.update(&mut cx, |this, cx| {
148 this.remote_video_track_updated(track_change, cx).log_err()
149 })
150 .ok();
151 }
152 }
153 });
154
155 let _maintain_audio_tracks = cx.spawn({
156 let room = room.clone();
157 |this, mut cx| async move {
158 let mut track_audio_changes = room.remote_audio_track_updates();
159 while let Some(track_change) = track_audio_changes.next().await {
160 let this = if let Some(this) = this.upgrade() {
161 this
162 } else {
163 break;
164 };
165
166 this.update(&mut cx, |this, cx| {
167 this.remote_audio_track_updated(track_change, cx).log_err()
168 })
169 .ok();
170 }
171 }
172 });
173
174 let connect = room.connect(&connection_info.server_url, &connection_info.token);
175 cx.spawn(|this, mut cx| async move {
176 connect.await?;
177
178 if !cx.update(|cx| Self::mute_on_join(cx))? {
179 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
180 .await?;
181 }
182
183 anyhow::Ok(())
184 })
185 .detach_and_log_err(cx);
186
187 Some(LiveKitRoom {
188 room,
189 screen_track: LocalTrack::None,
190 microphone_track: LocalTrack::None,
191 next_publish_id: 0,
192 muted_by_user: false,
193 deafened: false,
194 speaking: false,
195 _maintain_room,
196 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
197 })
198 } else {
199 None
200 };
201
202 let maintain_connection = cx.spawn({
203 let client = client.clone();
204 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
205 });
206
207 Audio::play_sound(Sound::Joined, cx);
208
209 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
210
211 Self {
212 id,
213 channel_id,
214 live_kit: live_kit_room,
215 status: RoomStatus::Online,
216 shared_projects: Default::default(),
217 joined_projects: Default::default(),
218 participant_user_ids: Default::default(),
219 local_participant: Default::default(),
220 remote_participants: Default::default(),
221 pending_participants: Default::default(),
222 pending_call_count: 0,
223 client_subscriptions: vec![
224 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
225 ],
226 _subscriptions: vec![
227 cx.on_release(Self::released),
228 cx.on_app_quit(Self::app_will_quit),
229 ],
230 leave_when_empty: false,
231 pending_room_update: None,
232 client,
233 user_store,
234 follows_by_leader_id_project_id: Default::default(),
235 maintain_connection: Some(maintain_connection),
236 room_update_completed_tx,
237 room_update_completed_rx,
238 }
239 }
240
241 pub(crate) fn create(
242 called_user_id: u64,
243 initial_project: Option<Model<Project>>,
244 client: Arc<Client>,
245 user_store: Model<UserStore>,
246 cx: &mut AppContext,
247 ) -> Task<Result<Model<Self>>> {
248 cx.spawn(move |mut cx| async move {
249 let response = client.request(proto::CreateRoom {}).await?;
250 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
251 let room = cx.build_model(|cx| {
252 Self::new(
253 room_proto.id,
254 None,
255 response.live_kit_connection_info,
256 client,
257 user_store,
258 cx,
259 )
260 })?;
261
262 let initial_project_id = if let Some(initial_project) = initial_project {
263 let initial_project_id = room
264 .update(&mut cx, |room, cx| {
265 room.share_project(initial_project.clone(), cx)
266 })?
267 .await?;
268 Some(initial_project_id)
269 } else {
270 None
271 };
272
273 match room
274 .update(&mut cx, |room, cx| {
275 room.leave_when_empty = true;
276 room.call(called_user_id, initial_project_id, cx)
277 })?
278 .await
279 {
280 Ok(()) => Ok(room),
281 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
282 }
283 })
284 }
285
286 pub(crate) async fn join_channel(
287 channel_id: u64,
288 client: Arc<Client>,
289 user_store: Model<UserStore>,
290 cx: AsyncAppContext,
291 ) -> Result<Model<Self>> {
292 Self::from_join_response(
293 client.request(proto::JoinChannel { channel_id }).await?,
294 client,
295 user_store,
296 cx,
297 )
298 }
299
300 pub(crate) async fn join(
301 room_id: u64,
302 client: Arc<Client>,
303 user_store: Model<UserStore>,
304 cx: AsyncAppContext,
305 ) -> Result<Model<Self>> {
306 Self::from_join_response(
307 client.request(proto::JoinRoom { id: room_id }).await?,
308 client,
309 user_store,
310 cx,
311 )
312 }
313
314 fn released(&mut self, cx: &mut AppContext) {
315 if self.status.is_online() {
316 self.leave_internal(cx).detach_and_log_err(cx);
317 }
318 }
319
320 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
321 let task = if self.status.is_online() {
322 let leave = self.leave_internal(cx);
323 Some(cx.background_executor().spawn(async move {
324 leave.await.log_err();
325 }))
326 } else {
327 None
328 };
329
330 async move {
331 if let Some(task) = task {
332 task.await;
333 }
334 }
335 }
336
337 pub fn mute_on_join(cx: &AppContext) -> bool {
338 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
339 }
340
341 fn from_join_response(
342 response: proto::JoinRoomResponse,
343 client: Arc<Client>,
344 user_store: Model<UserStore>,
345 mut cx: AsyncAppContext,
346 ) -> Result<Model<Self>> {
347 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
348 let room = cx.build_model(|cx| {
349 Self::new(
350 room_proto.id,
351 response.channel_id,
352 response.live_kit_connection_info,
353 client,
354 user_store,
355 cx,
356 )
357 })?;
358 room.update(&mut cx, |room, cx| {
359 room.leave_when_empty = room.channel_id.is_none();
360 room.apply_room_update(room_proto, cx)?;
361 anyhow::Ok(())
362 })??;
363 Ok(room)
364 }
365
366 fn should_leave(&self) -> bool {
367 self.leave_when_empty
368 && self.pending_room_update.is_none()
369 && self.pending_participants.is_empty()
370 && self.remote_participants.is_empty()
371 && self.pending_call_count == 0
372 }
373
374 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
375 cx.notify();
376 cx.emit(Event::Left);
377 self.leave_internal(cx)
378 }
379
380 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
381 if self.status.is_offline() {
382 return Task::ready(Err(anyhow!("room is offline")));
383 }
384
385 log::info!("leaving room");
386 Audio::play_sound(Sound::Leave, cx);
387
388 self.clear_state(cx);
389
390 let leave_room = self.client.request(proto::LeaveRoom {});
391 cx.background_executor().spawn(async move {
392 leave_room.await?;
393 anyhow::Ok(())
394 })
395 }
396
397 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
398 for project in self.shared_projects.drain() {
399 if let Some(project) = project.upgrade() {
400 project.update(cx, |project, cx| {
401 project.unshare(cx).log_err();
402 });
403 }
404 }
405 for project in self.joined_projects.drain() {
406 if let Some(project) = project.upgrade() {
407 project.update(cx, |project, cx| {
408 project.disconnected_from_host(cx);
409 project.close(cx);
410 });
411 }
412 }
413
414 self.status = RoomStatus::Offline;
415 self.remote_participants.clear();
416 self.pending_participants.clear();
417 self.participant_user_ids.clear();
418 self.client_subscriptions.clear();
419 self.live_kit.take();
420 self.pending_room_update.take();
421 self.maintain_connection.take();
422 }
423
424 async fn maintain_connection(
425 this: WeakModel<Self>,
426 client: Arc<Client>,
427 mut cx: AsyncAppContext,
428 ) -> Result<()> {
429 let mut client_status = client.status();
430 loop {
431 let _ = client_status.try_recv();
432 let is_connected = client_status.borrow().is_connected();
433 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
434 if !is_connected || client_status.next().await.is_some() {
435 log::info!("detected client disconnection");
436
437 this.upgrade()
438 .ok_or_else(|| anyhow!("room was dropped"))?
439 .update(&mut cx, |this, cx| {
440 this.status = RoomStatus::Rejoining;
441 cx.notify();
442 })?;
443
444 // Wait for client to re-establish a connection to the server.
445 {
446 let mut reconnection_timeout =
447 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
448 let client_reconnection = async {
449 let mut remaining_attempts = 3;
450 while remaining_attempts > 0 {
451 if client_status.borrow().is_connected() {
452 log::info!("client reconnected, attempting to rejoin room");
453
454 let Some(this) = this.upgrade() else { break };
455 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
456 Ok(task) => {
457 if task.await.log_err().is_some() {
458 return true;
459 } else {
460 remaining_attempts -= 1;
461 }
462 }
463 Err(_app_dropped) => return false,
464 }
465 } else if client_status.borrow().is_signed_out() {
466 return false;
467 }
468
469 log::info!(
470 "waiting for client status change, remaining attempts {}",
471 remaining_attempts
472 );
473 client_status.next().await;
474 }
475 false
476 }
477 .fuse();
478 futures::pin_mut!(client_reconnection);
479
480 futures::select_biased! {
481 reconnected = client_reconnection => {
482 if reconnected {
483 log::info!("successfully reconnected to room");
484 // If we successfully joined the room, go back around the loop
485 // waiting for future connection status changes.
486 continue;
487 }
488 }
489 _ = reconnection_timeout => {
490 log::info!("room reconnection timeout expired");
491 }
492 }
493 }
494
495 break;
496 }
497 }
498
499 // The client failed to re-establish a connection to the server
500 // or an error occurred while trying to re-join the room. Either way
501 // we leave the room and return an error.
502 if let Some(this) = this.upgrade() {
503 log::info!("reconnection failed, leaving room");
504 let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
505 }
506 Err(anyhow!(
507 "can't reconnect to room: client failed to re-establish connection"
508 ))
509 }
510
511 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
512 let mut projects = HashMap::default();
513 let mut reshared_projects = Vec::new();
514 let mut rejoined_projects = Vec::new();
515 self.shared_projects.retain(|project| {
516 if let Some(handle) = project.upgrade() {
517 let project = handle.read(cx);
518 if let Some(project_id) = project.remote_id() {
519 projects.insert(project_id, handle.clone());
520 reshared_projects.push(proto::UpdateProject {
521 project_id,
522 worktrees: project.worktree_metadata_protos(cx),
523 });
524 return true;
525 }
526 }
527 false
528 });
529 self.joined_projects.retain(|project| {
530 if let Some(handle) = project.upgrade() {
531 let project = handle.read(cx);
532 if let Some(project_id) = project.remote_id() {
533 projects.insert(project_id, handle.clone());
534 rejoined_projects.push(proto::RejoinProject {
535 id: project_id,
536 worktrees: project
537 .worktrees()
538 .map(|worktree| {
539 let worktree = worktree.read(cx);
540 proto::RejoinWorktree {
541 id: worktree.id().to_proto(),
542 scan_id: worktree.completed_scan_id() as u64,
543 }
544 })
545 .collect(),
546 });
547 }
548 return true;
549 }
550 false
551 });
552
553 let response = self.client.request_envelope(proto::RejoinRoom {
554 id: self.id,
555 reshared_projects,
556 rejoined_projects,
557 });
558
559 cx.spawn(|this, mut cx| async move {
560 let response = response.await?;
561 let message_id = response.message_id;
562 let response = response.payload;
563 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
564 this.update(&mut cx, |this, cx| {
565 this.status = RoomStatus::Online;
566 this.apply_room_update(room_proto, cx)?;
567
568 for reshared_project in response.reshared_projects {
569 if let Some(project) = projects.get(&reshared_project.id) {
570 project.update(cx, |project, cx| {
571 project.reshared(reshared_project, cx).log_err();
572 });
573 }
574 }
575
576 for rejoined_project in response.rejoined_projects {
577 if let Some(project) = projects.get(&rejoined_project.id) {
578 project.update(cx, |project, cx| {
579 project.rejoined(rejoined_project, message_id, cx).log_err();
580 });
581 }
582 }
583
584 anyhow::Ok(())
585 })?
586 })
587 }
588
589 pub fn id(&self) -> u64 {
590 self.id
591 }
592
593 pub fn status(&self) -> RoomStatus {
594 self.status
595 }
596
597 pub fn local_participant(&self) -> &LocalParticipant {
598 &self.local_participant
599 }
600
601 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
602 &self.remote_participants
603 }
604
605 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
606 self.remote_participants
607 .values()
608 .find(|p| p.peer_id == peer_id)
609 }
610
611 pub fn pending_participants(&self) -> &[Arc<User>] {
612 &self.pending_participants
613 }
614
615 pub fn contains_participant(&self, user_id: u64) -> bool {
616 self.participant_user_ids.contains(&user_id)
617 }
618
619 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
620 self.follows_by_leader_id_project_id
621 .get(&(leader_id, project_id))
622 .map_or(&[], |v| v.as_slice())
623 }
624
625 /// Returns the most 'active' projects, defined as most people in the project
626 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
627 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
628 for participant in self.remote_participants.values() {
629 match participant.location {
630 ParticipantLocation::SharedProject { project_id } => {
631 project_hosts_and_guest_counts
632 .entry(project_id)
633 .or_default()
634 .1 += 1;
635 }
636 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
637 }
638 for project in &participant.projects {
639 project_hosts_and_guest_counts
640 .entry(project.id)
641 .or_default()
642 .0 = Some(participant.user.id);
643 }
644 }
645
646 if let Some(user) = self.user_store.read(cx).current_user() {
647 for project in &self.local_participant.projects {
648 project_hosts_and_guest_counts
649 .entry(project.id)
650 .or_default()
651 .0 = Some(user.id);
652 }
653 }
654
655 project_hosts_and_guest_counts
656 .into_iter()
657 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
658 .max_by_key(|(_, _, guest_count)| *guest_count)
659 .map(|(id, host, _)| (id, host))
660 }
661
662 async fn handle_room_updated(
663 this: Model<Self>,
664 envelope: TypedEnvelope<proto::RoomUpdated>,
665 _: Arc<Client>,
666 mut cx: AsyncAppContext,
667 ) -> Result<()> {
668 let room = envelope
669 .payload
670 .room
671 .ok_or_else(|| anyhow!("invalid room"))?;
672 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
673 }
674
675 fn apply_room_update(
676 &mut self,
677 mut room: proto::Room,
678 cx: &mut ModelContext<Self>,
679 ) -> Result<()> {
680 // Filter ourselves out from the room's participants.
681 let local_participant_ix = room
682 .participants
683 .iter()
684 .position(|participant| Some(participant.user_id) == self.client.user_id());
685 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
686
687 let pending_participant_user_ids = room
688 .pending_participants
689 .iter()
690 .map(|p| p.user_id)
691 .collect::<Vec<_>>();
692
693 let remote_participant_user_ids = room
694 .participants
695 .iter()
696 .map(|p| p.user_id)
697 .collect::<Vec<_>>();
698
699 let (remote_participants, pending_participants) =
700 self.user_store.update(cx, move |user_store, cx| {
701 (
702 user_store.get_users(remote_participant_user_ids, cx),
703 user_store.get_users(pending_participant_user_ids, cx),
704 )
705 });
706
707 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
708 let (remote_participants, pending_participants) =
709 futures::join!(remote_participants, pending_participants);
710
711 this.update(&mut cx, |this, cx| {
712 this.participant_user_ids.clear();
713
714 if let Some(participant) = local_participant {
715 this.local_participant.projects = participant.projects;
716 } else {
717 this.local_participant.projects.clear();
718 }
719
720 if let Some(participants) = remote_participants.log_err() {
721 for (participant, user) in room.participants.into_iter().zip(participants) {
722 let Some(peer_id) = participant.peer_id else {
723 continue;
724 };
725 let participant_index = ParticipantIndex(participant.participant_index);
726 this.participant_user_ids.insert(participant.user_id);
727
728 let old_projects = this
729 .remote_participants
730 .get(&participant.user_id)
731 .into_iter()
732 .flat_map(|existing| &existing.projects)
733 .map(|project| project.id)
734 .collect::<HashSet<_>>();
735 let new_projects = participant
736 .projects
737 .iter()
738 .map(|project| project.id)
739 .collect::<HashSet<_>>();
740
741 for project in &participant.projects {
742 if !old_projects.contains(&project.id) {
743 cx.emit(Event::RemoteProjectShared {
744 owner: user.clone(),
745 project_id: project.id,
746 worktree_root_names: project.worktree_root_names.clone(),
747 });
748 }
749 }
750
751 for unshared_project_id in old_projects.difference(&new_projects) {
752 this.joined_projects.retain(|project| {
753 if let Some(project) = project.upgrade() {
754 project.update(cx, |project, cx| {
755 if project.remote_id() == Some(*unshared_project_id) {
756 project.disconnected_from_host(cx);
757 false
758 } else {
759 true
760 }
761 })
762 } else {
763 false
764 }
765 });
766 cx.emit(Event::RemoteProjectUnshared {
767 project_id: *unshared_project_id,
768 });
769 }
770
771 let location = ParticipantLocation::from_proto(participant.location)
772 .unwrap_or(ParticipantLocation::External);
773 if let Some(remote_participant) =
774 this.remote_participants.get_mut(&participant.user_id)
775 {
776 remote_participant.peer_id = peer_id;
777 remote_participant.projects = participant.projects;
778 remote_participant.participant_index = participant_index;
779 if location != remote_participant.location {
780 remote_participant.location = location;
781 cx.emit(Event::ParticipantLocationChanged {
782 participant_id: peer_id,
783 });
784 }
785 } else {
786 this.remote_participants.insert(
787 participant.user_id,
788 RemoteParticipant {
789 user: user.clone(),
790 participant_index,
791 peer_id,
792 projects: participant.projects,
793 location,
794 muted: true,
795 speaking: false,
796 video_tracks: Default::default(),
797 audio_tracks: Default::default(),
798 },
799 );
800
801 Audio::play_sound(Sound::Joined, cx);
802
803 if let Some(live_kit) = this.live_kit.as_ref() {
804 let video_tracks =
805 live_kit.room.remote_video_tracks(&user.id.to_string());
806 let audio_tracks =
807 live_kit.room.remote_audio_tracks(&user.id.to_string());
808 let publications = live_kit
809 .room
810 .remote_audio_track_publications(&user.id.to_string());
811
812 for track in video_tracks {
813 this.remote_video_track_updated(
814 RemoteVideoTrackUpdate::Subscribed(track),
815 cx,
816 )
817 .log_err();
818 }
819
820 for (track, publication) in
821 audio_tracks.iter().zip(publications.iter())
822 {
823 this.remote_audio_track_updated(
824 RemoteAudioTrackUpdate::Subscribed(
825 track.clone(),
826 publication.clone(),
827 ),
828 cx,
829 )
830 .log_err();
831 }
832 }
833 }
834 }
835
836 this.remote_participants.retain(|user_id, participant| {
837 if this.participant_user_ids.contains(user_id) {
838 true
839 } else {
840 for project in &participant.projects {
841 cx.emit(Event::RemoteProjectUnshared {
842 project_id: project.id,
843 });
844 }
845 false
846 }
847 });
848 }
849
850 if let Some(pending_participants) = pending_participants.log_err() {
851 this.pending_participants = pending_participants;
852 for participant in &this.pending_participants {
853 this.participant_user_ids.insert(participant.id);
854 }
855 }
856
857 this.follows_by_leader_id_project_id.clear();
858 for follower in room.followers {
859 let project_id = follower.project_id;
860 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
861 (Some(leader), Some(follower)) => (leader, follower),
862
863 _ => {
864 log::error!("Follower message {follower:?} missing some state");
865 continue;
866 }
867 };
868
869 let list = this
870 .follows_by_leader_id_project_id
871 .entry((leader, project_id))
872 .or_insert(Vec::new());
873 if !list.contains(&follower) {
874 list.push(follower);
875 }
876 }
877
878 this.pending_room_update.take();
879 if this.should_leave() {
880 log::info!("room is empty, leaving");
881 let _ = this.leave(cx);
882 }
883
884 this.user_store.update(cx, |user_store, cx| {
885 let participant_indices_by_user_id = this
886 .remote_participants
887 .iter()
888 .map(|(user_id, participant)| (*user_id, participant.participant_index))
889 .collect();
890 user_store.set_participant_indices(participant_indices_by_user_id, cx);
891 });
892
893 this.check_invariants();
894 this.room_update_completed_tx.try_send(Some(())).ok();
895 cx.notify();
896 })
897 .ok();
898 }));
899
900 cx.notify();
901 Ok(())
902 }
903
904 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
905 let mut done_rx = self.room_update_completed_rx.clone();
906 async move {
907 while let Some(result) = done_rx.next().await {
908 if result.is_some() {
909 break;
910 }
911 }
912 }
913 }
914
915 fn remote_video_track_updated(
916 &mut self,
917 change: RemoteVideoTrackUpdate,
918 cx: &mut ModelContext<Self>,
919 ) -> Result<()> {
920 match change {
921 RemoteVideoTrackUpdate::Subscribed(track) => {
922 let user_id = track.publisher_id().parse()?;
923 let track_id = track.sid().to_string();
924 let participant = self
925 .remote_participants
926 .get_mut(&user_id)
927 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
928 participant.video_tracks.insert(track_id.clone(), track);
929 cx.emit(Event::RemoteVideoTracksChanged {
930 participant_id: participant.peer_id,
931 });
932 }
933 RemoteVideoTrackUpdate::Unsubscribed {
934 publisher_id,
935 track_id,
936 } => {
937 let user_id = publisher_id.parse()?;
938 let participant = self
939 .remote_participants
940 .get_mut(&user_id)
941 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
942 participant.video_tracks.remove(&track_id);
943 cx.emit(Event::RemoteVideoTracksChanged {
944 participant_id: participant.peer_id,
945 });
946 }
947 }
948
949 cx.notify();
950 Ok(())
951 }
952
953 fn remote_audio_track_updated(
954 &mut self,
955 change: RemoteAudioTrackUpdate,
956 cx: &mut ModelContext<Self>,
957 ) -> Result<()> {
958 match change {
959 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
960 let mut speaker_ids = speakers
961 .into_iter()
962 .filter_map(|speaker_sid| speaker_sid.parse().ok())
963 .collect::<Vec<u64>>();
964 speaker_ids.sort_unstable();
965 for (sid, participant) in &mut self.remote_participants {
966 if let Ok(_) = speaker_ids.binary_search(sid) {
967 participant.speaking = true;
968 } else {
969 participant.speaking = false;
970 }
971 }
972 if let Some(id) = self.client.user_id() {
973 if let Some(room) = &mut self.live_kit {
974 if let Ok(_) = speaker_ids.binary_search(&id) {
975 room.speaking = true;
976 } else {
977 room.speaking = false;
978 }
979 }
980 }
981 cx.notify();
982 }
983 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
984 let mut found = false;
985 for participant in &mut self.remote_participants.values_mut() {
986 for track in participant.audio_tracks.values() {
987 if track.sid() == track_id {
988 found = true;
989 break;
990 }
991 }
992 if found {
993 participant.muted = muted;
994 break;
995 }
996 }
997
998 cx.notify();
999 }
1000 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1001 let user_id = track.publisher_id().parse()?;
1002 let track_id = track.sid().to_string();
1003 let participant = self
1004 .remote_participants
1005 .get_mut(&user_id)
1006 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1007 participant.audio_tracks.insert(track_id.clone(), track);
1008 participant.muted = publication.is_muted();
1009
1010 cx.emit(Event::RemoteAudioTracksChanged {
1011 participant_id: participant.peer_id,
1012 });
1013 }
1014 RemoteAudioTrackUpdate::Unsubscribed {
1015 publisher_id,
1016 track_id,
1017 } => {
1018 let user_id = publisher_id.parse()?;
1019 let participant = self
1020 .remote_participants
1021 .get_mut(&user_id)
1022 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1023 participant.audio_tracks.remove(&track_id);
1024 cx.emit(Event::RemoteAudioTracksChanged {
1025 participant_id: participant.peer_id,
1026 });
1027 }
1028 }
1029
1030 cx.notify();
1031 Ok(())
1032 }
1033
1034 fn check_invariants(&self) {
1035 #[cfg(any(test, feature = "test-support"))]
1036 {
1037 for participant in self.remote_participants.values() {
1038 assert!(self.participant_user_ids.contains(&participant.user.id));
1039 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1040 }
1041
1042 for participant in &self.pending_participants {
1043 assert!(self.participant_user_ids.contains(&participant.id));
1044 assert_ne!(participant.id, self.client.user_id().unwrap());
1045 }
1046
1047 assert_eq!(
1048 self.participant_user_ids.len(),
1049 self.remote_participants.len() + self.pending_participants.len()
1050 );
1051 }
1052 }
1053
1054 pub(crate) fn call(
1055 &mut self,
1056 called_user_id: u64,
1057 initial_project_id: Option<u64>,
1058 cx: &mut ModelContext<Self>,
1059 ) -> Task<Result<()>> {
1060 if self.status.is_offline() {
1061 return Task::ready(Err(anyhow!("room is offline")));
1062 }
1063
1064 cx.notify();
1065 let client = self.client.clone();
1066 let room_id = self.id;
1067 self.pending_call_count += 1;
1068 cx.spawn(move |this, mut cx| async move {
1069 let result = client
1070 .request(proto::Call {
1071 room_id,
1072 called_user_id,
1073 initial_project_id,
1074 })
1075 .await;
1076 this.update(&mut cx, |this, cx| {
1077 this.pending_call_count -= 1;
1078 if this.should_leave() {
1079 this.leave(cx).detach_and_log_err(cx);
1080 }
1081 })?;
1082 result?;
1083 Ok(())
1084 })
1085 }
1086
1087 pub fn join_project(
1088 &mut self,
1089 id: u64,
1090 language_registry: Arc<LanguageRegistry>,
1091 fs: Arc<dyn Fs>,
1092 cx: &mut ModelContext<Self>,
1093 ) -> Task<Result<Model<Project>>> {
1094 let client = self.client.clone();
1095 let user_store = self.user_store.clone();
1096 cx.emit(Event::RemoteProjectJoined { project_id: id });
1097 cx.spawn(move |this, mut cx| async move {
1098 let project =
1099 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1100
1101 this.update(&mut cx, |this, cx| {
1102 this.joined_projects.retain(|project| {
1103 if let Some(project) = project.upgrade() {
1104 !project.read(cx).is_read_only()
1105 } else {
1106 false
1107 }
1108 });
1109 this.joined_projects.insert(project.downgrade());
1110 })?;
1111 Ok(project)
1112 })
1113 }
1114
1115 pub(crate) fn share_project(
1116 &mut self,
1117 project: Model<Project>,
1118 cx: &mut ModelContext<Self>,
1119 ) -> Task<Result<u64>> {
1120 if let Some(project_id) = project.read(cx).remote_id() {
1121 return Task::ready(Ok(project_id));
1122 }
1123
1124 let request = self.client.request(proto::ShareProject {
1125 room_id: self.id(),
1126 worktrees: project.read(cx).worktree_metadata_protos(cx),
1127 });
1128 cx.spawn(|this, mut cx| async move {
1129 let response = request.await?;
1130
1131 project.update(&mut cx, |project, cx| {
1132 project.shared(response.project_id, cx)
1133 })??;
1134
1135 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1136 this.update(&mut cx, |this, cx| {
1137 this.shared_projects.insert(project.downgrade());
1138 let active_project = this.local_participant.active_project.as_ref();
1139 if active_project.map_or(false, |location| *location == project) {
1140 this.set_location(Some(&project), cx)
1141 } else {
1142 Task::ready(Ok(()))
1143 }
1144 })?
1145 .await?;
1146
1147 Ok(response.project_id)
1148 })
1149 }
1150
1151 pub(crate) fn unshare_project(
1152 &mut self,
1153 project: Model<Project>,
1154 cx: &mut ModelContext<Self>,
1155 ) -> Result<()> {
1156 let project_id = match project.read(cx).remote_id() {
1157 Some(project_id) => project_id,
1158 None => return Ok(()),
1159 };
1160
1161 self.client.send(proto::UnshareProject { project_id })?;
1162 project.update(cx, |this, cx| this.unshare(cx))
1163 }
1164
1165 pub(crate) fn set_location(
1166 &mut self,
1167 project: Option<&Model<Project>>,
1168 cx: &mut ModelContext<Self>,
1169 ) -> Task<Result<()>> {
1170 if self.status.is_offline() {
1171 return Task::ready(Err(anyhow!("room is offline")));
1172 }
1173
1174 let client = self.client.clone();
1175 let room_id = self.id;
1176 let location = if let Some(project) = project {
1177 self.local_participant.active_project = Some(project.downgrade());
1178 if let Some(project_id) = project.read(cx).remote_id() {
1179 proto::participant_location::Variant::SharedProject(
1180 proto::participant_location::SharedProject { id: project_id },
1181 )
1182 } else {
1183 proto::participant_location::Variant::UnsharedProject(
1184 proto::participant_location::UnsharedProject {},
1185 )
1186 }
1187 } else {
1188 self.local_participant.active_project = None;
1189 proto::participant_location::Variant::External(proto::participant_location::External {})
1190 };
1191
1192 cx.notify();
1193 cx.background_executor().spawn(async move {
1194 client
1195 .request(proto::UpdateParticipantLocation {
1196 room_id,
1197 location: Some(proto::ParticipantLocation {
1198 variant: Some(location),
1199 }),
1200 })
1201 .await?;
1202 Ok(())
1203 })
1204 }
1205
1206 pub fn is_screen_sharing(&self) -> bool {
1207 self.live_kit.as_ref().map_or(false, |live_kit| {
1208 !matches!(live_kit.screen_track, LocalTrack::None)
1209 })
1210 }
1211
1212 pub fn is_sharing_mic(&self) -> bool {
1213 self.live_kit.as_ref().map_or(false, |live_kit| {
1214 !matches!(live_kit.microphone_track, LocalTrack::None)
1215 })
1216 }
1217
1218 pub fn is_muted(&self, cx: &AppContext) -> bool {
1219 self.live_kit
1220 .as_ref()
1221 .and_then(|live_kit| match &live_kit.microphone_track {
1222 LocalTrack::None => Some(Self::mute_on_join(cx)),
1223 LocalTrack::Pending { muted, .. } => Some(*muted),
1224 LocalTrack::Published { muted, .. } => Some(*muted),
1225 })
1226 .unwrap_or(false)
1227 }
1228
1229 pub fn is_speaking(&self) -> bool {
1230 self.live_kit
1231 .as_ref()
1232 .map_or(false, |live_kit| live_kit.speaking)
1233 }
1234
1235 pub fn is_deafened(&self) -> Option<bool> {
1236 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1237 }
1238
1239 #[track_caller]
1240 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1241 if self.status.is_offline() {
1242 return Task::ready(Err(anyhow!("room is offline")));
1243 } else if self.is_sharing_mic() {
1244 return Task::ready(Err(anyhow!("microphone was already shared")));
1245 }
1246
1247 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1248 let publish_id = post_inc(&mut live_kit.next_publish_id);
1249 live_kit.microphone_track = LocalTrack::Pending {
1250 publish_id,
1251 muted: false,
1252 };
1253 cx.notify();
1254 publish_id
1255 } else {
1256 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1257 };
1258
1259 cx.spawn(move |this, mut cx| async move {
1260 let publish_track = async {
1261 let track = LocalAudioTrack::create();
1262 this.upgrade()
1263 .ok_or_else(|| anyhow!("room was dropped"))?
1264 .update(&mut cx, |this, _| {
1265 this.live_kit
1266 .as_ref()
1267 .map(|live_kit| live_kit.room.publish_audio_track(track))
1268 })?
1269 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1270 .await
1271 };
1272
1273 let publication = publish_track.await;
1274 this.upgrade()
1275 .ok_or_else(|| anyhow!("room was dropped"))?
1276 .update(&mut cx, |this, cx| {
1277 let live_kit = this
1278 .live_kit
1279 .as_mut()
1280 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1281
1282 let (canceled, muted) = if let LocalTrack::Pending {
1283 publish_id: cur_publish_id,
1284 muted,
1285 } = &live_kit.microphone_track
1286 {
1287 (*cur_publish_id != publish_id, *muted)
1288 } else {
1289 (true, false)
1290 };
1291
1292 match publication {
1293 Ok(publication) => {
1294 if canceled {
1295 live_kit.room.unpublish_track(publication);
1296 } else {
1297 if muted {
1298 cx.background_executor()
1299 .spawn(publication.set_mute(muted))
1300 .detach();
1301 }
1302 live_kit.microphone_track = LocalTrack::Published {
1303 track_publication: publication,
1304 muted,
1305 };
1306 cx.notify();
1307 }
1308 Ok(())
1309 }
1310 Err(error) => {
1311 if canceled {
1312 Ok(())
1313 } else {
1314 live_kit.microphone_track = LocalTrack::None;
1315 cx.notify();
1316 Err(error)
1317 }
1318 }
1319 }
1320 })?
1321 })
1322 }
1323
1324 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1325 if self.status.is_offline() {
1326 return Task::ready(Err(anyhow!("room is offline")));
1327 } else if self.is_screen_sharing() {
1328 return Task::ready(Err(anyhow!("screen was already shared")));
1329 }
1330
1331 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1332 let publish_id = post_inc(&mut live_kit.next_publish_id);
1333 live_kit.screen_track = LocalTrack::Pending {
1334 publish_id,
1335 muted: false,
1336 };
1337 cx.notify();
1338 (live_kit.room.display_sources(), publish_id)
1339 } else {
1340 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1341 };
1342
1343 cx.spawn(move |this, mut cx| async move {
1344 let publish_track = async {
1345 let displays = displays.await?;
1346 let display = displays
1347 .first()
1348 .ok_or_else(|| anyhow!("no display found"))?;
1349 let track = LocalVideoTrack::screen_share_for_display(&display);
1350 this.upgrade()
1351 .ok_or_else(|| anyhow!("room was dropped"))?
1352 .update(&mut cx, |this, _| {
1353 this.live_kit
1354 .as_ref()
1355 .map(|live_kit| live_kit.room.publish_video_track(track))
1356 })?
1357 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1358 .await
1359 };
1360
1361 let publication = publish_track.await;
1362 this.upgrade()
1363 .ok_or_else(|| anyhow!("room was dropped"))?
1364 .update(&mut cx, |this, cx| {
1365 let live_kit = this
1366 .live_kit
1367 .as_mut()
1368 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1369
1370 let (canceled, muted) = if let LocalTrack::Pending {
1371 publish_id: cur_publish_id,
1372 muted,
1373 } = &live_kit.screen_track
1374 {
1375 (*cur_publish_id != publish_id, *muted)
1376 } else {
1377 (true, false)
1378 };
1379
1380 match publication {
1381 Ok(publication) => {
1382 if canceled {
1383 live_kit.room.unpublish_track(publication);
1384 } else {
1385 if muted {
1386 cx.background_executor()
1387 .spawn(publication.set_mute(muted))
1388 .detach();
1389 }
1390 live_kit.screen_track = LocalTrack::Published {
1391 track_publication: publication,
1392 muted,
1393 };
1394 cx.notify();
1395 }
1396
1397 Audio::play_sound(Sound::StartScreenshare, cx);
1398
1399 Ok(())
1400 }
1401 Err(error) => {
1402 if canceled {
1403 Ok(())
1404 } else {
1405 live_kit.screen_track = LocalTrack::None;
1406 cx.notify();
1407 Err(error)
1408 }
1409 }
1410 }
1411 })?
1412 })
1413 }
1414
1415 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1416 let should_mute = !self.is_muted(cx);
1417 if let Some(live_kit) = self.live_kit.as_mut() {
1418 if matches!(live_kit.microphone_track, LocalTrack::None) {
1419 return Ok(self.share_microphone(cx));
1420 }
1421
1422 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1423 live_kit.muted_by_user = should_mute;
1424
1425 if old_muted == true && live_kit.deafened == true {
1426 if let Some(task) = self.toggle_deafen(cx).ok() {
1427 task.detach();
1428 }
1429 }
1430
1431 Ok(ret_task)
1432 } else {
1433 Err(anyhow!("LiveKit not started"))
1434 }
1435 }
1436
1437 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1438 if let Some(live_kit) = self.live_kit.as_mut() {
1439 (*live_kit).deafened = !live_kit.deafened;
1440
1441 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1442 // Context notification is sent within set_mute itself.
1443 let mut mute_task = None;
1444 // When deafening, mute user's mic as well.
1445 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1446 if live_kit.deafened || !live_kit.muted_by_user {
1447 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1448 };
1449 for participant in self.remote_participants.values() {
1450 for track in live_kit
1451 .room
1452 .remote_audio_track_publications(&participant.user.id.to_string())
1453 {
1454 let deafened = live_kit.deafened;
1455 tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1456 }
1457 }
1458
1459 Ok(cx.foreground_executor().spawn(async move {
1460 if let Some(mute_task) = mute_task {
1461 mute_task.await?;
1462 }
1463 for task in tasks {
1464 task.await?;
1465 }
1466 Ok(())
1467 }))
1468 } else {
1469 Err(anyhow!("LiveKit not started"))
1470 }
1471 }
1472
1473 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1474 if self.status.is_offline() {
1475 return Err(anyhow!("room is offline"));
1476 }
1477
1478 let live_kit = self
1479 .live_kit
1480 .as_mut()
1481 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1482 match mem::take(&mut live_kit.screen_track) {
1483 LocalTrack::None => Err(anyhow!("screen was not shared")),
1484 LocalTrack::Pending { .. } => {
1485 cx.notify();
1486 Ok(())
1487 }
1488 LocalTrack::Published {
1489 track_publication, ..
1490 } => {
1491 live_kit.room.unpublish_track(track_publication);
1492 cx.notify();
1493
1494 Audio::play_sound(Sound::StopScreenshare, cx);
1495 Ok(())
1496 }
1497 }
1498 }
1499
1500 #[cfg(any(test, feature = "test-support"))]
1501 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1502 self.live_kit
1503 .as_ref()
1504 .unwrap()
1505 .room
1506 .set_display_sources(sources);
1507 }
1508}
1509
1510struct LiveKitRoom {
1511 room: Arc<live_kit_client::Room>,
1512 screen_track: LocalTrack,
1513 microphone_track: LocalTrack,
1514 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1515 muted_by_user: bool,
1516 deafened: bool,
1517 speaking: bool,
1518 next_publish_id: usize,
1519 _maintain_room: Task<()>,
1520 _maintain_tracks: [Task<()>; 2],
1521}
1522
1523impl LiveKitRoom {
1524 fn set_mute(
1525 self: &mut LiveKitRoom,
1526 should_mute: bool,
1527 cx: &mut ModelContext<Room>,
1528 ) -> Result<(Task<Result<()>>, bool)> {
1529 if !should_mute {
1530 // clear user muting state.
1531 self.muted_by_user = false;
1532 }
1533
1534 let (result, old_muted) = match &mut self.microphone_track {
1535 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1536 LocalTrack::Pending { muted, .. } => {
1537 let old_muted = *muted;
1538 *muted = should_mute;
1539 cx.notify();
1540 Ok((Task::Ready(Some(Ok(()))), old_muted))
1541 }
1542 LocalTrack::Published {
1543 track_publication,
1544 muted,
1545 } => {
1546 let old_muted = *muted;
1547 *muted = should_mute;
1548 cx.notify();
1549 Ok((
1550 cx.background_executor()
1551 .spawn(track_publication.set_mute(*muted)),
1552 old_muted,
1553 ))
1554 }
1555 }?;
1556
1557 if old_muted != should_mute {
1558 if should_mute {
1559 Audio::play_sound(Sound::Mute, cx);
1560 } else {
1561 Audio::play_sound(Sound::Unmute, cx);
1562 }
1563 }
1564
1565 Ok((result, old_muted))
1566 }
1567}
1568
1569enum LocalTrack {
1570 None,
1571 Pending {
1572 publish_id: usize,
1573 muted: bool,
1574 },
1575 Published {
1576 track_publication: LocalTrackPublication,
1577 muted: bool,
1578 },
1579}
1580
1581impl Default for LocalTrack {
1582 fn default() -> Self {
1583 Self::None
1584 }
1585}
1586
1587#[derive(Copy, Clone, PartialEq, Eq)]
1588pub enum RoomStatus {
1589 Online,
1590 Rejoining,
1591 Offline,
1592}
1593
1594impl RoomStatus {
1595 pub fn is_offline(&self) -> bool {
1596 matches!(self, RoomStatus::Offline)
1597 }
1598
1599 pub fn is_online(&self) -> bool {
1600 matches!(self, RoomStatus::Online)
1601 }
1602}