1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::{sink::Sink, stream::Stream, watch};
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub enum Event {
30 ParticipantLocationChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteVideoTracksChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteAudioTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteProjectShared {
40 owner: Arc<User>,
41 project_id: u64,
42 worktree_root_names: Vec<String>,
43 },
44 RemoteProjectUnshared {
45 project_id: u64,
46 },
47 RemoteProjectJoined {
48 project_id: u64,
49 },
50 RemoteProjectInvitationDiscarded {
51 project_id: u64,
52 },
53 Left,
54}
55
56pub struct Room {
57 id: u64,
58 channel_id: Option<u64>,
59 live_kit: Option<LiveKitRoom>,
60 status: RoomStatus,
61 shared_projects: HashSet<WeakModelHandle<Project>>,
62 joined_projects: HashSet<WeakModelHandle<Project>>,
63 local_participant: LocalParticipant,
64 remote_participants: BTreeMap<u64, RemoteParticipant>,
65 pending_participants: Vec<Arc<User>>,
66 participant_user_ids: HashSet<u64>,
67 pending_call_count: usize,
68 leave_when_empty: bool,
69 client: Arc<Client>,
70 user_store: ModelHandle<UserStore>,
71 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
72 subscriptions: Vec<client::Subscription>,
73 room_update_completed_tx: watch::Sender<Option<()>>,
74 room_update_completed_rx: watch::Receiver<Option<()>>,
75 pending_room_update: Option<Task<()>>,
76 maintain_connection: Option<Task<Option<()>>>,
77}
78
79impl Entity for Room {
80 type Event = Event;
81
82 fn release(&mut self, cx: &mut AppContext) {
83 if self.status.is_online() {
84 self.leave_internal(cx).detach_and_log_err(cx);
85 }
86 }
87
88 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
89 if self.status.is_online() {
90 let leave = self.leave_internal(cx);
91 Some(
92 cx.background()
93 .spawn(async move {
94 leave.await.log_err();
95 })
96 .boxed(),
97 )
98 } else {
99 None
100 }
101 }
102}
103
104impl Room {
105 pub fn channel_id(&self) -> Option<u64> {
106 self.channel_id
107 }
108
109 pub fn is_sharing_project(&self) -> bool {
110 !self.shared_projects.is_empty()
111 }
112
/// Test helper: returns `true` when a LiveKit room exists and its current
/// connection state is `Connected`.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    match self.live_kit.as_ref() {
        Some(live_kit) => matches!(
            *live_kit.room.status().borrow(),
            live_kit_client::ConnectionState::Connected { .. }
        ),
        None => false,
    }
}
124
125 fn new(
126 id: u64,
127 channel_id: Option<u64>,
128 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
129 client: Arc<Client>,
130 user_store: ModelHandle<UserStore>,
131 cx: &mut ModelContext<Self>,
132 ) -> Self {
133 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
134 let room = live_kit_client::Room::new();
135 let mut status = room.status();
136 // Consume the initial status of the room.
137 let _ = status.try_recv();
138 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
139 while let Some(status) = status.next().await {
140 let this = if let Some(this) = this.upgrade(&cx) {
141 this
142 } else {
143 break;
144 };
145
146 if status == live_kit_client::ConnectionState::Disconnected {
147 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
148 break;
149 }
150 }
151 });
152
153 let mut track_video_changes = room.remote_video_track_updates();
154 let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
155 while let Some(track_change) = track_video_changes.next().await {
156 let this = if let Some(this) = this.upgrade(&cx) {
157 this
158 } else {
159 break;
160 };
161
162 this.update(&mut cx, |this, cx| {
163 this.remote_video_track_updated(track_change, cx).log_err()
164 });
165 }
166 });
167
168 let mut track_audio_changes = room.remote_audio_track_updates();
169 let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
170 while let Some(track_change) = track_audio_changes.next().await {
171 let this = if let Some(this) = this.upgrade(&cx) {
172 this
173 } else {
174 break;
175 };
176
177 this.update(&mut cx, |this, cx| {
178 this.remote_audio_track_updated(track_change, cx).log_err()
179 });
180 }
181 });
182
183 let connect = room.connect(&connection_info.server_url, &connection_info.token);
184 cx.spawn(|this, mut cx| async move {
185 connect.await?;
186
187 if !cx.read(Self::mute_on_join) {
188 this.update(&mut cx, |this, cx| this.share_microphone(cx))
189 .await?;
190 }
191
192 anyhow::Ok(())
193 })
194 .detach_and_log_err(cx);
195
196 Some(LiveKitRoom {
197 room,
198 screen_track: LocalTrack::None,
199 microphone_track: LocalTrack::None,
200 next_publish_id: 0,
201 muted_by_user: false,
202 deafened: false,
203 speaking: false,
204 _maintain_room,
205 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
206 })
207 } else {
208 None
209 };
210
211 let maintain_connection =
212 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
213
214 Audio::play_sound(Sound::Joined, cx);
215
216 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
217
218 Self {
219 id,
220 channel_id,
221 live_kit: live_kit_room,
222 status: RoomStatus::Online,
223 shared_projects: Default::default(),
224 joined_projects: Default::default(),
225 participant_user_ids: Default::default(),
226 local_participant: Default::default(),
227 remote_participants: Default::default(),
228 pending_participants: Default::default(),
229 pending_call_count: 0,
230 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
231 leave_when_empty: false,
232 pending_room_update: None,
233 client,
234 user_store,
235 follows_by_leader_id_project_id: Default::default(),
236 maintain_connection: Some(maintain_connection),
237 room_update_completed_tx,
238 room_update_completed_rx,
239 }
240 }
241
242 pub(crate) fn create(
243 called_user_id: u64,
244 initial_project: Option<ModelHandle<Project>>,
245 client: Arc<Client>,
246 user_store: ModelHandle<UserStore>,
247 cx: &mut AppContext,
248 ) -> Task<Result<ModelHandle<Self>>> {
249 cx.spawn(|mut cx| async move {
250 let response = client.request(proto::CreateRoom {}).await?;
251 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
252 let room = cx.add_model(|cx| {
253 Self::new(
254 room_proto.id,
255 None,
256 response.live_kit_connection_info,
257 client,
258 user_store,
259 cx,
260 )
261 });
262
263 let initial_project_id = if let Some(initial_project) = initial_project {
264 let initial_project_id = room
265 .update(&mut cx, |room, cx| {
266 room.share_project(initial_project.clone(), cx)
267 })
268 .await?;
269 Some(initial_project_id)
270 } else {
271 None
272 };
273
274 match room
275 .update(&mut cx, |room, cx| {
276 room.leave_when_empty = true;
277 room.call(called_user_id, initial_project_id, cx)
278 })
279 .await
280 {
281 Ok(()) => Ok(room),
282 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
283 }
284 })
285 }
286
287 pub(crate) fn join_channel(
288 channel_id: u64,
289 client: Arc<Client>,
290 user_store: ModelHandle<UserStore>,
291 cx: &mut AppContext,
292 ) -> Task<Result<ModelHandle<Self>>> {
293 cx.spawn(|cx| async move {
294 Self::from_join_response(
295 client.request(proto::JoinChannel { channel_id }).await?,
296 client,
297 user_store,
298 cx,
299 )
300 })
301 }
302
303 pub(crate) fn join(
304 call: &IncomingCall,
305 client: Arc<Client>,
306 user_store: ModelHandle<UserStore>,
307 cx: &mut AppContext,
308 ) -> Task<Result<ModelHandle<Self>>> {
309 let id = call.room_id;
310 cx.spawn(|cx| async move {
311 Self::from_join_response(
312 client.request(proto::JoinRoom { id }).await?,
313 client,
314 user_store,
315 cx,
316 )
317 })
318 }
319
320 pub fn mute_on_join(cx: &AppContext) -> bool {
321 settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
322 }
323
324 fn from_join_response(
325 response: proto::JoinRoomResponse,
326 client: Arc<Client>,
327 user_store: ModelHandle<UserStore>,
328 mut cx: AsyncAppContext,
329 ) -> Result<ModelHandle<Self>> {
330 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
331 let room = cx.add_model(|cx| {
332 Self::new(
333 room_proto.id,
334 response.channel_id,
335 response.live_kit_connection_info,
336 client,
337 user_store,
338 cx,
339 )
340 });
341 room.update(&mut cx, |room, cx| {
342 room.leave_when_empty = room.channel_id.is_none();
343 room.apply_room_update(room_proto, cx)?;
344 anyhow::Ok(())
345 })?;
346 Ok(room)
347 }
348
349 fn should_leave(&self) -> bool {
350 self.leave_when_empty
351 && self.pending_room_update.is_none()
352 && self.pending_participants.is_empty()
353 && self.remote_participants.is_empty()
354 && self.pending_call_count == 0
355 }
356
357 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
358 cx.notify();
359 cx.emit(Event::Left);
360 self.leave_internal(cx)
361 }
362
363 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
364 if self.status.is_offline() {
365 return Task::ready(Err(anyhow!("room is offline")));
366 }
367
368 log::info!("leaving room");
369 Audio::play_sound(Sound::Leave, cx);
370
371 self.clear_state(cx);
372
373 let leave_room = self.client.request(proto::LeaveRoom {});
374 cx.background().spawn(async move {
375 leave_room.await?;
376 anyhow::Ok(())
377 })
378 }
379
380 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
381 for project in self.shared_projects.drain() {
382 if let Some(project) = project.upgrade(cx) {
383 project.update(cx, |project, cx| {
384 project.unshare(cx).log_err();
385 });
386 }
387 }
388 for project in self.joined_projects.drain() {
389 if let Some(project) = project.upgrade(cx) {
390 project.update(cx, |project, cx| {
391 project.disconnected_from_host(cx);
392 project.close(cx);
393 });
394 }
395 }
396
397 self.status = RoomStatus::Offline;
398 self.remote_participants.clear();
399 self.pending_participants.clear();
400 self.participant_user_ids.clear();
401 self.subscriptions.clear();
402 self.live_kit.take();
403 self.pending_room_update.take();
404 self.maintain_connection.take();
405 }
406
407 async fn maintain_connection(
408 this: WeakModelHandle<Self>,
409 client: Arc<Client>,
410 mut cx: AsyncAppContext,
411 ) -> Result<()> {
412 let mut client_status = client.status();
413 loop {
414 let _ = client_status.try_recv();
415 let is_connected = client_status.borrow().is_connected();
416 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
417 if !is_connected || client_status.next().await.is_some() {
418 log::info!("detected client disconnection");
419
420 this.upgrade(&cx)
421 .ok_or_else(|| anyhow!("room was dropped"))?
422 .update(&mut cx, |this, cx| {
423 this.status = RoomStatus::Rejoining;
424 cx.notify();
425 });
426
427 // Wait for client to re-establish a connection to the server.
428 {
429 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
430 let client_reconnection = async {
431 let mut remaining_attempts = 3;
432 while remaining_attempts > 0 {
433 if client_status.borrow().is_connected() {
434 log::info!("client reconnected, attempting to rejoin room");
435
436 let Some(this) = this.upgrade(&cx) else { break };
437 if this
438 .update(&mut cx, |this, cx| this.rejoin(cx))
439 .await
440 .log_err()
441 .is_some()
442 {
443 return true;
444 } else {
445 remaining_attempts -= 1;
446 }
447 } else if client_status.borrow().is_signed_out() {
448 return false;
449 }
450
451 log::info!(
452 "waiting for client status change, remaining attempts {}",
453 remaining_attempts
454 );
455 client_status.next().await;
456 }
457 false
458 }
459 .fuse();
460 futures::pin_mut!(client_reconnection);
461
462 futures::select_biased! {
463 reconnected = client_reconnection => {
464 if reconnected {
465 log::info!("successfully reconnected to room");
466 // If we successfully joined the room, go back around the loop
467 // waiting for future connection status changes.
468 continue;
469 }
470 }
471 _ = reconnection_timeout => {
472 log::info!("room reconnection timeout expired");
473 }
474 }
475 }
476
477 break;
478 }
479 }
480
481 // The client failed to re-establish a connection to the server
482 // or an error occurred while trying to re-join the room. Either way
483 // we leave the room and return an error.
484 if let Some(this) = this.upgrade(&cx) {
485 log::info!("reconnection failed, leaving room");
486 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
487 }
488 Err(anyhow!(
489 "can't reconnect to room: client failed to re-establish connection"
490 ))
491 }
492
493 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
494 let mut projects = HashMap::default();
495 let mut reshared_projects = Vec::new();
496 let mut rejoined_projects = Vec::new();
497 self.shared_projects.retain(|project| {
498 if let Some(handle) = project.upgrade(cx) {
499 let project = handle.read(cx);
500 if let Some(project_id) = project.remote_id() {
501 projects.insert(project_id, handle.clone());
502 reshared_projects.push(proto::UpdateProject {
503 project_id,
504 worktrees: project.worktree_metadata_protos(cx),
505 });
506 return true;
507 }
508 }
509 false
510 });
511 self.joined_projects.retain(|project| {
512 if let Some(handle) = project.upgrade(cx) {
513 let project = handle.read(cx);
514 if let Some(project_id) = project.remote_id() {
515 projects.insert(project_id, handle.clone());
516 rejoined_projects.push(proto::RejoinProject {
517 id: project_id,
518 worktrees: project
519 .worktrees(cx)
520 .map(|worktree| {
521 let worktree = worktree.read(cx);
522 proto::RejoinWorktree {
523 id: worktree.id().to_proto(),
524 scan_id: worktree.completed_scan_id() as u64,
525 }
526 })
527 .collect(),
528 });
529 }
530 return true;
531 }
532 false
533 });
534
535 let response = self.client.request_envelope(proto::RejoinRoom {
536 id: self.id,
537 reshared_projects,
538 rejoined_projects,
539 });
540
541 cx.spawn(|this, mut cx| async move {
542 let response = response.await?;
543 let message_id = response.message_id;
544 let response = response.payload;
545 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
546 this.update(&mut cx, |this, cx| {
547 this.status = RoomStatus::Online;
548 this.apply_room_update(room_proto, cx)?;
549
550 for reshared_project in response.reshared_projects {
551 if let Some(project) = projects.get(&reshared_project.id) {
552 project.update(cx, |project, cx| {
553 project.reshared(reshared_project, cx).log_err();
554 });
555 }
556 }
557
558 for rejoined_project in response.rejoined_projects {
559 if let Some(project) = projects.get(&rejoined_project.id) {
560 project.update(cx, |project, cx| {
561 project.rejoined(rejoined_project, message_id, cx).log_err();
562 });
563 }
564 }
565
566 anyhow::Ok(())
567 })
568 })
569 }
570
571 pub fn id(&self) -> u64 {
572 self.id
573 }
574
575 pub fn status(&self) -> RoomStatus {
576 self.status
577 }
578
579 pub fn local_participant(&self) -> &LocalParticipant {
580 &self.local_participant
581 }
582
583 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
584 &self.remote_participants
585 }
586
587 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
588 self.remote_participants
589 .values()
590 .find(|p| p.peer_id == peer_id)
591 }
592
593 pub fn pending_participants(&self) -> &[Arc<User>] {
594 &self.pending_participants
595 }
596
597 pub fn contains_participant(&self, user_id: u64) -> bool {
598 self.participant_user_ids.contains(&user_id)
599 }
600
601 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
602 self.follows_by_leader_id_project_id
603 .get(&(leader_id, project_id))
604 .map_or(&[], |v| v.as_slice())
605 }
606
607 /// Returns the most 'active' projects, defined as most people in the project
608 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
609 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
610 for participant in self.remote_participants.values() {
611 match participant.location {
612 ParticipantLocation::SharedProject { project_id } => {
613 project_hosts_and_guest_counts
614 .entry(project_id)
615 .or_default()
616 .1 += 1;
617 }
618 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
619 }
620 for project in &participant.projects {
621 project_hosts_and_guest_counts
622 .entry(project.id)
623 .or_default()
624 .0 = Some(participant.user.id);
625 }
626 }
627
628 if let Some(user) = self.user_store.read(cx).current_user() {
629 for project in &self.local_participant.projects {
630 project_hosts_and_guest_counts
631 .entry(project.id)
632 .or_default()
633 .0 = Some(user.id);
634 }
635 }
636
637 project_hosts_and_guest_counts
638 .into_iter()
639 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
640 .max_by_key(|(_, _, guest_count)| *guest_count)
641 .map(|(id, host, _)| (id, host))
642 }
643
644 async fn handle_room_updated(
645 this: ModelHandle<Self>,
646 envelope: TypedEnvelope<proto::RoomUpdated>,
647 _: Arc<Client>,
648 mut cx: AsyncAppContext,
649 ) -> Result<()> {
650 let room = envelope
651 .payload
652 .room
653 .ok_or_else(|| anyhow!("invalid room"))?;
654 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
655 }
656
657 fn apply_room_update(
658 &mut self,
659 mut room: proto::Room,
660 cx: &mut ModelContext<Self>,
661 ) -> Result<()> {
662 // Filter ourselves out from the room's participants.
663 let local_participant_ix = room
664 .participants
665 .iter()
666 .position(|participant| Some(participant.user_id) == self.client.user_id());
667 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
668
669 let pending_participant_user_ids = room
670 .pending_participants
671 .iter()
672 .map(|p| p.user_id)
673 .collect::<Vec<_>>();
674
675 let remote_participant_user_ids = room
676 .participants
677 .iter()
678 .map(|p| p.user_id)
679 .collect::<Vec<_>>();
680
681 let (remote_participants, pending_participants) =
682 self.user_store.update(cx, move |user_store, cx| {
683 (
684 user_store.get_users(remote_participant_user_ids, cx),
685 user_store.get_users(pending_participant_user_ids, cx),
686 )
687 });
688
689 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
690 let (remote_participants, pending_participants) =
691 futures::join!(remote_participants, pending_participants);
692
693 this.update(&mut cx, |this, cx| {
694 this.participant_user_ids.clear();
695
696 if let Some(participant) = local_participant {
697 this.local_participant.projects = participant.projects;
698 } else {
699 this.local_participant.projects.clear();
700 }
701
702 if let Some(participants) = remote_participants.log_err() {
703 for (participant, user) in room.participants.into_iter().zip(participants) {
704 let Some(peer_id) = participant.peer_id else {
705 continue;
706 };
707 let participant_index = ParticipantIndex(participant.participant_index);
708 this.participant_user_ids.insert(participant.user_id);
709
710 let old_projects = this
711 .remote_participants
712 .get(&participant.user_id)
713 .into_iter()
714 .flat_map(|existing| &existing.projects)
715 .map(|project| project.id)
716 .collect::<HashSet<_>>();
717 let new_projects = participant
718 .projects
719 .iter()
720 .map(|project| project.id)
721 .collect::<HashSet<_>>();
722
723 for project in &participant.projects {
724 if !old_projects.contains(&project.id) {
725 cx.emit(Event::RemoteProjectShared {
726 owner: user.clone(),
727 project_id: project.id,
728 worktree_root_names: project.worktree_root_names.clone(),
729 });
730 }
731 }
732
733 for unshared_project_id in old_projects.difference(&new_projects) {
734 this.joined_projects.retain(|project| {
735 if let Some(project) = project.upgrade(cx) {
736 project.update(cx, |project, cx| {
737 if project.remote_id() == Some(*unshared_project_id) {
738 project.disconnected_from_host(cx);
739 false
740 } else {
741 true
742 }
743 })
744 } else {
745 false
746 }
747 });
748 cx.emit(Event::RemoteProjectUnshared {
749 project_id: *unshared_project_id,
750 });
751 }
752
753 let location = ParticipantLocation::from_proto(participant.location)
754 .unwrap_or(ParticipantLocation::External);
755 if let Some(remote_participant) =
756 this.remote_participants.get_mut(&participant.user_id)
757 {
758 remote_participant.peer_id = peer_id;
759 remote_participant.projects = participant.projects;
760 remote_participant.participant_index = participant_index;
761 if location != remote_participant.location {
762 remote_participant.location = location;
763 cx.emit(Event::ParticipantLocationChanged {
764 participant_id: peer_id,
765 });
766 }
767 } else {
768 this.remote_participants.insert(
769 participant.user_id,
770 RemoteParticipant {
771 user: user.clone(),
772 participant_index,
773 peer_id,
774 projects: participant.projects,
775 location,
776 muted: true,
777 speaking: false,
778 video_tracks: Default::default(),
779 audio_tracks: Default::default(),
780 },
781 );
782
783 Audio::play_sound(Sound::Joined, cx);
784
785 if let Some(live_kit) = this.live_kit.as_ref() {
786 let video_tracks =
787 live_kit.room.remote_video_tracks(&user.id.to_string());
788 let audio_tracks =
789 live_kit.room.remote_audio_tracks(&user.id.to_string());
790 let publications = live_kit
791 .room
792 .remote_audio_track_publications(&user.id.to_string());
793
794 for track in video_tracks {
795 this.remote_video_track_updated(
796 RemoteVideoTrackUpdate::Subscribed(track),
797 cx,
798 )
799 .log_err();
800 }
801
802 for (track, publication) in
803 audio_tracks.iter().zip(publications.iter())
804 {
805 this.remote_audio_track_updated(
806 RemoteAudioTrackUpdate::Subscribed(
807 track.clone(),
808 publication.clone(),
809 ),
810 cx,
811 )
812 .log_err();
813 }
814 }
815 }
816 }
817
818 this.remote_participants.retain(|user_id, participant| {
819 if this.participant_user_ids.contains(user_id) {
820 true
821 } else {
822 for project in &participant.projects {
823 cx.emit(Event::RemoteProjectUnshared {
824 project_id: project.id,
825 });
826 }
827 false
828 }
829 });
830 }
831
832 if let Some(pending_participants) = pending_participants.log_err() {
833 this.pending_participants = pending_participants;
834 for participant in &this.pending_participants {
835 this.participant_user_ids.insert(participant.id);
836 }
837 }
838
839 this.follows_by_leader_id_project_id.clear();
840 for follower in room.followers {
841 let project_id = follower.project_id;
842 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
843 (Some(leader), Some(follower)) => (leader, follower),
844
845 _ => {
846 log::error!("Follower message {follower:?} missing some state");
847 continue;
848 }
849 };
850
851 let list = this
852 .follows_by_leader_id_project_id
853 .entry((leader, project_id))
854 .or_insert(Vec::new());
855 if !list.contains(&follower) {
856 list.push(follower);
857 }
858 }
859
860 this.pending_room_update.take();
861 if this.should_leave() {
862 log::info!("room is empty, leaving");
863 let _ = this.leave(cx);
864 }
865
866 this.user_store.update(cx, |user_store, cx| {
867 let participant_indices_by_user_id = this
868 .remote_participants
869 .iter()
870 .map(|(user_id, participant)| (*user_id, participant.participant_index))
871 .collect();
872 user_store.set_participant_indices(participant_indices_by_user_id, cx);
873 });
874
875 this.check_invariants();
876 this.room_update_completed_tx.try_send(Some(())).ok();
877 cx.notify();
878 });
879 }));
880
881 cx.notify();
882 Ok(())
883 }
884
885 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
886 let mut done_rx = self.room_update_completed_rx.clone();
887 async move {
888 while let Some(result) = done_rx.next().await {
889 if result.is_some() {
890 break;
891 }
892 }
893 }
894 }
895
896 fn remote_video_track_updated(
897 &mut self,
898 change: RemoteVideoTrackUpdate,
899 cx: &mut ModelContext<Self>,
900 ) -> Result<()> {
901 match change {
902 RemoteVideoTrackUpdate::Subscribed(track) => {
903 let user_id = track.publisher_id().parse()?;
904 let track_id = track.sid().to_string();
905 let participant = self
906 .remote_participants
907 .get_mut(&user_id)
908 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
909 participant.video_tracks.insert(
910 track_id.clone(),
911 Arc::new(RemoteVideoTrack {
912 live_kit_track: track,
913 }),
914 );
915 cx.emit(Event::RemoteVideoTracksChanged {
916 participant_id: participant.peer_id,
917 });
918 }
919 RemoteVideoTrackUpdate::Unsubscribed {
920 publisher_id,
921 track_id,
922 } => {
923 let user_id = publisher_id.parse()?;
924 let participant = self
925 .remote_participants
926 .get_mut(&user_id)
927 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
928 participant.video_tracks.remove(&track_id);
929 cx.emit(Event::RemoteVideoTracksChanged {
930 participant_id: participant.peer_id,
931 });
932 }
933 }
934
935 cx.notify();
936 Ok(())
937 }
938
939 fn remote_audio_track_updated(
940 &mut self,
941 change: RemoteAudioTrackUpdate,
942 cx: &mut ModelContext<Self>,
943 ) -> Result<()> {
944 match change {
945 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
946 let mut speaker_ids = speakers
947 .into_iter()
948 .filter_map(|speaker_sid| speaker_sid.parse().ok())
949 .collect::<Vec<u64>>();
950 speaker_ids.sort_unstable();
951 for (sid, participant) in &mut self.remote_participants {
952 if let Ok(_) = speaker_ids.binary_search(sid) {
953 participant.speaking = true;
954 } else {
955 participant.speaking = false;
956 }
957 }
958 if let Some(id) = self.client.user_id() {
959 if let Some(room) = &mut self.live_kit {
960 if let Ok(_) = speaker_ids.binary_search(&id) {
961 room.speaking = true;
962 } else {
963 room.speaking = false;
964 }
965 }
966 }
967 cx.notify();
968 }
969 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
970 let mut found = false;
971 for participant in &mut self.remote_participants.values_mut() {
972 for track in participant.audio_tracks.values() {
973 if track.sid() == track_id {
974 found = true;
975 break;
976 }
977 }
978 if found {
979 participant.muted = muted;
980 break;
981 }
982 }
983
984 cx.notify();
985 }
986 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
987 let user_id = track.publisher_id().parse()?;
988 let track_id = track.sid().to_string();
989 let participant = self
990 .remote_participants
991 .get_mut(&user_id)
992 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
993
994 participant.audio_tracks.insert(track_id.clone(), track);
995 participant.muted = publication.is_muted();
996
997 cx.emit(Event::RemoteAudioTracksChanged {
998 participant_id: participant.peer_id,
999 });
1000 }
1001 RemoteAudioTrackUpdate::Unsubscribed {
1002 publisher_id,
1003 track_id,
1004 } => {
1005 let user_id = publisher_id.parse()?;
1006 let participant = self
1007 .remote_participants
1008 .get_mut(&user_id)
1009 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1010 participant.audio_tracks.remove(&track_id);
1011 cx.emit(Event::RemoteAudioTracksChanged {
1012 participant_id: participant.peer_id,
1013 });
1014 }
1015 }
1016
1017 cx.notify();
1018 Ok(())
1019 }
1020
1021 fn check_invariants(&self) {
1022 #[cfg(any(test, feature = "test-support"))]
1023 {
1024 for participant in self.remote_participants.values() {
1025 assert!(self.participant_user_ids.contains(&participant.user.id));
1026 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1027 }
1028
1029 for participant in &self.pending_participants {
1030 assert!(self.participant_user_ids.contains(&participant.id));
1031 assert_ne!(participant.id, self.client.user_id().unwrap());
1032 }
1033
1034 assert_eq!(
1035 self.participant_user_ids.len(),
1036 self.remote_participants.len() + self.pending_participants.len()
1037 );
1038 }
1039 }
1040
1041 pub(crate) fn call(
1042 &mut self,
1043 called_user_id: u64,
1044 initial_project_id: Option<u64>,
1045 cx: &mut ModelContext<Self>,
1046 ) -> Task<Result<()>> {
1047 if self.status.is_offline() {
1048 return Task::ready(Err(anyhow!("room is offline")));
1049 }
1050
1051 cx.notify();
1052 let client = self.client.clone();
1053 let room_id = self.id;
1054 self.pending_call_count += 1;
1055 cx.spawn(|this, mut cx| async move {
1056 let result = client
1057 .request(proto::Call {
1058 room_id,
1059 called_user_id,
1060 initial_project_id,
1061 })
1062 .await;
1063 this.update(&mut cx, |this, cx| {
1064 this.pending_call_count -= 1;
1065 if this.should_leave() {
1066 this.leave(cx).detach_and_log_err(cx);
1067 }
1068 });
1069 result?;
1070 Ok(())
1071 })
1072 }
1073
1074 pub fn join_project(
1075 &mut self,
1076 id: u64,
1077 language_registry: Arc<LanguageRegistry>,
1078 fs: Arc<dyn Fs>,
1079 cx: &mut ModelContext<Self>,
1080 ) -> Task<Result<ModelHandle<Project>>> {
1081 let client = self.client.clone();
1082 let user_store = self.user_store.clone();
1083 cx.emit(Event::RemoteProjectJoined { project_id: id });
1084 cx.spawn(|this, mut cx| async move {
1085 let project =
1086 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1087
1088 this.update(&mut cx, |this, cx| {
1089 this.joined_projects.retain(|project| {
1090 if let Some(project) = project.upgrade(cx) {
1091 !project.read(cx).is_read_only()
1092 } else {
1093 false
1094 }
1095 });
1096 this.joined_projects.insert(project.downgrade());
1097 });
1098 Ok(project)
1099 })
1100 }
1101
1102 pub(crate) fn share_project(
1103 &mut self,
1104 project: ModelHandle<Project>,
1105 cx: &mut ModelContext<Self>,
1106 ) -> Task<Result<u64>> {
1107 if let Some(project_id) = project.read(cx).remote_id() {
1108 return Task::ready(Ok(project_id));
1109 }
1110
1111 let request = self.client.request(proto::ShareProject {
1112 room_id: self.id(),
1113 worktrees: project.read(cx).worktree_metadata_protos(cx),
1114 });
1115 cx.spawn(|this, mut cx| async move {
1116 let response = request.await?;
1117
1118 project.update(&mut cx, |project, cx| {
1119 project.shared(response.project_id, cx)
1120 })?;
1121
1122 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1123 this.update(&mut cx, |this, cx| {
1124 this.shared_projects.insert(project.downgrade());
1125 let active_project = this.local_participant.active_project.as_ref();
1126 if active_project.map_or(false, |location| *location == project) {
1127 this.set_location(Some(&project), cx)
1128 } else {
1129 Task::ready(Ok(()))
1130 }
1131 })
1132 .await?;
1133
1134 Ok(response.project_id)
1135 })
1136 }
1137
1138 pub(crate) fn unshare_project(
1139 &mut self,
1140 project: ModelHandle<Project>,
1141 cx: &mut ModelContext<Self>,
1142 ) -> Result<()> {
1143 let project_id = match project.read(cx).remote_id() {
1144 Some(project_id) => project_id,
1145 None => return Ok(()),
1146 };
1147
1148 self.client.send(proto::UnshareProject { project_id })?;
1149 project.update(cx, |this, cx| this.unshare(cx))
1150 }
1151
1152 pub(crate) fn set_location(
1153 &mut self,
1154 project: Option<&ModelHandle<Project>>,
1155 cx: &mut ModelContext<Self>,
1156 ) -> Task<Result<()>> {
1157 if self.status.is_offline() {
1158 return Task::ready(Err(anyhow!("room is offline")));
1159 }
1160
1161 let client = self.client.clone();
1162 let room_id = self.id;
1163 let location = if let Some(project) = project {
1164 self.local_participant.active_project = Some(project.downgrade());
1165 if let Some(project_id) = project.read(cx).remote_id() {
1166 proto::participant_location::Variant::SharedProject(
1167 proto::participant_location::SharedProject { id: project_id },
1168 )
1169 } else {
1170 proto::participant_location::Variant::UnsharedProject(
1171 proto::participant_location::UnsharedProject {},
1172 )
1173 }
1174 } else {
1175 self.local_participant.active_project = None;
1176 proto::participant_location::Variant::External(proto::participant_location::External {})
1177 };
1178
1179 cx.notify();
1180 cx.foreground().spawn(async move {
1181 client
1182 .request(proto::UpdateParticipantLocation {
1183 room_id,
1184 location: Some(proto::ParticipantLocation {
1185 variant: Some(location),
1186 }),
1187 })
1188 .await?;
1189 Ok(())
1190 })
1191 }
1192
1193 pub fn is_screen_sharing(&self) -> bool {
1194 self.live_kit.as_ref().map_or(false, |live_kit| {
1195 !matches!(live_kit.screen_track, LocalTrack::None)
1196 })
1197 }
1198
1199 pub fn is_sharing_mic(&self) -> bool {
1200 self.live_kit.as_ref().map_or(false, |live_kit| {
1201 !matches!(live_kit.microphone_track, LocalTrack::None)
1202 })
1203 }
1204
1205 pub fn is_muted(&self, cx: &AppContext) -> bool {
1206 self.live_kit
1207 .as_ref()
1208 .and_then(|live_kit| match &live_kit.microphone_track {
1209 LocalTrack::None => Some(Self::mute_on_join(cx)),
1210 LocalTrack::Pending { muted, .. } => Some(*muted),
1211 LocalTrack::Published { muted, .. } => Some(*muted),
1212 })
1213 .unwrap_or(false)
1214 }
1215
1216 pub fn is_speaking(&self) -> bool {
1217 self.live_kit
1218 .as_ref()
1219 .map_or(false, |live_kit| live_kit.speaking)
1220 }
1221
1222 pub fn is_deafened(&self) -> Option<bool> {
1223 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1224 }
1225
1226 #[track_caller]
1227 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1228 if self.status.is_offline() {
1229 return Task::ready(Err(anyhow!("room is offline")));
1230 } else if self.is_sharing_mic() {
1231 return Task::ready(Err(anyhow!("microphone was already shared")));
1232 }
1233
1234 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1235 let publish_id = post_inc(&mut live_kit.next_publish_id);
1236 live_kit.microphone_track = LocalTrack::Pending {
1237 publish_id,
1238 muted: false,
1239 };
1240 cx.notify();
1241 publish_id
1242 } else {
1243 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1244 };
1245
1246 cx.spawn_weak(|this, mut cx| async move {
1247 let publish_track = async {
1248 let track = LocalAudioTrack::create();
1249 this.upgrade(&cx)
1250 .ok_or_else(|| anyhow!("room was dropped"))?
1251 .read_with(&cx, |this, _| {
1252 this.live_kit
1253 .as_ref()
1254 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1255 })
1256 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1257 .await
1258 };
1259
1260 let publication = publish_track.await;
1261 this.upgrade(&cx)
1262 .ok_or_else(|| anyhow!("room was dropped"))?
1263 .update(&mut cx, |this, cx| {
1264 let live_kit = this
1265 .live_kit
1266 .as_mut()
1267 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1268
1269 let (canceled, muted) = if let LocalTrack::Pending {
1270 publish_id: cur_publish_id,
1271 muted,
1272 } = &live_kit.microphone_track
1273 {
1274 (*cur_publish_id != publish_id, *muted)
1275 } else {
1276 (true, false)
1277 };
1278
1279 match publication {
1280 Ok(publication) => {
1281 if canceled {
1282 live_kit.room.unpublish_track(publication);
1283 } else {
1284 if muted {
1285 cx.background().spawn(publication.set_mute(muted)).detach();
1286 }
1287 live_kit.microphone_track = LocalTrack::Published {
1288 track_publication: publication,
1289 muted,
1290 };
1291 cx.notify();
1292 }
1293 Ok(())
1294 }
1295 Err(error) => {
1296 if canceled {
1297 Ok(())
1298 } else {
1299 live_kit.microphone_track = LocalTrack::None;
1300 cx.notify();
1301 Err(error)
1302 }
1303 }
1304 }
1305 })
1306 })
1307 }
1308
1309 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1310 if self.status.is_offline() {
1311 return Task::ready(Err(anyhow!("room is offline")));
1312 } else if self.is_screen_sharing() {
1313 return Task::ready(Err(anyhow!("screen was already shared")));
1314 }
1315
1316 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1317 let publish_id = post_inc(&mut live_kit.next_publish_id);
1318 live_kit.screen_track = LocalTrack::Pending {
1319 publish_id,
1320 muted: false,
1321 };
1322 cx.notify();
1323 (live_kit.room.display_sources(), publish_id)
1324 } else {
1325 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1326 };
1327
1328 cx.spawn_weak(|this, mut cx| async move {
1329 let publish_track = async {
1330 let displays = displays.await?;
1331 let display = displays
1332 .first()
1333 .ok_or_else(|| anyhow!("no display found"))?;
1334 let track = LocalVideoTrack::screen_share_for_display(&display);
1335 this.upgrade(&cx)
1336 .ok_or_else(|| anyhow!("room was dropped"))?
1337 .read_with(&cx, |this, _| {
1338 this.live_kit
1339 .as_ref()
1340 .map(|live_kit| live_kit.room.publish_video_track(&track))
1341 })
1342 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1343 .await
1344 };
1345
1346 let publication = publish_track.await;
1347 this.upgrade(&cx)
1348 .ok_or_else(|| anyhow!("room was dropped"))?
1349 .update(&mut cx, |this, cx| {
1350 let live_kit = this
1351 .live_kit
1352 .as_mut()
1353 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1354
1355 let (canceled, muted) = if let LocalTrack::Pending {
1356 publish_id: cur_publish_id,
1357 muted,
1358 } = &live_kit.screen_track
1359 {
1360 (*cur_publish_id != publish_id, *muted)
1361 } else {
1362 (true, false)
1363 };
1364
1365 match publication {
1366 Ok(publication) => {
1367 if canceled {
1368 live_kit.room.unpublish_track(publication);
1369 } else {
1370 if muted {
1371 cx.background().spawn(publication.set_mute(muted)).detach();
1372 }
1373 live_kit.screen_track = LocalTrack::Published {
1374 track_publication: publication,
1375 muted,
1376 };
1377 cx.notify();
1378 }
1379
1380 Audio::play_sound(Sound::StartScreenshare, cx);
1381
1382 Ok(())
1383 }
1384 Err(error) => {
1385 if canceled {
1386 Ok(())
1387 } else {
1388 live_kit.screen_track = LocalTrack::None;
1389 cx.notify();
1390 Err(error)
1391 }
1392 }
1393 }
1394 })
1395 })
1396 }
1397
1398 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1399 let should_mute = !self.is_muted(cx);
1400 if let Some(live_kit) = self.live_kit.as_mut() {
1401 if matches!(live_kit.microphone_track, LocalTrack::None) {
1402 return Ok(self.share_microphone(cx));
1403 }
1404
1405 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1406 live_kit.muted_by_user = should_mute;
1407
1408 if old_muted == true && live_kit.deafened == true {
1409 if let Some(task) = self.toggle_deafen(cx).ok() {
1410 task.detach();
1411 }
1412 }
1413
1414 Ok(ret_task)
1415 } else {
1416 Err(anyhow!("LiveKit not started"))
1417 }
1418 }
1419
1420 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1421 if let Some(live_kit) = self.live_kit.as_mut() {
1422 (*live_kit).deafened = !live_kit.deafened;
1423
1424 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1425 // Context notification is sent within set_mute itself.
1426 let mut mute_task = None;
1427 // When deafening, mute user's mic as well.
1428 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1429 if live_kit.deafened || !live_kit.muted_by_user {
1430 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1431 };
1432 for participant in self.remote_participants.values() {
1433 for track in live_kit
1434 .room
1435 .remote_audio_track_publications(&participant.user.id.to_string())
1436 {
1437 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1438 }
1439 }
1440
1441 Ok(cx.foreground().spawn(async move {
1442 if let Some(mute_task) = mute_task {
1443 mute_task.await?;
1444 }
1445 for task in tasks {
1446 task.await?;
1447 }
1448 Ok(())
1449 }))
1450 } else {
1451 Err(anyhow!("LiveKit not started"))
1452 }
1453 }
1454
1455 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1456 if self.status.is_offline() {
1457 return Err(anyhow!("room is offline"));
1458 }
1459
1460 let live_kit = self
1461 .live_kit
1462 .as_mut()
1463 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1464 match mem::take(&mut live_kit.screen_track) {
1465 LocalTrack::None => Err(anyhow!("screen was not shared")),
1466 LocalTrack::Pending { .. } => {
1467 cx.notify();
1468 Ok(())
1469 }
1470 LocalTrack::Published {
1471 track_publication, ..
1472 } => {
1473 live_kit.room.unpublish_track(track_publication);
1474 cx.notify();
1475
1476 Audio::play_sound(Sound::StopScreenshare, cx);
1477 Ok(())
1478 }
1479 }
1480 }
1481
/// Test helper: replaces the display sources reported by the live-kit room.
///
/// # Panics
///
/// Panics when live-kit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1490}
1491
1492struct LiveKitRoom {
1493 room: Arc<live_kit_client::Room>,
1494 screen_track: LocalTrack,
1495 microphone_track: LocalTrack,
1496 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1497 muted_by_user: bool,
1498 deafened: bool,
1499 speaking: bool,
1500 next_publish_id: usize,
1501 _maintain_room: Task<()>,
1502 _maintain_tracks: [Task<()>; 2],
1503}
1504
1505impl LiveKitRoom {
1506 fn set_mute(
1507 self: &mut LiveKitRoom,
1508 should_mute: bool,
1509 cx: &mut ModelContext<Room>,
1510 ) -> Result<(Task<Result<()>>, bool)> {
1511 if !should_mute {
1512 // clear user muting state.
1513 self.muted_by_user = false;
1514 }
1515
1516 let (result, old_muted) = match &mut self.microphone_track {
1517 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1518 LocalTrack::Pending { muted, .. } => {
1519 let old_muted = *muted;
1520 *muted = should_mute;
1521 cx.notify();
1522 Ok((Task::Ready(Some(Ok(()))), old_muted))
1523 }
1524 LocalTrack::Published {
1525 track_publication,
1526 muted,
1527 } => {
1528 let old_muted = *muted;
1529 *muted = should_mute;
1530 cx.notify();
1531 Ok((
1532 cx.background().spawn(track_publication.set_mute(*muted)),
1533 old_muted,
1534 ))
1535 }
1536 }?;
1537
1538 if old_muted != should_mute {
1539 if should_mute {
1540 Audio::play_sound(Sound::Mute, cx);
1541 } else {
1542 Audio::play_sound(Sound::Unmute, cx);
1543 }
1544 }
1545
1546 Ok((result, old_muted))
1547 }
1548}
1549
1550enum LocalTrack {
1551 None,
1552 Pending {
1553 publish_id: usize,
1554 muted: bool,
1555 },
1556 Published {
1557 track_publication: LocalTrackPublication,
1558 muted: bool,
1559 },
1560}
1561
1562impl Default for LocalTrack {
1563 fn default() -> Self {
1564 Self::None
1565 }
1566}
1567
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == RoomStatus::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == RoomStatus::Online
    }
}