1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio2::{Audio, Sound};
8use client2::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs2::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui2::{
16 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
17};
18use language2::LanguageRegistry;
19use live_kit_client2::{
20 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
21 RemoteVideoTrackUpdate,
22};
23use postage::{sink::Sink, stream::Stream, watch};
24use project2::Project;
25use settings2::Settings;
26use std::{future::Future, mem, sync::Arc, time::Duration};
27use util::{post_inc, ResultExt, TryFutureExt};
28
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before giving up and leaving it.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
30
31#[derive(Clone, Debug, PartialEq, Eq)]
32pub enum Event {
33 ParticipantLocationChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteVideoTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteAudioTracksChanged {
40 participant_id: proto::PeerId,
41 },
42 RemoteProjectShared {
43 owner: Arc<User>,
44 project_id: u64,
45 worktree_root_names: Vec<String>,
46 },
47 RemoteProjectUnshared {
48 project_id: u64,
49 },
50 RemoteProjectJoined {
51 project_id: u64,
52 },
53 RemoteProjectInvitationDiscarded {
54 project_id: u64,
55 },
56 Left,
57}
58
/// Client-side state of a call room: its participants, the projects shared
/// into or joined from it, and the tasks that keep it in sync with the server.
pub struct Room {
    // Server-assigned room id; sent with `Call`, `RejoinRoom` and location updates.
    id: u64,
    // Channel id taken from the join response; `None` for rooms made by `create`.
    channel_id: Option<u64>,
    // LiveKit state; `None` when no connection info was supplied or after `clear_state`.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    // Local projects registered by `share_project`.
    shared_projects: HashSet<WeakModel<Project>>,
    // Remote projects registered by `join_project`.
    joined_projects: HashSet<WeakModel<Project>>,
    local_participant: LocalParticipant,
    // Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    // Users listed as pending in the latest applied room update.
    pending_participants: Vec<Arc<User>>,
    // User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    // Number of `Call` requests that have not completed yet.
    pending_call_count: usize,
    // When set, `should_leave` may report that an empty room should be left.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Followers grouped by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    // Message-handler subscriptions; dropped in `clear_state`.
    client_subscriptions: Vec<client2::Subscription>,
    // Release and app-quit hooks registered in `new`.
    _subscriptions: Vec<gpui2::Subscription>,
    // Signalled each time a room update has been fully applied.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    // Task applying the most recent room update, while it is still running.
    pending_room_update: Option<Task<()>>,
    // Task running `Self::maintain_connection`; dropped in `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
82
83impl EventEmitter for Room {
84 type Event = Event;
85}
86
87impl Room {
88 pub fn channel_id(&self) -> Option<u64> {
89 self.channel_id
90 }
91
92 pub fn is_sharing_project(&self) -> bool {
93 !self.shared_projects.is_empty()
94 }
95
/// Test helper: reports whether the LiveKit room is currently in the
/// `Connected` state. Returns `false` when there is no LiveKit room.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    self.live_kit.as_ref().map_or(false, |live_kit| {
        matches!(
            *live_kit.room.status().borrow(),
            live_kit_client2::ConnectionState::Connected { .. }
        )
    })
}
107
108 fn new(
109 id: u64,
110 channel_id: Option<u64>,
111 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
112 client: Arc<Client>,
113 user_store: Model<UserStore>,
114 cx: &mut ModelContext<Self>,
115 ) -> Self {
116 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
117 let room = live_kit_client2::Room::new();
118 let mut status = room.status();
119 // Consume the initial status of the room.
120 let _ = status.try_recv();
121 let _maintain_room = cx.spawn(|this, mut cx| async move {
122 while let Some(status) = status.next().await {
123 let this = if let Some(this) = this.upgrade() {
124 this
125 } else {
126 break;
127 };
128
129 if status == live_kit_client2::ConnectionState::Disconnected {
130 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
131 .ok();
132 break;
133 }
134 }
135 });
136
137 let _maintain_video_tracks = cx.spawn_on_main({
138 let room = room.clone();
139 move |this, mut cx| async move {
140 let mut track_video_changes = room.remote_video_track_updates();
141 while let Some(track_change) = track_video_changes.next().await {
142 let this = if let Some(this) = this.upgrade() {
143 this
144 } else {
145 break;
146 };
147
148 this.update(&mut cx, |this, cx| {
149 this.remote_video_track_updated(track_change, cx).log_err()
150 })
151 .ok();
152 }
153 }
154 });
155
156 let _maintain_audio_tracks = cx.spawn_on_main({
157 let room = room.clone();
158 |this, mut cx| async move {
159 let mut track_audio_changes = room.remote_audio_track_updates();
160 while let Some(track_change) = track_audio_changes.next().await {
161 let this = if let Some(this) = this.upgrade() {
162 this
163 } else {
164 break;
165 };
166
167 this.update(&mut cx, |this, cx| {
168 this.remote_audio_track_updated(track_change, cx).log_err()
169 })
170 .ok();
171 }
172 }
173 });
174
175 let connect = room.connect(&connection_info.server_url, &connection_info.token);
176 cx.spawn(|this, mut cx| async move {
177 connect.await?;
178
179 if !cx.update(|cx| Self::mute_on_join(cx))? {
180 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
181 .await?;
182 }
183
184 anyhow::Ok(())
185 })
186 .detach_and_log_err(cx);
187
188 Some(LiveKitRoom {
189 room,
190 screen_track: LocalTrack::None,
191 microphone_track: LocalTrack::None,
192 next_publish_id: 0,
193 muted_by_user: false,
194 deafened: false,
195 speaking: false,
196 _maintain_room,
197 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
198 })
199 } else {
200 None
201 };
202
203 let maintain_connection = cx.spawn({
204 let client = client.clone();
205 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
206 });
207
208 Audio::play_sound(Sound::Joined, cx);
209
210 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
211
212 Self {
213 id,
214 channel_id,
215 live_kit: live_kit_room,
216 status: RoomStatus::Online,
217 shared_projects: Default::default(),
218 joined_projects: Default::default(),
219 participant_user_ids: Default::default(),
220 local_participant: Default::default(),
221 remote_participants: Default::default(),
222 pending_participants: Default::default(),
223 pending_call_count: 0,
224 client_subscriptions: vec![
225 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
226 ],
227 _subscriptions: vec![
228 cx.on_release(Self::released),
229 cx.on_app_quit(Self::app_will_quit),
230 ],
231 leave_when_empty: false,
232 pending_room_update: None,
233 client,
234 user_store,
235 follows_by_leader_id_project_id: Default::default(),
236 maintain_connection: Some(maintain_connection),
237 room_update_completed_tx,
238 room_update_completed_rx,
239 }
240 }
241
242 pub(crate) fn create(
243 called_user_id: u64,
244 initial_project: Option<Model<Project>>,
245 client: Arc<Client>,
246 user_store: Model<UserStore>,
247 cx: &mut AppContext,
248 ) -> Task<Result<Model<Self>>> {
249 cx.spawn(move |mut cx| async move {
250 let response = client.request(proto::CreateRoom {}).await?;
251 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
252 let room = cx.build_model(|cx| {
253 Self::new(
254 room_proto.id,
255 None,
256 response.live_kit_connection_info,
257 client,
258 user_store,
259 cx,
260 )
261 })?;
262
263 let initial_project_id = if let Some(initial_project) = initial_project {
264 let initial_project_id = room
265 .update(&mut cx, |room, cx| {
266 room.share_project(initial_project.clone(), cx)
267 })?
268 .await?;
269 Some(initial_project_id)
270 } else {
271 None
272 };
273
274 match room
275 .update(&mut cx, |room, cx| {
276 room.leave_when_empty = true;
277 room.call(called_user_id, initial_project_id, cx)
278 })?
279 .await
280 {
281 Ok(()) => Ok(room),
282 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
283 }
284 })
285 }
286
287 pub(crate) fn join_channel(
288 channel_id: u64,
289 client: Arc<Client>,
290 user_store: Model<UserStore>,
291 cx: &mut AppContext,
292 ) -> Task<Result<Model<Self>>> {
293 cx.spawn(move |cx| async move {
294 Self::from_join_response(
295 client.request(proto::JoinChannel { channel_id }).await?,
296 client,
297 user_store,
298 cx,
299 )
300 })
301 }
302
303 pub(crate) fn join(
304 call: &IncomingCall,
305 client: Arc<Client>,
306 user_store: Model<UserStore>,
307 cx: &mut AppContext,
308 ) -> Task<Result<Model<Self>>> {
309 let id = call.room_id;
310 cx.spawn(move |cx| async move {
311 Self::from_join_response(
312 client.request(proto::JoinRoom { id }).await?,
313 client,
314 user_store,
315 cx,
316 )
317 })
318 }
319
320 fn released(&mut self, cx: &mut AppContext) {
321 if self.status.is_online() {
322 self.leave_internal(cx).detach_and_log_err(cx);
323 }
324 }
325
326 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
327 let task = if self.status.is_online() {
328 let leave = self.leave_internal(cx);
329 Some(cx.executor().spawn(async move {
330 leave.await.log_err();
331 }))
332 } else {
333 None
334 };
335
336 async move {
337 if let Some(task) = task {
338 task.await;
339 }
340 }
341 }
342
343 pub fn mute_on_join(cx: &AppContext) -> bool {
344 CallSettings::get_global(cx).mute_on_join || client2::IMPERSONATE_LOGIN.is_some()
345 }
346
347 fn from_join_response(
348 response: proto::JoinRoomResponse,
349 client: Arc<Client>,
350 user_store: Model<UserStore>,
351 mut cx: AsyncAppContext,
352 ) -> Result<Model<Self>> {
353 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
354 let room = cx.build_model(|cx| {
355 Self::new(
356 room_proto.id,
357 response.channel_id,
358 response.live_kit_connection_info,
359 client,
360 user_store,
361 cx,
362 )
363 })?;
364 room.update(&mut cx, |room, cx| {
365 room.leave_when_empty = room.channel_id.is_none();
366 room.apply_room_update(room_proto, cx)?;
367 anyhow::Ok(())
368 })??;
369 Ok(room)
370 }
371
372 fn should_leave(&self) -> bool {
373 self.leave_when_empty
374 && self.pending_room_update.is_none()
375 && self.pending_participants.is_empty()
376 && self.remote_participants.is_empty()
377 && self.pending_call_count == 0
378 }
379
380 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
381 cx.notify();
382 cx.emit(Event::Left);
383 self.leave_internal(cx)
384 }
385
386 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
387 if self.status.is_offline() {
388 return Task::ready(Err(anyhow!("room is offline")));
389 }
390
391 log::info!("leaving room");
392 Audio::play_sound(Sound::Leave, cx);
393
394 self.clear_state(cx);
395
396 let leave_room = self.client.request(proto::LeaveRoom {});
397 cx.executor().spawn(async move {
398 leave_room.await?;
399 anyhow::Ok(())
400 })
401 }
402
403 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
404 for project in self.shared_projects.drain() {
405 if let Some(project) = project.upgrade() {
406 project.update(cx, |project, cx| {
407 project.unshare(cx).log_err();
408 });
409 }
410 }
411 for project in self.joined_projects.drain() {
412 if let Some(project) = project.upgrade() {
413 project.update(cx, |project, cx| {
414 project.disconnected_from_host(cx);
415 project.close(cx);
416 });
417 }
418 }
419
420 self.status = RoomStatus::Offline;
421 self.remote_participants.clear();
422 self.pending_participants.clear();
423 self.participant_user_ids.clear();
424 self.client_subscriptions.clear();
425 self.live_kit.take();
426 self.pending_room_update.take();
427 self.maintain_connection.take();
428 }
429
/// Watches the client's connection status and tries to rejoin the room
/// after a disconnection.
///
/// On every detected disconnection the room is marked `Rejoining`, and the
/// function then races a reconnect-and-rejoin loop (at most three failed
/// rejoin attempts) against a `RECONNECT_TIMEOUT` timer. A successful rejoin
/// resumes watching. Otherwise the room is left and an error is returned.
/// An error is also returned if the room model is dropped while a
/// disconnection is being handled.
async fn maintain_connection(
    this: WeakModel<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // Discard any status value that is already queued before inspecting
        // the current one.
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade()
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                })?;

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.executor().timer(RECONNECT_TIMEOUT).fuse();
                // Resolves to `true` once a rejoin succeeds, and to `false`
                // when the attempts are exhausted, the client signs out, or
                // the room/app is gone.
                let client_reconnection = async {
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        if client_status.borrow().is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade() else { break };
                            match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                Ok(task) => {
                                    if task.await.log_err().is_some() {
                                        return true;
                                    } else {
                                        remaining_attempts -= 1;
                                    }
                                }
                                Err(_app_dropped) => return false,
                            }
                        } else if client_status.borrow().is_signed_out() {
                            return false;
                        }

                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        client_status.next().await;
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // `select_biased!` polls the reconnection future first, so a
                // completed rejoin wins over a simultaneously expired timer.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            break;
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade() {
        log::info!("reconnection failed, leaving room");
        let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
    }
    Err(anyhow!(
        "can't reconnect to room: client failed to re-establish connection"
    ))
}
515
516 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
517 let mut projects = HashMap::default();
518 let mut reshared_projects = Vec::new();
519 let mut rejoined_projects = Vec::new();
520 self.shared_projects.retain(|project| {
521 if let Some(handle) = project.upgrade() {
522 let project = handle.read(cx);
523 if let Some(project_id) = project.remote_id() {
524 projects.insert(project_id, handle.clone());
525 reshared_projects.push(proto::UpdateProject {
526 project_id,
527 worktrees: project.worktree_metadata_protos(cx),
528 });
529 return true;
530 }
531 }
532 false
533 });
534 self.joined_projects.retain(|project| {
535 if let Some(handle) = project.upgrade() {
536 let project = handle.read(cx);
537 if let Some(project_id) = project.remote_id() {
538 projects.insert(project_id, handle.clone());
539 rejoined_projects.push(proto::RejoinProject {
540 id: project_id,
541 worktrees: project
542 .worktrees()
543 .map(|worktree| {
544 let worktree = worktree.read(cx);
545 proto::RejoinWorktree {
546 id: worktree.id().to_proto(),
547 scan_id: worktree.completed_scan_id() as u64,
548 }
549 })
550 .collect(),
551 });
552 }
553 return true;
554 }
555 false
556 });
557
558 let response = self.client.request_envelope(proto::RejoinRoom {
559 id: self.id,
560 reshared_projects,
561 rejoined_projects,
562 });
563
564 cx.spawn(|this, mut cx| async move {
565 let response = response.await?;
566 let message_id = response.message_id;
567 let response = response.payload;
568 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
569 this.update(&mut cx, |this, cx| {
570 this.status = RoomStatus::Online;
571 this.apply_room_update(room_proto, cx)?;
572
573 for reshared_project in response.reshared_projects {
574 if let Some(project) = projects.get(&reshared_project.id) {
575 project.update(cx, |project, cx| {
576 project.reshared(reshared_project, cx).log_err();
577 });
578 }
579 }
580
581 for rejoined_project in response.rejoined_projects {
582 if let Some(project) = projects.get(&rejoined_project.id) {
583 project.update(cx, |project, cx| {
584 project.rejoined(rejoined_project, message_id, cx).log_err();
585 });
586 }
587 }
588
589 anyhow::Ok(())
590 })?
591 })
592 }
593
594 pub fn id(&self) -> u64 {
595 self.id
596 }
597
598 pub fn status(&self) -> RoomStatus {
599 self.status
600 }
601
602 pub fn local_participant(&self) -> &LocalParticipant {
603 &self.local_participant
604 }
605
606 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
607 &self.remote_participants
608 }
609
610 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
611 self.remote_participants
612 .values()
613 .find(|p| p.peer_id == peer_id)
614 }
615
616 pub fn pending_participants(&self) -> &[Arc<User>] {
617 &self.pending_participants
618 }
619
620 pub fn contains_participant(&self, user_id: u64) -> bool {
621 self.participant_user_ids.contains(&user_id)
622 }
623
624 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
625 self.follows_by_leader_id_project_id
626 .get(&(leader_id, project_id))
627 .map_or(&[], |v| v.as_slice())
628 }
629
630 /// Returns the most 'active' projects, defined as most people in the project
631 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
632 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
633 for participant in self.remote_participants.values() {
634 match participant.location {
635 ParticipantLocation::SharedProject { project_id } => {
636 project_hosts_and_guest_counts
637 .entry(project_id)
638 .or_default()
639 .1 += 1;
640 }
641 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
642 }
643 for project in &participant.projects {
644 project_hosts_and_guest_counts
645 .entry(project.id)
646 .or_default()
647 .0 = Some(participant.user.id);
648 }
649 }
650
651 if let Some(user) = self.user_store.read(cx).current_user() {
652 for project in &self.local_participant.projects {
653 project_hosts_and_guest_counts
654 .entry(project.id)
655 .or_default()
656 .0 = Some(user.id);
657 }
658 }
659
660 project_hosts_and_guest_counts
661 .into_iter()
662 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
663 .max_by_key(|(_, _, guest_count)| *guest_count)
664 .map(|(id, host, _)| (id, host))
665 }
666
667 async fn handle_room_updated(
668 this: Model<Self>,
669 envelope: TypedEnvelope<proto::RoomUpdated>,
670 _: Arc<Client>,
671 mut cx: AsyncAppContext,
672 ) -> Result<()> {
673 let room = envelope
674 .payload
675 .room
676 .ok_or_else(|| anyhow!("invalid room"))?;
677 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
678 }
679
680 fn apply_room_update(
681 &mut self,
682 mut room: proto::Room,
683 cx: &mut ModelContext<Self>,
684 ) -> Result<()> {
685 // Filter ourselves out from the room's participants.
686 let local_participant_ix = room
687 .participants
688 .iter()
689 .position(|participant| Some(participant.user_id) == self.client.user_id());
690 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
691
692 let pending_participant_user_ids = room
693 .pending_participants
694 .iter()
695 .map(|p| p.user_id)
696 .collect::<Vec<_>>();
697
698 let remote_participant_user_ids = room
699 .participants
700 .iter()
701 .map(|p| p.user_id)
702 .collect::<Vec<_>>();
703
704 let (remote_participants, pending_participants) =
705 self.user_store.update(cx, move |user_store, cx| {
706 (
707 user_store.get_users(remote_participant_user_ids, cx),
708 user_store.get_users(pending_participant_user_ids, cx),
709 )
710 });
711
712 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
713 let (remote_participants, pending_participants) =
714 futures::join!(remote_participants, pending_participants);
715
716 this.update(&mut cx, |this, cx| {
717 this.participant_user_ids.clear();
718
719 if let Some(participant) = local_participant {
720 this.local_participant.projects = participant.projects;
721 } else {
722 this.local_participant.projects.clear();
723 }
724
725 if let Some(participants) = remote_participants.log_err() {
726 for (participant, user) in room.participants.into_iter().zip(participants) {
727 let Some(peer_id) = participant.peer_id else {
728 continue;
729 };
730 let participant_index = ParticipantIndex(participant.participant_index);
731 this.participant_user_ids.insert(participant.user_id);
732
733 let old_projects = this
734 .remote_participants
735 .get(&participant.user_id)
736 .into_iter()
737 .flat_map(|existing| &existing.projects)
738 .map(|project| project.id)
739 .collect::<HashSet<_>>();
740 let new_projects = participant
741 .projects
742 .iter()
743 .map(|project| project.id)
744 .collect::<HashSet<_>>();
745
746 for project in &participant.projects {
747 if !old_projects.contains(&project.id) {
748 cx.emit(Event::RemoteProjectShared {
749 owner: user.clone(),
750 project_id: project.id,
751 worktree_root_names: project.worktree_root_names.clone(),
752 });
753 }
754 }
755
756 for unshared_project_id in old_projects.difference(&new_projects) {
757 this.joined_projects.retain(|project| {
758 if let Some(project) = project.upgrade() {
759 project.update(cx, |project, cx| {
760 if project.remote_id() == Some(*unshared_project_id) {
761 project.disconnected_from_host(cx);
762 false
763 } else {
764 true
765 }
766 })
767 } else {
768 false
769 }
770 });
771 cx.emit(Event::RemoteProjectUnshared {
772 project_id: *unshared_project_id,
773 });
774 }
775
776 let location = ParticipantLocation::from_proto(participant.location)
777 .unwrap_or(ParticipantLocation::External);
778 if let Some(remote_participant) =
779 this.remote_participants.get_mut(&participant.user_id)
780 {
781 remote_participant.peer_id = peer_id;
782 remote_participant.projects = participant.projects;
783 remote_participant.participant_index = participant_index;
784 if location != remote_participant.location {
785 remote_participant.location = location;
786 cx.emit(Event::ParticipantLocationChanged {
787 participant_id: peer_id,
788 });
789 }
790 } else {
791 this.remote_participants.insert(
792 participant.user_id,
793 RemoteParticipant {
794 user: user.clone(),
795 participant_index,
796 peer_id,
797 projects: participant.projects,
798 location,
799 muted: true,
800 speaking: false,
801 video_tracks: Default::default(),
802 audio_tracks: Default::default(),
803 },
804 );
805
806 Audio::play_sound(Sound::Joined, cx);
807
808 if let Some(live_kit) = this.live_kit.as_ref() {
809 let video_tracks =
810 live_kit.room.remote_video_tracks(&user.id.to_string());
811 let audio_tracks =
812 live_kit.room.remote_audio_tracks(&user.id.to_string());
813 let publications = live_kit
814 .room
815 .remote_audio_track_publications(&user.id.to_string());
816
817 for track in video_tracks {
818 this.remote_video_track_updated(
819 RemoteVideoTrackUpdate::Subscribed(track),
820 cx,
821 )
822 .log_err();
823 }
824
825 for (track, publication) in
826 audio_tracks.iter().zip(publications.iter())
827 {
828 this.remote_audio_track_updated(
829 RemoteAudioTrackUpdate::Subscribed(
830 track.clone(),
831 publication.clone(),
832 ),
833 cx,
834 )
835 .log_err();
836 }
837 }
838 }
839 }
840
841 this.remote_participants.retain(|user_id, participant| {
842 if this.participant_user_ids.contains(user_id) {
843 true
844 } else {
845 for project in &participant.projects {
846 cx.emit(Event::RemoteProjectUnshared {
847 project_id: project.id,
848 });
849 }
850 false
851 }
852 });
853 }
854
855 if let Some(pending_participants) = pending_participants.log_err() {
856 this.pending_participants = pending_participants;
857 for participant in &this.pending_participants {
858 this.participant_user_ids.insert(participant.id);
859 }
860 }
861
862 this.follows_by_leader_id_project_id.clear();
863 for follower in room.followers {
864 let project_id = follower.project_id;
865 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
866 (Some(leader), Some(follower)) => (leader, follower),
867
868 _ => {
869 log::error!("Follower message {follower:?} missing some state");
870 continue;
871 }
872 };
873
874 let list = this
875 .follows_by_leader_id_project_id
876 .entry((leader, project_id))
877 .or_insert(Vec::new());
878 if !list.contains(&follower) {
879 list.push(follower);
880 }
881 }
882
883 this.pending_room_update.take();
884 if this.should_leave() {
885 log::info!("room is empty, leaving");
886 let _ = this.leave(cx);
887 }
888
889 this.user_store.update(cx, |user_store, cx| {
890 let participant_indices_by_user_id = this
891 .remote_participants
892 .iter()
893 .map(|(user_id, participant)| (*user_id, participant.participant_index))
894 .collect();
895 user_store.set_participant_indices(participant_indices_by_user_id, cx);
896 });
897
898 this.check_invariants();
899 this.room_update_completed_tx.try_send(Some(())).ok();
900 cx.notify();
901 })
902 .ok();
903 }));
904
905 cx.notify();
906 Ok(())
907 }
908
909 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
910 let mut done_rx = self.room_update_completed_rx.clone();
911 async move {
912 while let Some(result) = done_rx.next().await {
913 if result.is_some() {
914 break;
915 }
916 }
917 }
918 }
919
920 fn remote_video_track_updated(
921 &mut self,
922 change: RemoteVideoTrackUpdate,
923 cx: &mut ModelContext<Self>,
924 ) -> Result<()> {
925 match change {
926 RemoteVideoTrackUpdate::Subscribed(track) => {
927 let user_id = track.publisher_id().parse()?;
928 let track_id = track.sid().to_string();
929 let participant = self
930 .remote_participants
931 .get_mut(&user_id)
932 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
933 participant.video_tracks.insert(track_id.clone(), track);
934 cx.emit(Event::RemoteVideoTracksChanged {
935 participant_id: participant.peer_id,
936 });
937 }
938 RemoteVideoTrackUpdate::Unsubscribed {
939 publisher_id,
940 track_id,
941 } => {
942 let user_id = publisher_id.parse()?;
943 let participant = self
944 .remote_participants
945 .get_mut(&user_id)
946 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
947 participant.video_tracks.remove(&track_id);
948 cx.emit(Event::RemoteVideoTracksChanged {
949 participant_id: participant.peer_id,
950 });
951 }
952 }
953
954 cx.notify();
955 Ok(())
956 }
957
958 fn remote_audio_track_updated(
959 &mut self,
960 change: RemoteAudioTrackUpdate,
961 cx: &mut ModelContext<Self>,
962 ) -> Result<()> {
963 match change {
964 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
965 let mut speaker_ids = speakers
966 .into_iter()
967 .filter_map(|speaker_sid| speaker_sid.parse().ok())
968 .collect::<Vec<u64>>();
969 speaker_ids.sort_unstable();
970 for (sid, participant) in &mut self.remote_participants {
971 if let Ok(_) = speaker_ids.binary_search(sid) {
972 participant.speaking = true;
973 } else {
974 participant.speaking = false;
975 }
976 }
977 if let Some(id) = self.client.user_id() {
978 if let Some(room) = &mut self.live_kit {
979 if let Ok(_) = speaker_ids.binary_search(&id) {
980 room.speaking = true;
981 } else {
982 room.speaking = false;
983 }
984 }
985 }
986 cx.notify();
987 }
988 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
989 let mut found = false;
990 for participant in &mut self.remote_participants.values_mut() {
991 for track in participant.audio_tracks.values() {
992 if track.sid() == track_id {
993 found = true;
994 break;
995 }
996 }
997 if found {
998 participant.muted = muted;
999 break;
1000 }
1001 }
1002
1003 cx.notify();
1004 }
1005 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1006 let user_id = track.publisher_id().parse()?;
1007 let track_id = track.sid().to_string();
1008 let participant = self
1009 .remote_participants
1010 .get_mut(&user_id)
1011 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1012 participant.audio_tracks.insert(track_id.clone(), track);
1013 participant.muted = publication.is_muted();
1014
1015 cx.emit(Event::RemoteAudioTracksChanged {
1016 participant_id: participant.peer_id,
1017 });
1018 }
1019 RemoteAudioTrackUpdate::Unsubscribed {
1020 publisher_id,
1021 track_id,
1022 } => {
1023 let user_id = publisher_id.parse()?;
1024 let participant = self
1025 .remote_participants
1026 .get_mut(&user_id)
1027 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1028 participant.audio_tracks.remove(&track_id);
1029 cx.emit(Event::RemoteAudioTracksChanged {
1030 participant_id: participant.peer_id,
1031 });
1032 }
1033 }
1034
1035 cx.notify();
1036 Ok(())
1037 }
1038
/// Asserts internal consistency of the participant bookkeeping.
///
/// The checks are compiled only for tests or the `test-support` feature;
/// otherwise this function does nothing.
fn check_invariants(&self) {
    #[cfg(any(test, feature = "test-support"))]
    {
        // Every remote participant is tracked by user id and is not the
        // local user.
        for participant in self.remote_participants.values() {
            assert!(self.participant_user_ids.contains(&participant.user.id));
            assert_ne!(participant.user.id, self.client.user_id().unwrap());
        }

        // The same holds for pending participants.
        for participant in &self.pending_participants {
            assert!(self.participant_user_ids.contains(&participant.id));
            assert_ne!(participant.id, self.client.user_id().unwrap());
        }

        // The id set holds exactly the remote plus pending participants.
        assert_eq!(
            self.participant_user_ids.len(),
            self.remote_participants.len() + self.pending_participants.len()
        );
    }
}
1058
1059 pub(crate) fn call(
1060 &mut self,
1061 called_user_id: u64,
1062 initial_project_id: Option<u64>,
1063 cx: &mut ModelContext<Self>,
1064 ) -> Task<Result<()>> {
1065 if self.status.is_offline() {
1066 return Task::ready(Err(anyhow!("room is offline")));
1067 }
1068
1069 cx.notify();
1070 let client = self.client.clone();
1071 let room_id = self.id;
1072 self.pending_call_count += 1;
1073 cx.spawn(move |this, mut cx| async move {
1074 let result = client
1075 .request(proto::Call {
1076 room_id,
1077 called_user_id,
1078 initial_project_id,
1079 })
1080 .await;
1081 this.update(&mut cx, |this, cx| {
1082 this.pending_call_count -= 1;
1083 if this.should_leave() {
1084 this.leave(cx).detach_and_log_err(cx);
1085 }
1086 })?;
1087 result?;
1088 Ok(())
1089 })
1090 }
1091
1092 pub fn join_project(
1093 &mut self,
1094 id: u64,
1095 language_registry: Arc<LanguageRegistry>,
1096 fs: Arc<dyn Fs>,
1097 cx: &mut ModelContext<Self>,
1098 ) -> Task<Result<Model<Project>>> {
1099 let client = self.client.clone();
1100 let user_store = self.user_store.clone();
1101 cx.emit(Event::RemoteProjectJoined { project_id: id });
1102 cx.spawn(move |this, mut cx| async move {
1103 let project =
1104 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1105
1106 this.update(&mut cx, |this, cx| {
1107 this.joined_projects.retain(|project| {
1108 if let Some(project) = project.upgrade() {
1109 !project.read(cx).is_read_only()
1110 } else {
1111 false
1112 }
1113 });
1114 this.joined_projects.insert(project.downgrade());
1115 })?;
1116 Ok(project)
1117 })
1118 }
1119
1120 pub(crate) fn share_project(
1121 &mut self,
1122 project: Model<Project>,
1123 cx: &mut ModelContext<Self>,
1124 ) -> Task<Result<u64>> {
1125 if let Some(project_id) = project.read(cx).remote_id() {
1126 return Task::ready(Ok(project_id));
1127 }
1128
1129 let request = self.client.request(proto::ShareProject {
1130 room_id: self.id(),
1131 worktrees: project.read(cx).worktree_metadata_protos(cx),
1132 });
1133 cx.spawn(|this, mut cx| async move {
1134 let response = request.await?;
1135
1136 project.update(&mut cx, |project, cx| {
1137 project.shared(response.project_id, cx)
1138 })??;
1139
1140 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1141 this.update(&mut cx, |this, cx| {
1142 this.shared_projects.insert(project.downgrade());
1143 let active_project = this.local_participant.active_project.as_ref();
1144 if active_project.map_or(false, |location| *location == project) {
1145 this.set_location(Some(&project), cx)
1146 } else {
1147 Task::ready(Ok(()))
1148 }
1149 })?
1150 .await?;
1151
1152 Ok(response.project_id)
1153 })
1154 }
1155
1156 pub(crate) fn unshare_project(
1157 &mut self,
1158 project: Model<Project>,
1159 cx: &mut ModelContext<Self>,
1160 ) -> Result<()> {
1161 let project_id = match project.read(cx).remote_id() {
1162 Some(project_id) => project_id,
1163 None => return Ok(()),
1164 };
1165
1166 self.client.send(proto::UnshareProject { project_id })?;
1167 project.update(cx, |this, cx| this.unshare(cx))
1168 }
1169
1170 pub(crate) fn set_location(
1171 &mut self,
1172 project: Option<&Model<Project>>,
1173 cx: &mut ModelContext<Self>,
1174 ) -> Task<Result<()>> {
1175 if self.status.is_offline() {
1176 return Task::ready(Err(anyhow!("room is offline")));
1177 }
1178
1179 let client = self.client.clone();
1180 let room_id = self.id;
1181 let location = if let Some(project) = project {
1182 self.local_participant.active_project = Some(project.downgrade());
1183 if let Some(project_id) = project.read(cx).remote_id() {
1184 proto::participant_location::Variant::SharedProject(
1185 proto::participant_location::SharedProject { id: project_id },
1186 )
1187 } else {
1188 proto::participant_location::Variant::UnsharedProject(
1189 proto::participant_location::UnsharedProject {},
1190 )
1191 }
1192 } else {
1193 self.local_participant.active_project = None;
1194 proto::participant_location::Variant::External(proto::participant_location::External {})
1195 };
1196
1197 cx.notify();
1198 cx.executor().spawn_on_main(move || async move {
1199 client
1200 .request(proto::UpdateParticipantLocation {
1201 room_id,
1202 location: Some(proto::ParticipantLocation {
1203 variant: Some(location),
1204 }),
1205 })
1206 .await?;
1207 Ok(())
1208 })
1209 }
1210
1211 pub fn is_screen_sharing(&self) -> bool {
1212 self.live_kit.as_ref().map_or(false, |live_kit| {
1213 !matches!(live_kit.screen_track, LocalTrack::None)
1214 })
1215 }
1216
1217 pub fn is_sharing_mic(&self) -> bool {
1218 self.live_kit.as_ref().map_or(false, |live_kit| {
1219 !matches!(live_kit.microphone_track, LocalTrack::None)
1220 })
1221 }
1222
1223 pub fn is_muted(&self, cx: &AppContext) -> bool {
1224 self.live_kit
1225 .as_ref()
1226 .and_then(|live_kit| match &live_kit.microphone_track {
1227 LocalTrack::None => Some(Self::mute_on_join(cx)),
1228 LocalTrack::Pending { muted, .. } => Some(*muted),
1229 LocalTrack::Published { muted, .. } => Some(*muted),
1230 })
1231 .unwrap_or(false)
1232 }
1233
1234 pub fn is_speaking(&self) -> bool {
1235 self.live_kit
1236 .as_ref()
1237 .map_or(false, |live_kit| live_kit.speaking)
1238 }
1239
1240 pub fn is_deafened(&self) -> Option<bool> {
1241 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1242 }
1243
1244 #[track_caller]
1245 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1246 if self.status.is_offline() {
1247 return Task::ready(Err(anyhow!("room is offline")));
1248 } else if self.is_sharing_mic() {
1249 return Task::ready(Err(anyhow!("microphone was already shared")));
1250 }
1251
1252 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1253 let publish_id = post_inc(&mut live_kit.next_publish_id);
1254 live_kit.microphone_track = LocalTrack::Pending {
1255 publish_id,
1256 muted: false,
1257 };
1258 cx.notify();
1259 publish_id
1260 } else {
1261 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1262 };
1263
1264 cx.spawn(move |this, mut cx| async move {
1265 let publish_track = async {
1266 let track = LocalAudioTrack::create();
1267 this.upgrade()
1268 .ok_or_else(|| anyhow!("room was dropped"))?
1269 .update(&mut cx, |this, _| {
1270 this.live_kit
1271 .as_ref()
1272 .map(|live_kit| live_kit.room.publish_audio_track(track))
1273 })?
1274 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1275 .await
1276 };
1277
1278 let publication = publish_track.await;
1279 this.upgrade()
1280 .ok_or_else(|| anyhow!("room was dropped"))?
1281 .update(&mut cx, |this, cx| {
1282 let live_kit = this
1283 .live_kit
1284 .as_mut()
1285 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1286
1287 let (canceled, muted) = if let LocalTrack::Pending {
1288 publish_id: cur_publish_id,
1289 muted,
1290 } = &live_kit.microphone_track
1291 {
1292 (*cur_publish_id != publish_id, *muted)
1293 } else {
1294 (true, false)
1295 };
1296
1297 match publication {
1298 Ok(publication) => {
1299 if canceled {
1300 live_kit.room.unpublish_track(publication);
1301 } else {
1302 if muted {
1303 cx.executor().spawn(publication.set_mute(muted)).detach();
1304 }
1305 live_kit.microphone_track = LocalTrack::Published {
1306 track_publication: publication,
1307 muted,
1308 };
1309 cx.notify();
1310 }
1311 Ok(())
1312 }
1313 Err(error) => {
1314 if canceled {
1315 Ok(())
1316 } else {
1317 live_kit.microphone_track = LocalTrack::None;
1318 cx.notify();
1319 Err(error)
1320 }
1321 }
1322 }
1323 })?
1324 })
1325 }
1326
1327 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1328 if self.status.is_offline() {
1329 return Task::ready(Err(anyhow!("room is offline")));
1330 } else if self.is_screen_sharing() {
1331 return Task::ready(Err(anyhow!("screen was already shared")));
1332 }
1333
1334 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1335 let publish_id = post_inc(&mut live_kit.next_publish_id);
1336 live_kit.screen_track = LocalTrack::Pending {
1337 publish_id,
1338 muted: false,
1339 };
1340 cx.notify();
1341 (live_kit.room.display_sources(), publish_id)
1342 } else {
1343 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1344 };
1345
1346 cx.spawn_on_main(move |this, mut cx| async move {
1347 let publish_track = async {
1348 let displays = displays.await?;
1349 let display = displays
1350 .first()
1351 .ok_or_else(|| anyhow!("no display found"))?;
1352 let track = LocalVideoTrack::screen_share_for_display(&display);
1353 this.upgrade()
1354 .ok_or_else(|| anyhow!("room was dropped"))?
1355 .update(&mut cx, |this, _| {
1356 this.live_kit
1357 .as_ref()
1358 .map(|live_kit| live_kit.room.publish_video_track(track))
1359 })?
1360 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1361 .await
1362 };
1363
1364 let publication = publish_track.await;
1365 this.upgrade()
1366 .ok_or_else(|| anyhow!("room was dropped"))?
1367 .update(&mut cx, |this, cx| {
1368 let live_kit = this
1369 .live_kit
1370 .as_mut()
1371 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1372
1373 let (canceled, muted) = if let LocalTrack::Pending {
1374 publish_id: cur_publish_id,
1375 muted,
1376 } = &live_kit.screen_track
1377 {
1378 (*cur_publish_id != publish_id, *muted)
1379 } else {
1380 (true, false)
1381 };
1382
1383 match publication {
1384 Ok(publication) => {
1385 if canceled {
1386 live_kit.room.unpublish_track(publication);
1387 } else {
1388 if muted {
1389 cx.executor().spawn(publication.set_mute(muted)).detach();
1390 }
1391 live_kit.screen_track = LocalTrack::Published {
1392 track_publication: publication,
1393 muted,
1394 };
1395 cx.notify();
1396 }
1397
1398 Audio::play_sound(Sound::StartScreenshare, cx);
1399
1400 Ok(())
1401 }
1402 Err(error) => {
1403 if canceled {
1404 Ok(())
1405 } else {
1406 live_kit.screen_track = LocalTrack::None;
1407 cx.notify();
1408 Err(error)
1409 }
1410 }
1411 }
1412 })?
1413 })
1414 }
1415
1416 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1417 let should_mute = !self.is_muted(cx);
1418 if let Some(live_kit) = self.live_kit.as_mut() {
1419 if matches!(live_kit.microphone_track, LocalTrack::None) {
1420 return Ok(self.share_microphone(cx));
1421 }
1422
1423 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1424 live_kit.muted_by_user = should_mute;
1425
1426 if old_muted == true && live_kit.deafened == true {
1427 if let Some(task) = self.toggle_deafen(cx).ok() {
1428 task.detach();
1429 }
1430 }
1431
1432 Ok(ret_task)
1433 } else {
1434 Err(anyhow!("LiveKit not started"))
1435 }
1436 }
1437
1438 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1439 if let Some(live_kit) = self.live_kit.as_mut() {
1440 (*live_kit).deafened = !live_kit.deafened;
1441
1442 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1443 // Context notification is sent within set_mute itself.
1444 let mut mute_task = None;
1445 // When deafening, mute user's mic as well.
1446 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1447 if live_kit.deafened || !live_kit.muted_by_user {
1448 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1449 };
1450 for participant in self.remote_participants.values() {
1451 for track in live_kit
1452 .room
1453 .remote_audio_track_publications(&participant.user.id.to_string())
1454 {
1455 let deafened = live_kit.deafened;
1456 tasks.push(
1457 cx.executor()
1458 .spawn_on_main(move || track.set_enabled(!deafened)),
1459 );
1460 }
1461 }
1462
1463 Ok(cx.executor().spawn_on_main(|| async {
1464 if let Some(mute_task) = mute_task {
1465 mute_task.await?;
1466 }
1467 for task in tasks {
1468 task.await?;
1469 }
1470 Ok(())
1471 }))
1472 } else {
1473 Err(anyhow!("LiveKit not started"))
1474 }
1475 }
1476
1477 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1478 if self.status.is_offline() {
1479 return Err(anyhow!("room is offline"));
1480 }
1481
1482 let live_kit = self
1483 .live_kit
1484 .as_mut()
1485 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1486 match mem::take(&mut live_kit.screen_track) {
1487 LocalTrack::None => Err(anyhow!("screen was not shared")),
1488 LocalTrack::Pending { .. } => {
1489 cx.notify();
1490 Ok(())
1491 }
1492 LocalTrack::Published {
1493 track_publication, ..
1494 } => {
1495 live_kit.room.unpublish_track(track_publication);
1496 cx.notify();
1497
1498 Audio::play_sound(Sound::StopScreenshare, cx);
1499 Ok(())
1500 }
1501 }
1502 }
1503
/// Test-only hook that forwards `sources` to the live-kit room's
/// `set_display_sources`.
///
/// # Panics
///
/// Panics if live-kit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client2::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1512}
1513
/// Live-kit state owned by a `Room`: the underlying live-kit room handle plus
/// the local track and mute/deafen bookkeeping built on top of it.
struct LiveKitRoom {
    /// Handle to the live-kit room; tracks are published and unpublished
    /// through it.
    room: Arc<live_kit_client2::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    muted_by_user: bool,
    /// Whether remote audio is currently disabled; flipped by
    /// `Room::toggle_deafen`.
    deafened: bool,
    /// Exposed through `Room::is_speaking`.
    // NOTE(review): where this flag is written is not visible in this part
    // of the file.
    speaking: bool,
    /// Counter used to hand out a fresh publish id for each publish attempt.
    next_publish_id: usize,
    // NOTE(review): presumably held so the background work is not cancelled
    // when the handles are dropped; confirm where these tasks are created.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1526
1527impl LiveKitRoom {
1528 fn set_mute(
1529 self: &mut LiveKitRoom,
1530 should_mute: bool,
1531 cx: &mut ModelContext<Room>,
1532 ) -> Result<(Task<Result<()>>, bool)> {
1533 if !should_mute {
1534 // clear user muting state.
1535 self.muted_by_user = false;
1536 }
1537
1538 let (result, old_muted) = match &mut self.microphone_track {
1539 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1540 LocalTrack::Pending { muted, .. } => {
1541 let old_muted = *muted;
1542 *muted = should_mute;
1543 cx.notify();
1544 Ok((Task::Ready(Some(Ok(()))), old_muted))
1545 }
1546 LocalTrack::Published {
1547 track_publication,
1548 muted,
1549 } => {
1550 let old_muted = *muted;
1551 *muted = should_mute;
1552 cx.notify();
1553 Ok((
1554 cx.executor().spawn(track_publication.set_mute(*muted)),
1555 old_muted,
1556 ))
1557 }
1558 }?;
1559
1560 if old_muted != should_mute {
1561 if should_mute {
1562 Audio::play_sound(Sound::Mute, cx);
1563 } else {
1564 Audio::play_sound(Sound::Unmute, cx);
1565 }
1566 }
1567
1568 Ok((result, old_muted))
1569 }
1570}
1571
/// Publication state of a local track. `LiveKitRoom` keeps one of these for
/// the microphone and one for the screen share.
enum LocalTrack {
    /// No track is shared.
    None,
    /// A publish has been started but has not finished yet.
    Pending {
        /// Id of the in-flight publish attempt. When the publish finishes it
        /// is compared with this value to detect a canceled attempt.
        publish_id: usize,
        /// Mute state requested while pending; carried over to the track
        /// once it is published.
        muted: bool,
    },
    /// The track has been published to the live-kit room.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Last mute state requested for the track.
        muted: bool,
    },
}
1583
1584impl Default for LocalTrack {
1585 fn default() -> Self {
1586 Self::None
1587 }
1588}
1589
/// Connection status of a `Room`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` counts as
    /// neither online nor offline.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}