1use crate::participant::{LocalParticipant, ParticipantLocation, RemoteParticipant};
2use anyhow::{anyhow, Result};
3use audio::{Audio, Sound};
4use client::{
5 proto::{self, PeerId},
6 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
7};
8use collections::{BTreeMap, HashMap, HashSet};
9use fs::Fs;
10use futures::{FutureExt, StreamExt};
11use gpui::{
12 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
13};
14use language::LanguageRegistry;
15use live_kit_client::{
16 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
17 RemoteVideoTrackUpdate,
18};
19use postage::{sink::Sink, stream::Stream, watch};
20use project::Project;
21use std::{future::Future, mem, sync::Arc, time::Duration};
22use util::{post_inc, ResultExt, TryFutureExt};
23
24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
25
26#[derive(Clone, Debug, PartialEq, Eq)]
27pub enum Event {
28 ParticipantLocationChanged {
29 participant_id: proto::PeerId,
30 },
31 RemoteVideoTracksChanged {
32 participant_id: proto::PeerId,
33 },
34 RemoteAudioTracksChanged {
35 participant_id: proto::PeerId,
36 },
37 RemoteProjectShared {
38 owner: Arc<User>,
39 project_id: u64,
40 worktree_root_names: Vec<String>,
41 },
42 RemoteProjectUnshared {
43 project_id: u64,
44 },
45 RemoteProjectJoined {
46 project_id: u64,
47 },
48 RemoteProjectInvitationDiscarded {
49 project_id: u64,
50 },
51 Left,
52}
53
54pub struct Room {
55 id: u64,
56 channel_id: Option<u64>,
57 live_kit: Option<LiveKitRoom>,
58 status: RoomStatus,
59 shared_projects: HashSet<WeakModel<Project>>,
60 joined_projects: HashSet<WeakModel<Project>>,
61 local_participant: LocalParticipant,
62 remote_participants: BTreeMap<u64, RemoteParticipant>,
63 pending_participants: Vec<Arc<User>>,
64 participant_user_ids: HashSet<u64>,
65 pending_call_count: usize,
66 leave_when_empty: bool,
67 client: Arc<Client>,
68 user_store: Model<UserStore>,
69 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
70 client_subscriptions: Vec<client::Subscription>,
71 _subscriptions: Vec<gpui::Subscription>,
72 room_update_completed_tx: watch::Sender<Option<()>>,
73 room_update_completed_rx: watch::Receiver<Option<()>>,
74 pending_room_update: Option<Task<()>>,
75 maintain_connection: Option<Task<Option<()>>>,
76}
77
78impl EventEmitter<Event> for Room {}
79
80impl Room {
81 pub fn channel_id(&self) -> Option<u64> {
82 self.channel_id
83 }
84
85 pub fn is_sharing_project(&self) -> bool {
86 !self.shared_projects.is_empty()
87 }
88
89 #[cfg(any(test, feature = "test-support"))]
90 pub fn is_connected(&self) -> bool {
91 if let Some(live_kit) = self.live_kit.as_ref() {
92 matches!(
93 *live_kit.room.status().borrow(),
94 live_kit_client::ConnectionState::Connected { .. }
95 )
96 } else {
97 false
98 }
99 }
100
101 fn new(
102 id: u64,
103 channel_id: Option<u64>,
104 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
105 client: Arc<Client>,
106 user_store: Model<UserStore>,
107 cx: &mut ModelContext<Self>,
108 ) -> Self {
109 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
110 let room = live_kit_client::Room::new();
111 let mut status = room.status();
112 // Consume the initial status of the room.
113 let _ = status.try_recv();
114 let _maintain_room = cx.spawn(|this, mut cx| async move {
115 while let Some(status) = status.next().await {
116 let this = if let Some(this) = this.upgrade() {
117 this
118 } else {
119 break;
120 };
121
122 if status == live_kit_client::ConnectionState::Disconnected {
123 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
124 .ok();
125 break;
126 }
127 }
128 });
129
130 let _maintain_video_tracks = cx.spawn({
131 let room = room.clone();
132 move |this, mut cx| async move {
133 let mut track_video_changes = room.remote_video_track_updates();
134 while let Some(track_change) = track_video_changes.next().await {
135 let this = if let Some(this) = this.upgrade() {
136 this
137 } else {
138 break;
139 };
140
141 this.update(&mut cx, |this, cx| {
142 this.remote_video_track_updated(track_change, cx).log_err()
143 })
144 .ok();
145 }
146 }
147 });
148
149 let _maintain_audio_tracks = cx.spawn({
150 let room = room.clone();
151 |this, mut cx| async move {
152 let mut track_audio_changes = room.remote_audio_track_updates();
153 while let Some(track_change) = track_audio_changes.next().await {
154 let this = if let Some(this) = this.upgrade() {
155 this
156 } else {
157 break;
158 };
159
160 this.update(&mut cx, |this, cx| {
161 this.remote_audio_track_updated(track_change, cx).log_err()
162 })
163 .ok();
164 }
165 }
166 });
167
168 let connect = room.connect(&connection_info.server_url, &connection_info.token);
169 cx.spawn(|this, mut cx| async move {
170 connect.await?;
171
172 if !cx.update(|cx| Self::mute_on_join(cx))? {
173 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
174 .await?;
175 }
176
177 anyhow::Ok(())
178 })
179 .detach_and_log_err(cx);
180
181 Some(LiveKitRoom {
182 room,
183 screen_track: LocalTrack::None,
184 microphone_track: LocalTrack::None,
185 next_publish_id: 0,
186 muted_by_user: false,
187 deafened: false,
188 speaking: false,
189 _maintain_room,
190 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
191 })
192 } else {
193 None
194 };
195
196 let maintain_connection = cx.spawn({
197 let client = client.clone();
198 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
199 });
200
201 Audio::play_sound(Sound::Joined, cx);
202
203 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
204
205 Self {
206 id,
207 channel_id,
208 live_kit: live_kit_room,
209 status: RoomStatus::Online,
210 shared_projects: Default::default(),
211 joined_projects: Default::default(),
212 participant_user_ids: Default::default(),
213 local_participant: Default::default(),
214 remote_participants: Default::default(),
215 pending_participants: Default::default(),
216 pending_call_count: 0,
217 client_subscriptions: vec![
218 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
219 ],
220 _subscriptions: vec![
221 cx.on_release(Self::released),
222 cx.on_app_quit(Self::app_will_quit),
223 ],
224 leave_when_empty: false,
225 pending_room_update: None,
226 client,
227 user_store,
228 follows_by_leader_id_project_id: Default::default(),
229 maintain_connection: Some(maintain_connection),
230 room_update_completed_tx,
231 room_update_completed_rx,
232 }
233 }
234
235 pub(crate) fn create(
236 called_user_id: u64,
237 initial_project: Option<Model<Project>>,
238 client: Arc<Client>,
239 user_store: Model<UserStore>,
240 cx: &mut AppContext,
241 ) -> Task<Result<Model<Self>>> {
242 cx.spawn(move |mut cx| async move {
243 let response = client.request(proto::CreateRoom {}).await?;
244 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
245 let room = cx.build_model(|cx| {
246 Self::new(
247 room_proto.id,
248 None,
249 response.live_kit_connection_info,
250 client,
251 user_store,
252 cx,
253 )
254 })?;
255
256 let initial_project_id = if let Some(initial_project) = initial_project {
257 let initial_project_id = room
258 .update(&mut cx, |room, cx| {
259 room.share_project(initial_project.clone(), cx)
260 })?
261 .await?;
262 Some(initial_project_id)
263 } else {
264 None
265 };
266
267 match room
268 .update(&mut cx, |room, cx| {
269 room.leave_when_empty = true;
270 room.call(called_user_id, initial_project_id, cx)
271 })?
272 .await
273 {
274 Ok(()) => Ok(room),
275 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
276 }
277 })
278 }
279
280 pub(crate) async fn join_channel(
281 channel_id: u64,
282 client: Arc<Client>,
283 user_store: Model<UserStore>,
284 cx: AsyncAppContext,
285 ) -> Result<Model<Self>> {
286 Self::from_join_response(
287 client.request(proto::JoinChannel { channel_id }).await?,
288 client,
289 user_store,
290 cx,
291 )
292 }
293
294 pub(crate) async fn join(
295 room_id: u64,
296 client: Arc<Client>,
297 user_store: Model<UserStore>,
298 cx: AsyncAppContext,
299 ) -> Result<Model<Self>> {
300 Self::from_join_response(
301 client.request(proto::JoinRoom { id: room_id }).await?,
302 client,
303 user_store,
304 cx,
305 )
306 }
307
308 fn released(&mut self, cx: &mut AppContext) {
309 if self.status.is_online() {
310 self.leave_internal(cx).detach_and_log_err(cx);
311 }
312 }
313
314 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
315 let task = if self.status.is_online() {
316 let leave = self.leave_internal(cx);
317 Some(cx.background_executor().spawn(async move {
318 leave.await.log_err();
319 }))
320 } else {
321 None
322 };
323
324 async move {
325 if let Some(task) = task {
326 task.await;
327 }
328 }
329 }
330
331 pub fn mute_on_join(_cx: &AppContext) -> bool {
332 // todo!() po: This should be uncommented, though then unmuting does not work
333 false
334 //CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
335 }
336
337 fn from_join_response(
338 response: proto::JoinRoomResponse,
339 client: Arc<Client>,
340 user_store: Model<UserStore>,
341 mut cx: AsyncAppContext,
342 ) -> Result<Model<Self>> {
343 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
344 let room = cx.build_model(|cx| {
345 Self::new(
346 room_proto.id,
347 response.channel_id,
348 response.live_kit_connection_info,
349 client,
350 user_store,
351 cx,
352 )
353 })?;
354 room.update(&mut cx, |room, cx| {
355 room.leave_when_empty = room.channel_id.is_none();
356 room.apply_room_update(room_proto, cx)?;
357 anyhow::Ok(())
358 })??;
359 Ok(room)
360 }
361
362 fn should_leave(&self) -> bool {
363 self.leave_when_empty
364 && self.pending_room_update.is_none()
365 && self.pending_participants.is_empty()
366 && self.remote_participants.is_empty()
367 && self.pending_call_count == 0
368 }
369
370 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
371 cx.notify();
372 cx.emit(Event::Left);
373 self.leave_internal(cx)
374 }
375
376 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
377 if self.status.is_offline() {
378 return Task::ready(Err(anyhow!("room is offline")));
379 }
380
381 log::info!("leaving room");
382 Audio::play_sound(Sound::Leave, cx);
383
384 self.clear_state(cx);
385
386 let leave_room = self.client.request(proto::LeaveRoom {});
387 cx.background_executor().spawn(async move {
388 leave_room.await?;
389 anyhow::Ok(())
390 })
391 }
392
393 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
394 for project in self.shared_projects.drain() {
395 if let Some(project) = project.upgrade() {
396 project.update(cx, |project, cx| {
397 project.unshare(cx).log_err();
398 });
399 }
400 }
401 for project in self.joined_projects.drain() {
402 if let Some(project) = project.upgrade() {
403 project.update(cx, |project, cx| {
404 project.disconnected_from_host(cx);
405 project.close(cx);
406 });
407 }
408 }
409
410 self.status = RoomStatus::Offline;
411 self.remote_participants.clear();
412 self.pending_participants.clear();
413 self.participant_user_ids.clear();
414 self.client_subscriptions.clear();
415 self.live_kit.take();
416 self.pending_room_update.take();
417 self.maintain_connection.take();
418 }
419
420 async fn maintain_connection(
421 this: WeakModel<Self>,
422 client: Arc<Client>,
423 mut cx: AsyncAppContext,
424 ) -> Result<()> {
425 let mut client_status = client.status();
426 loop {
427 let _ = client_status.try_recv();
428 let is_connected = client_status.borrow().is_connected();
429 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
430 if !is_connected || client_status.next().await.is_some() {
431 log::info!("detected client disconnection");
432
433 this.upgrade()
434 .ok_or_else(|| anyhow!("room was dropped"))?
435 .update(&mut cx, |this, cx| {
436 this.status = RoomStatus::Rejoining;
437 cx.notify();
438 })?;
439
440 // Wait for client to re-establish a connection to the server.
441 {
442 let mut reconnection_timeout =
443 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
444 let client_reconnection = async {
445 let mut remaining_attempts = 3;
446 while remaining_attempts > 0 {
447 if client_status.borrow().is_connected() {
448 log::info!("client reconnected, attempting to rejoin room");
449
450 let Some(this) = this.upgrade() else { break };
451 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
452 Ok(task) => {
453 if task.await.log_err().is_some() {
454 return true;
455 } else {
456 remaining_attempts -= 1;
457 }
458 }
459 Err(_app_dropped) => return false,
460 }
461 } else if client_status.borrow().is_signed_out() {
462 return false;
463 }
464
465 log::info!(
466 "waiting for client status change, remaining attempts {}",
467 remaining_attempts
468 );
469 client_status.next().await;
470 }
471 false
472 }
473 .fuse();
474 futures::pin_mut!(client_reconnection);
475
476 futures::select_biased! {
477 reconnected = client_reconnection => {
478 if reconnected {
479 log::info!("successfully reconnected to room");
480 // If we successfully joined the room, go back around the loop
481 // waiting for future connection status changes.
482 continue;
483 }
484 }
485 _ = reconnection_timeout => {
486 log::info!("room reconnection timeout expired");
487 }
488 }
489 }
490
491 break;
492 }
493 }
494
495 // The client failed to re-establish a connection to the server
496 // or an error occurred while trying to re-join the room. Either way
497 // we leave the room and return an error.
498 if let Some(this) = this.upgrade() {
499 log::info!("reconnection failed, leaving room");
500 let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
501 }
502 Err(anyhow!(
503 "can't reconnect to room: client failed to re-establish connection"
504 ))
505 }
506
507 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
508 let mut projects = HashMap::default();
509 let mut reshared_projects = Vec::new();
510 let mut rejoined_projects = Vec::new();
511 self.shared_projects.retain(|project| {
512 if let Some(handle) = project.upgrade() {
513 let project = handle.read(cx);
514 if let Some(project_id) = project.remote_id() {
515 projects.insert(project_id, handle.clone());
516 reshared_projects.push(proto::UpdateProject {
517 project_id,
518 worktrees: project.worktree_metadata_protos(cx),
519 });
520 return true;
521 }
522 }
523 false
524 });
525 self.joined_projects.retain(|project| {
526 if let Some(handle) = project.upgrade() {
527 let project = handle.read(cx);
528 if let Some(project_id) = project.remote_id() {
529 projects.insert(project_id, handle.clone());
530 rejoined_projects.push(proto::RejoinProject {
531 id: project_id,
532 worktrees: project
533 .worktrees()
534 .map(|worktree| {
535 let worktree = worktree.read(cx);
536 proto::RejoinWorktree {
537 id: worktree.id().to_proto(),
538 scan_id: worktree.completed_scan_id() as u64,
539 }
540 })
541 .collect(),
542 });
543 }
544 return true;
545 }
546 false
547 });
548
549 let response = self.client.request_envelope(proto::RejoinRoom {
550 id: self.id,
551 reshared_projects,
552 rejoined_projects,
553 });
554
555 cx.spawn(|this, mut cx| async move {
556 let response = response.await?;
557 let message_id = response.message_id;
558 let response = response.payload;
559 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
560 this.update(&mut cx, |this, cx| {
561 this.status = RoomStatus::Online;
562 this.apply_room_update(room_proto, cx)?;
563
564 for reshared_project in response.reshared_projects {
565 if let Some(project) = projects.get(&reshared_project.id) {
566 project.update(cx, |project, cx| {
567 project.reshared(reshared_project, cx).log_err();
568 });
569 }
570 }
571
572 for rejoined_project in response.rejoined_projects {
573 if let Some(project) = projects.get(&rejoined_project.id) {
574 project.update(cx, |project, cx| {
575 project.rejoined(rejoined_project, message_id, cx).log_err();
576 });
577 }
578 }
579
580 anyhow::Ok(())
581 })?
582 })
583 }
584
585 pub fn id(&self) -> u64 {
586 self.id
587 }
588
589 pub fn status(&self) -> RoomStatus {
590 self.status
591 }
592
593 pub fn local_participant(&self) -> &LocalParticipant {
594 &self.local_participant
595 }
596
597 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
598 &self.remote_participants
599 }
600
601 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
602 self.remote_participants
603 .values()
604 .find(|p| p.peer_id == peer_id)
605 }
606
607 pub fn pending_participants(&self) -> &[Arc<User>] {
608 &self.pending_participants
609 }
610
611 pub fn contains_participant(&self, user_id: u64) -> bool {
612 self.participant_user_ids.contains(&user_id)
613 }
614
615 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
616 self.follows_by_leader_id_project_id
617 .get(&(leader_id, project_id))
618 .map_or(&[], |v| v.as_slice())
619 }
620
621 /// Returns the most 'active' projects, defined as most people in the project
622 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
623 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
624 for participant in self.remote_participants.values() {
625 match participant.location {
626 ParticipantLocation::SharedProject { project_id } => {
627 project_hosts_and_guest_counts
628 .entry(project_id)
629 .or_default()
630 .1 += 1;
631 }
632 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
633 }
634 for project in &participant.projects {
635 project_hosts_and_guest_counts
636 .entry(project.id)
637 .or_default()
638 .0 = Some(participant.user.id);
639 }
640 }
641
642 if let Some(user) = self.user_store.read(cx).current_user() {
643 for project in &self.local_participant.projects {
644 project_hosts_and_guest_counts
645 .entry(project.id)
646 .or_default()
647 .0 = Some(user.id);
648 }
649 }
650
651 project_hosts_and_guest_counts
652 .into_iter()
653 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
654 .max_by_key(|(_, _, guest_count)| *guest_count)
655 .map(|(id, host, _)| (id, host))
656 }
657
658 async fn handle_room_updated(
659 this: Model<Self>,
660 envelope: TypedEnvelope<proto::RoomUpdated>,
661 _: Arc<Client>,
662 mut cx: AsyncAppContext,
663 ) -> Result<()> {
664 let room = envelope
665 .payload
666 .room
667 .ok_or_else(|| anyhow!("invalid room"))?;
668 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
669 }
670
671 fn apply_room_update(
672 &mut self,
673 mut room: proto::Room,
674 cx: &mut ModelContext<Self>,
675 ) -> Result<()> {
676 // Filter ourselves out from the room's participants.
677 let local_participant_ix = room
678 .participants
679 .iter()
680 .position(|participant| Some(participant.user_id) == self.client.user_id());
681 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
682
683 let pending_participant_user_ids = room
684 .pending_participants
685 .iter()
686 .map(|p| p.user_id)
687 .collect::<Vec<_>>();
688
689 let remote_participant_user_ids = room
690 .participants
691 .iter()
692 .map(|p| p.user_id)
693 .collect::<Vec<_>>();
694
695 let (remote_participants, pending_participants) =
696 self.user_store.update(cx, move |user_store, cx| {
697 (
698 user_store.get_users(remote_participant_user_ids, cx),
699 user_store.get_users(pending_participant_user_ids, cx),
700 )
701 });
702
703 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
704 let (remote_participants, pending_participants) =
705 futures::join!(remote_participants, pending_participants);
706
707 this.update(&mut cx, |this, cx| {
708 this.participant_user_ids.clear();
709
710 if let Some(participant) = local_participant {
711 this.local_participant.projects = participant.projects;
712 } else {
713 this.local_participant.projects.clear();
714 }
715
716 if let Some(participants) = remote_participants.log_err() {
717 for (participant, user) in room.participants.into_iter().zip(participants) {
718 let Some(peer_id) = participant.peer_id else {
719 continue;
720 };
721 let participant_index = ParticipantIndex(participant.participant_index);
722 this.participant_user_ids.insert(participant.user_id);
723
724 let old_projects = this
725 .remote_participants
726 .get(&participant.user_id)
727 .into_iter()
728 .flat_map(|existing| &existing.projects)
729 .map(|project| project.id)
730 .collect::<HashSet<_>>();
731 let new_projects = participant
732 .projects
733 .iter()
734 .map(|project| project.id)
735 .collect::<HashSet<_>>();
736
737 for project in &participant.projects {
738 if !old_projects.contains(&project.id) {
739 cx.emit(Event::RemoteProjectShared {
740 owner: user.clone(),
741 project_id: project.id,
742 worktree_root_names: project.worktree_root_names.clone(),
743 });
744 }
745 }
746
747 for unshared_project_id in old_projects.difference(&new_projects) {
748 this.joined_projects.retain(|project| {
749 if let Some(project) = project.upgrade() {
750 project.update(cx, |project, cx| {
751 if project.remote_id() == Some(*unshared_project_id) {
752 project.disconnected_from_host(cx);
753 false
754 } else {
755 true
756 }
757 })
758 } else {
759 false
760 }
761 });
762 cx.emit(Event::RemoteProjectUnshared {
763 project_id: *unshared_project_id,
764 });
765 }
766
767 let location = ParticipantLocation::from_proto(participant.location)
768 .unwrap_or(ParticipantLocation::External);
769 if let Some(remote_participant) =
770 this.remote_participants.get_mut(&participant.user_id)
771 {
772 remote_participant.peer_id = peer_id;
773 remote_participant.projects = participant.projects;
774 remote_participant.participant_index = participant_index;
775 if location != remote_participant.location {
776 remote_participant.location = location;
777 cx.emit(Event::ParticipantLocationChanged {
778 participant_id: peer_id,
779 });
780 }
781 } else {
782 this.remote_participants.insert(
783 participant.user_id,
784 RemoteParticipant {
785 user: user.clone(),
786 participant_index,
787 peer_id,
788 projects: participant.projects,
789 location,
790 muted: true,
791 speaking: false,
792 video_tracks: Default::default(),
793 audio_tracks: Default::default(),
794 },
795 );
796
797 Audio::play_sound(Sound::Joined, cx);
798
799 if let Some(live_kit) = this.live_kit.as_ref() {
800 let video_tracks =
801 live_kit.room.remote_video_tracks(&user.id.to_string());
802 let audio_tracks =
803 live_kit.room.remote_audio_tracks(&user.id.to_string());
804 let publications = live_kit
805 .room
806 .remote_audio_track_publications(&user.id.to_string());
807
808 for track in video_tracks {
809 this.remote_video_track_updated(
810 RemoteVideoTrackUpdate::Subscribed(track),
811 cx,
812 )
813 .log_err();
814 }
815
816 for (track, publication) in
817 audio_tracks.iter().zip(publications.iter())
818 {
819 this.remote_audio_track_updated(
820 RemoteAudioTrackUpdate::Subscribed(
821 track.clone(),
822 publication.clone(),
823 ),
824 cx,
825 )
826 .log_err();
827 }
828 }
829 }
830 }
831
832 this.remote_participants.retain(|user_id, participant| {
833 if this.participant_user_ids.contains(user_id) {
834 true
835 } else {
836 for project in &participant.projects {
837 cx.emit(Event::RemoteProjectUnshared {
838 project_id: project.id,
839 });
840 }
841 false
842 }
843 });
844 }
845
846 if let Some(pending_participants) = pending_participants.log_err() {
847 this.pending_participants = pending_participants;
848 for participant in &this.pending_participants {
849 this.participant_user_ids.insert(participant.id);
850 }
851 }
852
853 this.follows_by_leader_id_project_id.clear();
854 for follower in room.followers {
855 let project_id = follower.project_id;
856 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
857 (Some(leader), Some(follower)) => (leader, follower),
858
859 _ => {
860 log::error!("Follower message {follower:?} missing some state");
861 continue;
862 }
863 };
864
865 let list = this
866 .follows_by_leader_id_project_id
867 .entry((leader, project_id))
868 .or_insert(Vec::new());
869 if !list.contains(&follower) {
870 list.push(follower);
871 }
872 }
873
874 this.pending_room_update.take();
875 if this.should_leave() {
876 log::info!("room is empty, leaving");
877 let _ = this.leave(cx);
878 }
879
880 this.user_store.update(cx, |user_store, cx| {
881 let participant_indices_by_user_id = this
882 .remote_participants
883 .iter()
884 .map(|(user_id, participant)| (*user_id, participant.participant_index))
885 .collect();
886 user_store.set_participant_indices(participant_indices_by_user_id, cx);
887 });
888
889 this.check_invariants();
890 this.room_update_completed_tx.try_send(Some(())).ok();
891 cx.notify();
892 })
893 .ok();
894 }));
895
896 cx.notify();
897 Ok(())
898 }
899
900 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
901 let mut done_rx = self.room_update_completed_rx.clone();
902 async move {
903 while let Some(result) = done_rx.next().await {
904 if result.is_some() {
905 break;
906 }
907 }
908 }
909 }
910
911 fn remote_video_track_updated(
912 &mut self,
913 change: RemoteVideoTrackUpdate,
914 cx: &mut ModelContext<Self>,
915 ) -> Result<()> {
916 match change {
917 RemoteVideoTrackUpdate::Subscribed(track) => {
918 let user_id = track.publisher_id().parse()?;
919 let track_id = track.sid().to_string();
920 let participant = self
921 .remote_participants
922 .get_mut(&user_id)
923 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
924 participant.video_tracks.insert(track_id.clone(), track);
925 cx.emit(Event::RemoteVideoTracksChanged {
926 participant_id: participant.peer_id,
927 });
928 }
929 RemoteVideoTrackUpdate::Unsubscribed {
930 publisher_id,
931 track_id,
932 } => {
933 let user_id = publisher_id.parse()?;
934 let participant = self
935 .remote_participants
936 .get_mut(&user_id)
937 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
938 participant.video_tracks.remove(&track_id);
939 cx.emit(Event::RemoteVideoTracksChanged {
940 participant_id: participant.peer_id,
941 });
942 }
943 }
944
945 cx.notify();
946 Ok(())
947 }
948
949 fn remote_audio_track_updated(
950 &mut self,
951 change: RemoteAudioTrackUpdate,
952 cx: &mut ModelContext<Self>,
953 ) -> Result<()> {
954 match change {
955 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
956 let mut speaker_ids = speakers
957 .into_iter()
958 .filter_map(|speaker_sid| speaker_sid.parse().ok())
959 .collect::<Vec<u64>>();
960 speaker_ids.sort_unstable();
961 for (sid, participant) in &mut self.remote_participants {
962 if let Ok(_) = speaker_ids.binary_search(sid) {
963 participant.speaking = true;
964 } else {
965 participant.speaking = false;
966 }
967 }
968 if let Some(id) = self.client.user_id() {
969 if let Some(room) = &mut self.live_kit {
970 if let Ok(_) = speaker_ids.binary_search(&id) {
971 room.speaking = true;
972 } else {
973 room.speaking = false;
974 }
975 }
976 }
977 cx.notify();
978 }
979 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
980 let mut found = false;
981 for participant in &mut self.remote_participants.values_mut() {
982 for track in participant.audio_tracks.values() {
983 if track.sid() == track_id {
984 found = true;
985 break;
986 }
987 }
988 if found {
989 participant.muted = muted;
990 break;
991 }
992 }
993
994 cx.notify();
995 }
996 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
997 let user_id = track.publisher_id().parse()?;
998 let track_id = track.sid().to_string();
999 let participant = self
1000 .remote_participants
1001 .get_mut(&user_id)
1002 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1003 participant.audio_tracks.insert(track_id.clone(), track);
1004 participant.muted = publication.is_muted();
1005
1006 cx.emit(Event::RemoteAudioTracksChanged {
1007 participant_id: participant.peer_id,
1008 });
1009 }
1010 RemoteAudioTrackUpdate::Unsubscribed {
1011 publisher_id,
1012 track_id,
1013 } => {
1014 let user_id = publisher_id.parse()?;
1015 let participant = self
1016 .remote_participants
1017 .get_mut(&user_id)
1018 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1019 participant.audio_tracks.remove(&track_id);
1020 cx.emit(Event::RemoteAudioTracksChanged {
1021 participant_id: participant.peer_id,
1022 });
1023 }
1024 }
1025
1026 cx.notify();
1027 Ok(())
1028 }
1029
1030 fn check_invariants(&self) {
1031 #[cfg(any(test, feature = "test-support"))]
1032 {
1033 for participant in self.remote_participants.values() {
1034 assert!(self.participant_user_ids.contains(&participant.user.id));
1035 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1036 }
1037
1038 for participant in &self.pending_participants {
1039 assert!(self.participant_user_ids.contains(&participant.id));
1040 assert_ne!(participant.id, self.client.user_id().unwrap());
1041 }
1042
1043 assert_eq!(
1044 self.participant_user_ids.len(),
1045 self.remote_participants.len() + self.pending_participants.len()
1046 );
1047 }
1048 }
1049
1050 pub(crate) fn call(
1051 &mut self,
1052 called_user_id: u64,
1053 initial_project_id: Option<u64>,
1054 cx: &mut ModelContext<Self>,
1055 ) -> Task<Result<()>> {
1056 if self.status.is_offline() {
1057 return Task::ready(Err(anyhow!("room is offline")));
1058 }
1059
1060 cx.notify();
1061 let client = self.client.clone();
1062 let room_id = self.id;
1063 self.pending_call_count += 1;
1064 cx.spawn(move |this, mut cx| async move {
1065 let result = client
1066 .request(proto::Call {
1067 room_id,
1068 called_user_id,
1069 initial_project_id,
1070 })
1071 .await;
1072 this.update(&mut cx, |this, cx| {
1073 this.pending_call_count -= 1;
1074 if this.should_leave() {
1075 this.leave(cx).detach_and_log_err(cx);
1076 }
1077 })?;
1078 result?;
1079 Ok(())
1080 })
1081 }
1082
1083 pub fn join_project(
1084 &mut self,
1085 id: u64,
1086 language_registry: Arc<LanguageRegistry>,
1087 fs: Arc<dyn Fs>,
1088 cx: &mut ModelContext<Self>,
1089 ) -> Task<Result<Model<Project>>> {
1090 let client = self.client.clone();
1091 let user_store = self.user_store.clone();
1092 cx.emit(Event::RemoteProjectJoined { project_id: id });
1093 cx.spawn(move |this, mut cx| async move {
1094 let project =
1095 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1096
1097 this.update(&mut cx, |this, cx| {
1098 this.joined_projects.retain(|project| {
1099 if let Some(project) = project.upgrade() {
1100 !project.read(cx).is_read_only()
1101 } else {
1102 false
1103 }
1104 });
1105 this.joined_projects.insert(project.downgrade());
1106 })?;
1107 Ok(project)
1108 })
1109 }
1110
1111 pub(crate) fn share_project(
1112 &mut self,
1113 project: Model<Project>,
1114 cx: &mut ModelContext<Self>,
1115 ) -> Task<Result<u64>> {
1116 if let Some(project_id) = project.read(cx).remote_id() {
1117 return Task::ready(Ok(project_id));
1118 }
1119
1120 let request = self.client.request(proto::ShareProject {
1121 room_id: self.id(),
1122 worktrees: project.read(cx).worktree_metadata_protos(cx),
1123 });
1124 cx.spawn(|this, mut cx| async move {
1125 let response = request.await?;
1126
1127 project.update(&mut cx, |project, cx| {
1128 project.shared(response.project_id, cx)
1129 })??;
1130
1131 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1132 this.update(&mut cx, |this, cx| {
1133 this.shared_projects.insert(project.downgrade());
1134 let active_project = this.local_participant.active_project.as_ref();
1135 if active_project.map_or(false, |location| *location == project) {
1136 this.set_location(Some(&project), cx)
1137 } else {
1138 Task::ready(Ok(()))
1139 }
1140 })?
1141 .await?;
1142
1143 Ok(response.project_id)
1144 })
1145 }
1146
1147 pub(crate) fn unshare_project(
1148 &mut self,
1149 project: Model<Project>,
1150 cx: &mut ModelContext<Self>,
1151 ) -> Result<()> {
1152 let project_id = match project.read(cx).remote_id() {
1153 Some(project_id) => project_id,
1154 None => return Ok(()),
1155 };
1156
1157 self.client.send(proto::UnshareProject { project_id })?;
1158 project.update(cx, |this, cx| this.unshare(cx))
1159 }
1160
1161 pub(crate) fn set_location(
1162 &mut self,
1163 project: Option<&Model<Project>>,
1164 cx: &mut ModelContext<Self>,
1165 ) -> Task<Result<()>> {
1166 if self.status.is_offline() {
1167 return Task::ready(Err(anyhow!("room is offline")));
1168 }
1169
1170 let client = self.client.clone();
1171 let room_id = self.id;
1172 let location = if let Some(project) = project {
1173 self.local_participant.active_project = Some(project.downgrade());
1174 if let Some(project_id) = project.read(cx).remote_id() {
1175 proto::participant_location::Variant::SharedProject(
1176 proto::participant_location::SharedProject { id: project_id },
1177 )
1178 } else {
1179 proto::participant_location::Variant::UnsharedProject(
1180 proto::participant_location::UnsharedProject {},
1181 )
1182 }
1183 } else {
1184 self.local_participant.active_project = None;
1185 proto::participant_location::Variant::External(proto::participant_location::External {})
1186 };
1187
1188 cx.notify();
1189 cx.background_executor().spawn(async move {
1190 client
1191 .request(proto::UpdateParticipantLocation {
1192 room_id,
1193 location: Some(proto::ParticipantLocation {
1194 variant: Some(location),
1195 }),
1196 })
1197 .await?;
1198 Ok(())
1199 })
1200 }
1201
1202 pub fn is_screen_sharing(&self) -> bool {
1203 self.live_kit.as_ref().map_or(false, |live_kit| {
1204 !matches!(live_kit.screen_track, LocalTrack::None)
1205 })
1206 }
1207
1208 pub fn is_sharing_mic(&self) -> bool {
1209 self.live_kit.as_ref().map_or(false, |live_kit| {
1210 !matches!(live_kit.microphone_track, LocalTrack::None)
1211 })
1212 }
1213
1214 pub fn is_muted(&self, cx: &AppContext) -> bool {
1215 self.live_kit
1216 .as_ref()
1217 .and_then(|live_kit| match &live_kit.microphone_track {
1218 LocalTrack::None => Some(Self::mute_on_join(cx)),
1219 LocalTrack::Pending { muted, .. } => Some(*muted),
1220 LocalTrack::Published { muted, .. } => Some(*muted),
1221 })
1222 .unwrap_or(false)
1223 }
1224
1225 pub fn is_speaking(&self) -> bool {
1226 self.live_kit
1227 .as_ref()
1228 .map_or(false, |live_kit| live_kit.speaking)
1229 }
1230
1231 pub fn is_deafened(&self) -> Option<bool> {
1232 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1233 }
1234
1235 #[track_caller]
1236 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1237 if self.status.is_offline() {
1238 return Task::ready(Err(anyhow!("room is offline")));
1239 } else if self.is_sharing_mic() {
1240 return Task::ready(Err(anyhow!("microphone was already shared")));
1241 }
1242
1243 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1244 let publish_id = post_inc(&mut live_kit.next_publish_id);
1245 live_kit.microphone_track = LocalTrack::Pending {
1246 publish_id,
1247 muted: false,
1248 };
1249 cx.notify();
1250 publish_id
1251 } else {
1252 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1253 };
1254
1255 cx.spawn(move |this, mut cx| async move {
1256 let publish_track = async {
1257 let track = LocalAudioTrack::create();
1258 this.upgrade()
1259 .ok_or_else(|| anyhow!("room was dropped"))?
1260 .update(&mut cx, |this, _| {
1261 this.live_kit
1262 .as_ref()
1263 .map(|live_kit| live_kit.room.publish_audio_track(track))
1264 })?
1265 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1266 .await
1267 };
1268
1269 let publication = publish_track.await;
1270 this.upgrade()
1271 .ok_or_else(|| anyhow!("room was dropped"))?
1272 .update(&mut cx, |this, cx| {
1273 let live_kit = this
1274 .live_kit
1275 .as_mut()
1276 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1277
1278 let (canceled, muted) = if let LocalTrack::Pending {
1279 publish_id: cur_publish_id,
1280 muted,
1281 } = &live_kit.microphone_track
1282 {
1283 (*cur_publish_id != publish_id, *muted)
1284 } else {
1285 (true, false)
1286 };
1287
1288 match publication {
1289 Ok(publication) => {
1290 if canceled {
1291 live_kit.room.unpublish_track(publication);
1292 } else {
1293 if muted {
1294 cx.background_executor()
1295 .spawn(publication.set_mute(muted))
1296 .detach();
1297 }
1298 live_kit.microphone_track = LocalTrack::Published {
1299 track_publication: publication,
1300 muted,
1301 };
1302 cx.notify();
1303 }
1304 Ok(())
1305 }
1306 Err(error) => {
1307 if canceled {
1308 Ok(())
1309 } else {
1310 live_kit.microphone_track = LocalTrack::None;
1311 cx.notify();
1312 Err(error)
1313 }
1314 }
1315 }
1316 })?
1317 })
1318 }
1319
1320 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1321 if self.status.is_offline() {
1322 return Task::ready(Err(anyhow!("room is offline")));
1323 } else if self.is_screen_sharing() {
1324 return Task::ready(Err(anyhow!("screen was already shared")));
1325 }
1326
1327 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1328 let publish_id = post_inc(&mut live_kit.next_publish_id);
1329 live_kit.screen_track = LocalTrack::Pending {
1330 publish_id,
1331 muted: false,
1332 };
1333 cx.notify();
1334 (live_kit.room.display_sources(), publish_id)
1335 } else {
1336 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1337 };
1338
1339 cx.spawn(move |this, mut cx| async move {
1340 let publish_track = async {
1341 let displays = displays.await?;
1342 let display = displays
1343 .first()
1344 .ok_or_else(|| anyhow!("no display found"))?;
1345 let track = LocalVideoTrack::screen_share_for_display(&display);
1346 this.upgrade()
1347 .ok_or_else(|| anyhow!("room was dropped"))?
1348 .update(&mut cx, |this, _| {
1349 this.live_kit
1350 .as_ref()
1351 .map(|live_kit| live_kit.room.publish_video_track(track))
1352 })?
1353 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1354 .await
1355 };
1356
1357 let publication = publish_track.await;
1358 this.upgrade()
1359 .ok_or_else(|| anyhow!("room was dropped"))?
1360 .update(&mut cx, |this, cx| {
1361 let live_kit = this
1362 .live_kit
1363 .as_mut()
1364 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1365
1366 let (canceled, muted) = if let LocalTrack::Pending {
1367 publish_id: cur_publish_id,
1368 muted,
1369 } = &live_kit.screen_track
1370 {
1371 (*cur_publish_id != publish_id, *muted)
1372 } else {
1373 (true, false)
1374 };
1375
1376 match publication {
1377 Ok(publication) => {
1378 if canceled {
1379 live_kit.room.unpublish_track(publication);
1380 } else {
1381 if muted {
1382 cx.background_executor()
1383 .spawn(publication.set_mute(muted))
1384 .detach();
1385 }
1386 live_kit.screen_track = LocalTrack::Published {
1387 track_publication: publication,
1388 muted,
1389 };
1390 cx.notify();
1391 }
1392
1393 Audio::play_sound(Sound::StartScreenshare, cx);
1394
1395 Ok(())
1396 }
1397 Err(error) => {
1398 if canceled {
1399 Ok(())
1400 } else {
1401 live_kit.screen_track = LocalTrack::None;
1402 cx.notify();
1403 Err(error)
1404 }
1405 }
1406 }
1407 })?
1408 })
1409 }
1410
1411 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1412 let should_mute = !self.is_muted(cx);
1413 if let Some(live_kit) = self.live_kit.as_mut() {
1414 if matches!(live_kit.microphone_track, LocalTrack::None) {
1415 return Ok(self.share_microphone(cx));
1416 }
1417
1418 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1419 live_kit.muted_by_user = should_mute;
1420
1421 if old_muted == true && live_kit.deafened == true {
1422 if let Some(task) = self.toggle_deafen(cx).ok() {
1423 task.detach();
1424 }
1425 }
1426
1427 Ok(ret_task)
1428 } else {
1429 Err(anyhow!("LiveKit not started"))
1430 }
1431 }
1432
1433 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1434 if let Some(live_kit) = self.live_kit.as_mut() {
1435 (*live_kit).deafened = !live_kit.deafened;
1436
1437 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1438 // Context notification is sent within set_mute itself.
1439 let mut mute_task = None;
1440 // When deafening, mute user's mic as well.
1441 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1442 if live_kit.deafened || !live_kit.muted_by_user {
1443 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1444 };
1445 for participant in self.remote_participants.values() {
1446 for track in live_kit
1447 .room
1448 .remote_audio_track_publications(&participant.user.id.to_string())
1449 {
1450 let deafened = live_kit.deafened;
1451 tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1452 }
1453 }
1454
1455 Ok(cx.foreground_executor().spawn(async move {
1456 if let Some(mute_task) = mute_task {
1457 mute_task.await?;
1458 }
1459 for task in tasks {
1460 task.await?;
1461 }
1462 Ok(())
1463 }))
1464 } else {
1465 Err(anyhow!("LiveKit not started"))
1466 }
1467 }
1468
1469 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1470 if self.status.is_offline() {
1471 return Err(anyhow!("room is offline"));
1472 }
1473
1474 let live_kit = self
1475 .live_kit
1476 .as_mut()
1477 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1478 match mem::take(&mut live_kit.screen_track) {
1479 LocalTrack::None => Err(anyhow!("screen was not shared")),
1480 LocalTrack::Pending { .. } => {
1481 cx.notify();
1482 Ok(())
1483 }
1484 LocalTrack::Published {
1485 track_publication, ..
1486 } => {
1487 live_kit.room.unpublish_track(track_publication);
1488 cx.notify();
1489
1490 Audio::play_sound(Sound::StopScreenshare, cx);
1491 Ok(())
1492 }
1493 }
1494 }
1495
1496 #[cfg(any(test, feature = "test-support"))]
1497 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1498 self.live_kit
1499 .as_ref()
1500 .unwrap()
1501 .room
1502 .set_display_sources(sources);
1503 }
1504}
1505
1506struct LiveKitRoom {
1507 room: Arc<live_kit_client::Room>,
1508 screen_track: LocalTrack,
1509 microphone_track: LocalTrack,
1510 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1511 muted_by_user: bool,
1512 deafened: bool,
1513 speaking: bool,
1514 next_publish_id: usize,
1515 _maintain_room: Task<()>,
1516 _maintain_tracks: [Task<()>; 2],
1517}
1518
1519impl LiveKitRoom {
1520 fn set_mute(
1521 self: &mut LiveKitRoom,
1522 should_mute: bool,
1523 cx: &mut ModelContext<Room>,
1524 ) -> Result<(Task<Result<()>>, bool)> {
1525 if !should_mute {
1526 // clear user muting state.
1527 self.muted_by_user = false;
1528 }
1529
1530 let (result, old_muted) = match &mut self.microphone_track {
1531 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1532 LocalTrack::Pending { muted, .. } => {
1533 let old_muted = *muted;
1534 *muted = should_mute;
1535 cx.notify();
1536 Ok((Task::Ready(Some(Ok(()))), old_muted))
1537 }
1538 LocalTrack::Published {
1539 track_publication,
1540 muted,
1541 } => {
1542 let old_muted = *muted;
1543 *muted = should_mute;
1544 cx.notify();
1545 Ok((
1546 cx.background_executor()
1547 .spawn(track_publication.set_mute(*muted)),
1548 old_muted,
1549 ))
1550 }
1551 }?;
1552
1553 if old_muted != should_mute {
1554 if should_mute {
1555 Audio::play_sound(Sound::Mute, cx);
1556 } else {
1557 Audio::play_sound(Sound::Unmute, cx);
1558 }
1559 }
1560
1561 Ok((result, old_muted))
1562 }
1563}
1564
1565enum LocalTrack {
1566 None,
1567 Pending {
1568 publish_id: usize,
1569 muted: bool,
1570 },
1571 Published {
1572 track_publication: LocalTrackPublication,
1573 muted: bool,
1574 },
1575}
1576
1577impl Default for LocalTrack {
1578 fn default() -> Self {
1579 Self::None
1580 }
1581}
1582
1583#[derive(Copy, Clone, PartialEq, Eq)]
1584pub enum RoomStatus {
1585 Online,
1586 Rejoining,
1587 Offline,
1588}
1589
1590impl RoomStatus {
1591 pub fn is_offline(&self) -> bool {
1592 matches!(self, RoomStatus::Offline)
1593 }
1594
1595 pub fn is_online(&self) -> bool {
1596 matches!(self, RoomStatus::Online)
1597 }
1598}